lundi 1 février 2021

C++11 multi threaded producer/consumer program hangs

I am new to C++11 and to its threading features. In the following program, the main thread starts 9 worker threads, pushes data into a queue, and then waits for the threads to terminate. I see that the worker threads don't get woken up and the program just hangs.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
#include <chrono>
#include <future>
#include <atomic>
using namespace std::chrono_literals;

// Shared state for the producer (main) and the consumer threads (thdfunc).
// NOTE(review): identifiers that begin with an underscore at global scope are
// reserved for the implementation; consider renaming these.

// Mutex locked by both main and every worker; all accesses to dataq in this
// file happen while it is held, and both condition variables wait on it.
std::mutex _rmtx;
// Not referenced anywhere in the visible code.
std::mutex _wmtx;
// Work queue: main pushes values, workers print and pop them.
std::queue<unsigned long long> dataq;
// Workers wait on this; main notifies it after each push and at shutdown.
std::condition_variable _rcv;
// Main waits on this; workers notify it after popping an item.
std::condition_variable _wcv;
// Shutdown flag: main sets it to true, workers read it in their wait predicate.
std::atomic_bool termthd;

/**
 * Consumer thread body.
 *
 * Repeatedly waits for an item in dataq, prints it together with the worker
 * id, pops it and signals _wcv. Once termthd is set and the queue is empty the
 * function returns, so the thread ends normally and can be joined.
 *
 * @param num Worker id used only to label the output.
 */
void thdfunc(const int& num)
{
    std::cout << "starting thread#" << num << std::endl;

    // Held for the whole loop; the condition variable releases it while
    // blocked and re-acquires it before the predicate runs or the wait returns.
    std::unique_lock<std::mutex> rul(_rmtx);
    while (true) {
        // The 10 ms timeout is only a safety net: if a notification is missed,
        // the predicate is still re-checked periodically.
        while (!_rcv.wait_for(rul, 10ms, [] { return !dataq.empty() || termthd.load(); })) {
        }

        // The predicate held with nothing queued, so shutdown was requested.
        // Checking the queue first means pending items are still processed,
        // and returning (rather than std::terminate) keeps the process alive.
        if (dataq.empty()) {
            return;
        }

        std::cout << "thd#" << num << " : " << dataq.front() << std::endl;
        dataq.pop();
        _wcv.notify_one();
    }
}

int main()
{
    std::vector<std::thread*> thdvec;
    std::unique_lock<std::mutex> wul(_rmtx);
    unsigned long long data = 0ULL;
    termthd.store(false);
    for (int i = 0; i < 9; i++) {
        thdvec.push_back(new std::thread(thdfunc, i));
    }

    for ( data = 0ULL; data < 2ULL; data++) {
        _wcv.wait_until(wul, std::chrono::steady_clock::now() + 10ms, [&] {return data > 1000000ULL; });
        dataq.push(std::ref(data));
        _rcv.notify_one();
    }
    termthd.store(true);
    _rcv.notify_all();
    //std::this_thread::yield();
    for (int i = 0; i < 9; i++) {
        thdvec[i]->join();
    }
}

I am unable to figure out the problem. How can I make sure the threads get woken up, process the requests, and terminate normally?

Aucun commentaire:

Enregistrer un commentaire