We have implemented TaskRunner whose functions will be called by different threads to start, stop and post tasks. TaskRunner will internally create a thread and if the queue is not empty, it will pop the task from queue and executes it. Start() will check if the thread is running. If not creates a new thread. Stop() will join the thread. The code is as below.
bool TaskRunnerImpl::PostTask(Task* task) {
tasks_queue_.push_back(task);
return true;
}
void TaskRunnerImpl::Start() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
if(is_running_) {
return;
}
is_running_ = true;
runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
}
void TaskRunnerImpl::Run() {
while(is_running_) {
if(tasks_queue_.empty()) {
continue;
}
Task* task_to_run = tasks_queue_.front();
task_to_run->Run();
tasks_queue_.pop_front();
delete task_to_run;
}
}
void TaskRunnerImpl::Stop() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
is_running_ = false;
if(runner_thread_.joinable()) {
runner_thread_.join();
}
}
We want to use conditional variables now otherwise the thread will be continuously checking whether the task queue is empty or not. We implemented as below.
- Thread function (Run()) will wait on condition variable.
- PostTask() will signal if some one posts a task.
- Stop() will signal if some one calls stop.
Code is as below.
bool TaskRunnerImpl::PostTask(Task* task) {
std::lock_guard<std::mutex> taskGuard(m_task_mutex);
tasks_queue_.push_back(task);
m_task_cond_var.notify_one();
return true;
}
void TaskRunnerImpl::Start() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
if(is_running_) {
return;
}
is_running_ = true;
runner_thread_ = std::thread(&TaskRunnerImpl::Run, this);
}
void TaskRunnerImpl::Run() {
while(is_running_) {
Task* task_to_run = nullptr;
{
std::unique_lock<std::mutex> mlock(m_task_mutex);
m_task_cond_var.wait(mlock, [this]() {
return !(is_running_ && tasks_queue_.empty());
});
if(!is_running_) {
return;
}
if(!tasks_queue_.empty()) {
task_to_run = tasks_queue_.front();
task_to_run->Run();
tasks_queue_.pop_front();
}
}
if(task_to_run)
delete task_to_run;
}
}
void TaskRunnerImpl::Stop() {
std::lock_guard<std::mutex> lock(is_running_mutex_);
is_running_ = false;
m_task_cond_var.notify_one();
if(runner_thread_.joinable()) {
runner_thread_.join();
}
}
I have couple of questions as below. Can some one please help me to understand these.
-
Condition variable m_task_cond_var is linked with mutex m_task_mutex. But Stop() already locks mutex is_running_mutex to gaurd 'is_running_'. Do I need to lock m_task_mutex before signaling? Here I am not convinced why to lock m_task_mutex as we are not protecting any thing related to task queue.
-
In Thread function(Run()), we are reading is_running_ without locking is_running_mutex. Is this correct?
Aucun commentaire:
Enregistrer un commentaire