I'm trying to add 3 additional threads to my main application that perform non-shared operations.
At first I thought it was working, because if I uncomment the last printf call in the WorkerThread function, it does not hang. Without that printf, it locks up in WaitThread() after a random period of time: sometimes it takes a few seconds until it blocks in the mWaitCondition[i].wait() call, sometimes it happens right after the start. The printf seems to fix the timing of the threads.
The application doesn't crash; the CPU usage of the application (and of each thread) simply drops to 0% and it becomes unresponsive. Breaking in the Visual Studio debugger shows the line while (mWakeUp[i]) mWaitCondition[i].wait(lock) in the WaitThread() function as the current position. It also shows that mWakeUp is false for all threads, so it shouldn't stay in that while loop.
My idea behind the design:
- SetupThreading() is called before going into the main endless loop
- Inside the endless loop, WorkerInit() is called to wake up the threads
- Before I access the data of the 3 threads, WorkerWait() is called to wait until they have finished
- Inside the WorkerThread function (called by each thread), I lock the mutex and wait until the thread is woken up or stopped
- After processing the data, mWakeUp is set to false and the condition variable is notified
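For comparison, here is a minimal sketch of the same wake/wait handshake, reduced to a self-contained class. It is not the code in question (that follows below); Worker, Completed() and RunFrames() are hypothetical names used only for illustration. The one deliberate difference is that the flag is plain bool and is only read or written while a single mutex is held, on the assumption that the hang may be a lost wakeup: in the code below the worker clears mWakeUp without holding mWaitMutex, so the notification could arrive between the check and the wait() in WaitThread().

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical reduced worker; one instance per context.
class Worker {
public:
    Worker() : mThread(&Worker::Run, this) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mStop = true;                       // stop signal, set under the mutex
        }
        mWakeCondition.notify_one();
        mThread.join();
    }

    // Main thread: signal the worker to run one task.
    void WakeUp() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mWakeUp = true;
        }
        mWakeCondition.notify_one();
    }

    // Main thread: block until the current task has finished.
    void Wait() {
        std::unique_lock<std::mutex> lock(mMutex);
        mWaitCondition.wait(lock, [this] { return !mWakeUp; });
    }

    int Completed() {
        std::lock_guard<std::mutex> lock(mMutex);
        return mCompleted;
    }

private:
    void Run() {
        for (;;) {
            {
                std::unique_lock<std::mutex> lock(mMutex);
                // not stopped nor woken up? continue to wait
                mWakeCondition.wait(lock, [this] { return mWakeUp || mStop; });
                if (mStop) return;
            }
            // RUN CODE HERE (no lock is held while working)
            {
                // Clear the flag under the same mutex that Wait() holds
                // while checking it, so the change cannot slip in between
                // the check and the wait.
                std::lock_guard<std::mutex> lock(mMutex);
                ++mCompleted;
                mWakeUp = false;
            }
            mWaitCondition.notify_all();
        }
    }

    std::mutex mMutex;
    std::condition_variable mWakeCondition;
    std::condition_variable mWaitCondition;
    bool mWakeUp = false;
    bool mStop = false;
    int mCompleted = 0;
    std::thread mThread;  // declared last so the members above exist before Run() starts
};

// Hypothetical driver mirroring the main loop: wake all workers, then wait for all.
int RunFrames(int frames) {
    const int NumContexts = 3;
    Worker workers[NumContexts];
    for (int f = 0; f < frames; ++f) {
        for (Worker& w : workers) w.WakeUp();   // WorkerInit()
        for (Worker& w : workers) w.Wait();     // WorkerWait()
    }
    int total = 0;
    for (Worker& w : workers) total += w.Completed();
    return total;
}
```

In this sketch every WakeUp() is paired with exactly one completed task, so RunFrames(n) should return 3 * n. Whether the same change removes the hang in the real application is an assumption that would need testing there.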
    static const ui32 NumContexts = 3;

    // array of pointers to threads
    std::thread* mThreadHandles[NumContexts];

    // wake up
    std::atomic<bool> mWakeUp[NumContexts];
    std::mutex mWakeMutex[NumContexts];
    std::condition_variable mWakeCondition[NumContexts];

    // wait for thread to finish task
    std::mutex mWaitMutex[NumContexts];
    std::condition_variable mWaitCondition[NumContexts];

    // stop signal
    std::atomic<bool> mStop[NumContexts];

    void Framework::SetupThreading()
    {
        // create and start threads
        for (int i = 0; i < NumContexts; i++)
        {
            this->mWakeUp[i] = false;
            this->mStop[i] = false;
            this->mThreadHandles[i] = new std::thread(&Framework::WorkerThread, this, reinterpret_cast<void*>(i));
        }
    }

    //---------------------------------------------
    void Framework::WakeUpThread(int i)
    {
        {
            //auto lock = std::unique_lock<std::mutex>(this->mWakeMutex[i]);
            std::lock_guard<std::mutex> lock(this->mWakeMutex[i]);
            //printf("Waking up thread %i \n", i);
            this->mWakeUp[i] = true;
        }
        this->mWakeCondition[i].notify_one();
    }

    // THIS FUNCTION LOCKS
    //---------------------------------------------
    void Framework::WaitThread(int i)
    {
        auto lock = std::unique_lock<std::mutex>(this->mWaitMutex[i]);
        //printf("Waiting for thread %i to finish \n", i);
        while (this->mWakeUp[i])
            this->mWaitCondition[i].wait(lock);
        //printf("Thread %i finished! \n", i);
    }

    //---------------------------------------------
    void Framework::StopThread(int i)
    {
        auto lock = std::unique_lock<std::mutex>(this->mWakeMutex[i]);
        printf("Sending stop signal for thread %i \n", i);
        this->mStop[i] = true;
        this->mWakeCondition[i].notify_one();
    }

    //---------------------------------------------
    void Framework::JoinThread(int i)
    {
        printf("Waiting for join of thread %i \n", i);
        this->mThreadHandles[i]->join();
        printf("Thread %i joined! \n", i);
    }

    // THESE ARE CALLED IN THE MAIN LOOP
    //---------------------------------------------
    void Framework::WorkerInit()
    {
        for (int i = 0; i < NumContexts; i++)
        {
            this->WakeUpThread(i);
        }
    }

    void Framework::WorkerWait()
    {
        for (int i = 0; i < NumContexts; i++)
        {
            this->WaitThread(i);
        }
    }

    // THE FUNCTION CALLED BY THE THREADS
    //---------------------------------------------
    void Framework::WorkerThread(LPVOID workerIndex)
    {
        int threadIndex = reinterpret_cast<int>(workerIndex);
        while (threadIndex < NumContexts && threadIndex >= 0)
        {
            {
                auto lock = std::unique_lock<std::mutex>(this->mWakeMutex[threadIndex]);
                //printf("thread %i: waiting for wakeup or stop signal...\n", threadIndex);

                // not stopped nor woken up? continue to wait
                while (this->mWakeUp[threadIndex] == false && this->mStop[threadIndex] == false)
                {
                    this->mWakeCondition[threadIndex].wait(lock);
                }

                // stop signal sent?
                if (this->mStop[threadIndex])
                {
                    //printf("thread %i: got stop signal!\n", threadIndex);
                    return;
                }
                //printf("thread %i: got wakeup signal!\n", threadIndex);
                // lock unlocks here (lock destructor)
            }

            // printf("thread %i: running the task...\n", threadIndex);
            // RUN CODE HERE
            //printf("thread %i finished! Sending signal!...\n", threadIndex);

            // m_wakeup is atomic so there is no concurrency issue with wait()
            this->mWakeUp[threadIndex] = false;
            this->mWaitCondition[threadIndex].notify_all();
        }
    }
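A side note on thread creation in the listing above: converting the index to void* and back to int is not portable to targets where a pointer is wider than int. The sketch below shows one possible alternative, passing the index as a plain int and storing the std::thread objects directly instead of allocating them with new. The reduced Framework class, IndexSum() and StartAndJoin() are hypothetical and exist only for this illustration; the wake/wait logic is left out.

```cpp
#include <array>
#include <atomic>
#include <thread>

constexpr int NumContexts = 3;

// Hypothetical reduced Framework: only thread creation and joining.
class Framework {
public:
    void SetupThreading() {
        for (int i = 0; i < NumContexts; i++) {
            // std::thread copies its arguments, so each worker gets its own int.
            mThreads[i] = std::thread(&Framework::WorkerThread, this, i);
        }
    }

    void JoinThreads() {
        for (std::thread& t : mThreads) {
            if (t.joinable()) t.join();
        }
    }

    int IndexSum() const { return mIndexSum.load(); }

private:
    void WorkerThread(int threadIndex) {
        // Placeholder task: record which index this worker received.
        mIndexSum.fetch_add(threadIndex);
    }

    std::array<std::thread, NumContexts> mThreads;
    std::atomic<int> mIndexSum{0};
};

// Hypothetical helper: start all workers, join them, report the indices seen.
int StartAndJoin() {
    Framework framework;
    framework.SetupThreading();
    framework.JoinThreads();
    return framework.IndexSum();  // 0 + 1 + 2 when all three workers ran
}
```

This does not affect the hang itself; it only removes the casts and the manual new.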