mercredi 13 février 2019

How should conditional variables in producer-consumer implementations be initialized

I am trying to understand the use of conditional variables to implement producer-consumer buffers. I have the following code, which implements a queue for integers (which could be linux file descriptors). The code works as expected, but I am trying to understand why. Both enqueue and dequeue operations wait on some conditional variable before signaling the other conditional variable. Why are these waits unblocking? Is this due to spurious wakeups?

#include <iostream>
#include <thread>
#include <list>
#include <mutex>
#include <chrono>
#include <condition_variable>

using namespace std::chrono_literals;
using std::cout;
using std::endl;

class FDQueue
{
    std::mutex _mutex;
    std::condition_variable _notEmptyCv, _notFullCv;
    std::list<int> _fds;
    size_t _maxSize;

public:
    void add(int fd) {
        std::unique_lock<std::mutex> locker(this->_mutex);
        this->_notFullCv.wait(locker, [this](){return this->_fds.size() < this->_maxSize;});
        cout<<"Enqueue "<<endl;
        this->_fds.push_back(fd);
        locker.unlock();
        this->_notEmptyCv.notify_one();
    }

    int remove() {
        std::unique_lock<std::mutex> locker(_mutex);
        this->_notEmptyCv.wait(locker, [this](){return this->_fds.size() > 0;});
        int fd = this->_fds.front();
        this->_fds.pop_front();
        cout<<"Dequeue"<<endl;
        locker.unlock();
        this->_notFullCv.notify_all();
        return fd;
    }

    FDQueue(size_t maxSize) : _maxSize(maxSize) {}
};

FDQueue queue(5);

void producer() {
    while (true) {
        queue.add(0);
        std::this_thread::sleep_for(2s);
    }
}
void consumer() {
    while (true) {
        queue.remove();
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
}

Aucun commentaire:

Enregistrer un commentaire