I am implementing a thread pool where the workers sleep when there is no work ready, and the main thread sleeps while the workers are busy. I noticed that the worker threads proceed to work after calling wait() even though the main thread has not yet called notify_all().
The output looks something like this:
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
......
Worker function:
void TaskSystemParallelThreadPoolSleeping::waitFunc() {
std::unique_lock<std::mutex> lock(*this->mutex_);
while(true) {
this->num_wait++;
std::cout << "WORKER WAIT" << std::endl;
this->cond_->wait(lock,
std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
std::cout << "WORKER AWAKENS!" << std::endl;
if (this->done_flag == true) {
this->mutex_->unlock();
break;
}
this->mutex_->unlock();
std::cout << "WORKER START" << std::endl;
while (true) {
this->mutex_->lock();
if (this->not_done == 0) { // ALL work done
if (this->total_work != 0) { // 1st time seen by workers
this->total_work = 0;
this->num_wait = 0;
std::cout << "WORKER WAKE MAIN" << std::endl;
this->mutex_->unlock();
this->cond_->notify_all();
}
this->mutex_->unlock();
break;
}
int total = this->total_work;
int id = this->work_counter;
if (id == total) { // NO work initiated or NO work left
this->mutex_->unlock();
continue;
}
++(this->work_counter); // increment counter
this->mutex_->unlock(); // Let others access counters to work
this->runnable->runTask(id, total); // do work
this->mutex_->lock();
--(this->not_done); // decrement counter after work done
this->mutex_->unlock();
}
std::cout << "WORKER DONE" << std::endl;
}
std::cout << "WORKER TERMINATE" << std::endl;
}
Main thread:
/**
 * Publishes a batch of tasks to the worker threads and blocks until the
 * workers report that the batch is finished.
 *
 * @param runnable         Object whose runTask(id, total) the workers call.
 * @param num_total_tasks  Number of tasks in the batch. A non-positive value
 *                         is treated as "nothing to do" and returns
 *                         immediately; a negative count could never reach
 *                         the workers' completion condition.
 *
 * The mutex is owned through a single std::unique_lock for the whole call,
 * so it is released on every exit path.
 */
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    if (num_total_tasks <= 0) {
        return;
    }

    std::unique_lock<std::mutex> lock(*this->mutex_);

    // Set-up work: publish the batch while holding the mutex.
    std::cout << "MAIN SETUP" << std::endl;
    this->runnable = runnable;
    this->work_counter = 0;
    this->not_done = num_total_tasks;
    this->total_work = num_total_tasks;

    // Poll until every worker has registered itself in num_wait. The mutex
    // is dropped between polls so that workers can make progress.
    std::cout << "MAIN POLLS READINESS" << std::endl;
    while (this->num_wait < this->num_T) {
        lock.unlock();
        std::this_thread::yield();
        lock.lock();
    }
    std::cout << "ALL WORKERS READY" << std::endl;

    // Tell workers there is work. Workers that reached their wait after the
    // batch was published may already be running: a predicate wait does not
    // block when its predicate is already true.
    this->cond_->notify_all();

    // Wait for workers to complete the work (wakeMain(): total_work == 0).
    this->cond_->wait(lock, [this] { return this->wakeMain(); });
    std::cout << "MAIN END" << std::endl;
}
Condition to wake worker:
/**
 * Predicate used by waitFunc() when waiting on cond_.
 *
 * @return true when done_flag is set, or when a batch is pending
 *         (total_work != 0) and num_wait equals num_T; false otherwise.
 */
bool TaskSystemParallelThreadPoolSleeping::wakeWorker() {
    // Shutdown always releases the worker, regardless of pending work.
    if (this->done_flag == true) {
        return true;
    }
    // No batch pending: keep sleeping.
    if (this->total_work == 0) {
        return false;
    }
    // A batch is pending; proceed only once the wait count matches num_T.
    return this->num_wait == this->num_T;
}
Condition to wake main thread:
/**
 * Predicate used by run() when waiting on cond_.
 *
 * @return true once total_work is zero, false while it is non-zero.
 */
bool TaskSystemParallelThreadPoolSleeping::wakeMain() {
    const bool batch_pending = (this->total_work != 0);
    return !batch_pending;
}
Thread Pool Constructor:
/**
 * Builds the pool: resets the shared counters, allocates the mutex and the
 * condition variable, and starts max(1, num_threads - 1) worker threads,
 * each running waitFunc().
 *
 * @param num_threads  Thread count forwarded to ITaskSystem; the pool itself
 *                     starts one worker fewer, but never fewer than one.
 */
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    // Shared bookkeeping starts out empty; all of it is set before any
    // worker thread exists.
    total_work = 0;
    not_done = 0;
    work_counter = 0;
    num_wait = 0;
    done_flag = {false};

    // Synchronisation primitives; released again in the destructor.
    mutex_ = new std::mutex();
    cond_ = new std::condition_variable();

    // Worker storage, sized to at least one thread.
    num_T = std::max(1, num_threads - 1);
    threads = new std::thread[num_T];

    // Start the workers last, once everything they read is initialised.
    int next_slot = 0;
    while (next_slot < num_T) {
        threads[next_slot] = std::thread(&TaskSystemParallelThreadPoolSleeping::waitFunc, this);
        ++next_slot;
    }
}
Thread pool destructor:
/**
 * Shuts the pool down: raises done_flag, wakes every thread waiting on
 * cond_, joins all workers, and frees the heap-allocated members.
 */
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    {
        // The flag is part of the workers' wait predicate, so it must be
        // changed while holding the same mutex they wait with. Otherwise a
        // worker could evaluate the predicate as false, then miss the
        // notification issued before it actually blocks, and never wake up,
        // which would make join() below hang.
        std::lock_guard<std::mutex> guard(*this->mutex_);
        this->done_flag = true;
    }
    // The mutex is released before notifying and joining so that the woken
    // workers can re-acquire it and exit.
    this->cond_->notify_all();

    for (int i = 0; i < this->num_T; i++) {
        this->threads[i].join();
    }

    // No worker is running any more; it is now safe to free shared state.
    delete this->mutex_;
    delete[] this->threads;
    delete this->cond_;
}
I think the beginning should be:
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
......
I.e., workers should only awaken after main's notify_all().
Aucun commentaire:
Enregistrer un commentaire