mardi 22 octobre 2019

Sleeping Threadpool: worker thread awakens without notify() from main thread

I am implementing a thread pool in which the workers sleep when there is no work ready and the main thread sleeps while the workers are busy. I noticed that worker threads proceed to do work after calling wait() even though the main thread has not yet called notify_all().

The output looks something like this:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
......

Worker function:

// Worker-thread entry point.
//
// Each pass of the outer loop: register as waiting (num_wait), sleep on cond_
// until wakeWorker() holds, then either terminate (done_flag set) or claim
// task ids from work_counter and run them until not_done reaches 0. The first
// worker to observe completion resets total_work/num_wait and notifies cond_
// so that run() can return.
//
// All locking goes through the single std::unique_lock below. The mutex must
// never be locked/unlocked directly while the guard exists: the guard would
// then believe it still owns a mutex that was released, cond_->wait() would be
// entered without the mutex held, and the guard's destructor would unlock a
// second time (all undefined behaviour).
//
// NOTE(review): resetting num_wait to 0 on completion appears to drop workers
// that are still blocked inside wait() (their predicate turns false again
// before they get the mutex), so a later run() may never see
// num_wait == num_T -- confirm and consider a per-run generation counter.
void TaskSystemParallelThreadPoolSleeping::waitFunc() {
    std::unique_lock<std::mutex> lock(*this->mutex_);
    while (true) {
        // The lock is held here on every iteration, so this increment is
        // synchronised with run() polling num_wait.
        this->num_wait++;
        std::cout << "WORKER WAIT" << std::endl;
        // wait(lock, pred) evaluates pred BEFORE blocking: if work is already
        // posted and this is the last worker to register, it proceeds at once
        // without needing a notify from the main thread.
        this->cond_->wait(lock,
                        std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
        std::cout << "WORKER AWAKENS!" << std::endl;
        if (this->done_flag == true) {
            lock.unlock();  // guard no longer owns; its destructor is a no-op
            break;
        }
        lock.unlock();

        std::cout << "WORKER START" << std::endl;
        while (true) {
            lock.lock();

            if (this->not_done == 0) {  // ALL work done
                // Only the first worker to get here still sees total_work != 0.
                const bool first_to_finish = (this->total_work != 0);
                if (first_to_finish) {
                    this->total_work = 0;
                    this->num_wait = 0;
                    std::cout << "WORKER WAKE MAIN" << std::endl;
                }
                lock.unlock();  // released exactly once on this path
                if (first_to_finish) {
                    this->cond_->notify_all();
                }
                break;
            }

            const int total = this->total_work;
            const int id = this->work_counter;
            if (id == total) {  // NO work initiated or NO work left
                // Every task is claimed but some are still running: release
                // the lock and re-check until not_done drops to 0.
                lock.unlock();
                continue;
            }

            ++(this->work_counter);  // claim task `id`
            lock.unlock();           // let others access counters to work

            this->runnable->runTask(id, total);  // do work outside the lock

            lock.lock();
            --(this->not_done);  // decrement counter after work done
            lock.unlock();
        }
        std::cout << "WORKER DONE" << std::endl;

        // Re-acquire before looping: num_wait++ and cond_->wait() both
        // require the mutex to be held through the guard.
        lock.lock();
    }
    std::cout << "WORKER TERMINATE" << std::endl;
}

Main thread:

// Runs `num_total_tasks` tasks of `runnable` on the worker threads and blocks
// until the workers report completion.
//
// @param runnable         task object; workers call runTask(id, total) on it.
// @param num_total_tasks  number of task ids to hand out (0 .. total-1).
//
// Sequence: publish the work under the mutex, poll until every worker has
// registered in num_wait, notify cond_, then sleep on cond_ until wakeMain()
// holds.
void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    // Nothing to do. A negative count must not be published: not_done is
    // initialised from it and only ever decremented, so it would never
    // reach 0 and this call would not return.
    if (num_total_tasks <= 0) {
        return;
    }

    // One RAII guard for the whole call: the mutex is released on every exit
    // path, including an exception thrown while it is held.
    std::unique_lock<std::mutex> lock(*this->mutex_);

    // Set-up work
    std::cout << "MAIN SETUP" << std::endl;
    this->runnable = runnable;
    this->work_counter = 0;
    this->not_done = num_total_tasks;
    this->total_work = num_total_tasks;

    // Tell workers there is work
    std::cout << "MAIN POLLS READINESS" << std::endl;
    while (this->num_wait < this->num_T) {  // Check if all ready
        // Briefly drop the lock so workers can register in num_wait.
        lock.unlock();
        lock.lock();
    }
    std::cout << "ALL WORKERS READY" << std::endl;
    lock.unlock();
    this->cond_->notify_all();

    // Wait for workers to complete work
    lock.lock();
    this->cond_->wait(lock,
                    std::bind(&TaskSystemParallelThreadPoolSleeping::wakeMain, this));
    std::cout << "MAIN END" << std::endl;
}

Condition to wake worker:

// Wait predicate for worker threads.
// True when the pool is shutting down (done_flag), or when work has been
// posted (total_work != 0) and num_wait equals the worker count num_T.
bool TaskSystemParallelThreadPoolSleeping::wakeWorker() {
    if (this->done_flag) {
        return true;
    }
    if (this->total_work == 0) {
        return false;
    }
    return this->num_wait == this->num_T;
}

Condition to wake main thread:

// Wait predicate for the main thread: true once total_work is back to 0.
bool TaskSystemParallelThreadPoolSleeping::wakeMain() {
    const bool no_work_outstanding = (this->total_work == 0);
    return no_work_outstanding;
}

Thread Pool Constructor:

// Builds the pool: allocates the thread array, mutex and condition variable,
// zeroes the shared counters, then starts the worker threads in waitFunc().
// The worker count is num_threads - 1, but never less than one.
TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    const int helpers = num_threads - 1;
    this->num_T = (helpers > 1) ? helpers : 1;

    this->threads = new std::thread[this->num_T];
    this->mutex_ = new std::mutex();
    this->cond_ = new std::condition_variable();

    // Shared state is initialised before any worker is started.
    this->total_work = 0;
    this->not_done = 0;
    this->work_counter = 0;
    this->num_wait = 0;
    this->done_flag = false;

    int started = 0;
    while (started < this->num_T) {
        this->threads[started] =
            std::thread(&TaskSystemParallelThreadPoolSleeping::waitFunc, this);
        ++started;
    }
}

Thread pool destructor:

// Shuts the pool down: sets done_flag, wakes every worker, joins them, and
// frees the mutex, thread array and condition variable.
TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    {
        // done_flag is read by the workers' wait predicate under the mutex,
        // so it must be written under the same mutex. Otherwise a worker can
        // test the predicate (false), miss the notify below, then block
        // forever -- and join() would hang.
        std::lock_guard<std::mutex> guard(*this->mutex_);
        this->done_flag = true;
    }
    this->cond_->notify_all();

    for (int i = 0; i < this->num_T; i++) {
        this->threads[i].join();
    }

    // Workers have exited, so nothing references these objects any more.
    delete this->mutex_;
    delete[] this->threads;
    delete this->cond_;
}

I think the beginning should be:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
......

I.e., workers should only awaken after main's notify_all().

Aucun commentaire:

Enregistrer un commentaire