Saturday, November 2, 2019

std::condition_variable::notify_one: does it wake multiple threads if some have a false predicate?

I have a ring buffer that is shared by readers and writers. I keep track of the number of entries in the ring buffer and do not allow overwriting entries that have not been read. I use std::condition_variable wait() and notify_one() to synchronize the readers and writers. Basically, the condition for a reader is that the number of entries > 0. The condition for a writer is that the number of entries < capacity.

It all seems to work, but there is one thing I don't understand. When a reader or writer calls notify_one(), it does not cause a context switch; I've read about this and understand that it works this way. However, consider the case where a writer writes the entry that fills the buffer, calls notify_one(), and then goes on to write another entry, so that its predicate fails in wait(). In this case I see that another writer may wake up and its predicate fails as well. Then a reader wakes up, its predicate succeeds, and it can begin reading.

What I don't understand is why a single notify_one() unblocks multiple threads. Does a wait() with a failed predicate not consume the notification? I can't find anything that states this is the case.
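One detail that may be relevant here: the standard specifies wait(lk, pred) as equivalent to `while (!pred()) wait(lk);`, so the predicate is evaluated once before the thread blocks at all. A log line placed inside the predicate therefore also prints on entry to wait(), not only after a wake-up. The minimal sketch below illustrates this; the function name count_predicate_calls_without_notify is hypothetical and is not part of the code in this post.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical helper: counts how often the predicate runs in a wait()
// that never blocks. No thread ever calls notify_one() or notify_all().
int count_predicate_calls_without_notify() {
    std::mutex m;
    std::condition_variable cv;
    int calls = 0;
    bool ready = true;  // the condition already holds before wait() is called

    std::unique_lock<std::mutex> lk(m);
    // Behaves like: while (!pred()) cv.wait(lk);
    // The predicate is checked first, so wait() returns without blocking.
    cv.wait(lk, [&] { ++calls; return ready; });
    return calls;
}
```

Calling this function returns 1 even though nothing was ever notified, which suggests that counting predicate evaluations is not the same as counting wake-ups.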

I could call notify_all() just to be sure but it seems to be working with notify_one().

Here's the code.

#include <cstddef>
#include <iostream>
#include <mutex>

#include <boost/circular_buffer.hpp>
#include <condition_variable>
#include <thread>


// ring buffer with protection for overwrites 
template <typename T>
class ring_buffer {

  public:

    ring_buffer(size_t size) {
        cb.set_capacity(size);
    }

    void read(T& entry) {
        {
            std::unique_lock<std::mutex> lk(cv_mutex);
            cv.wait(lk, [this] {
                    std::cout << "read woke up, test=" << (cb.size() > 0) << std::endl; 
                    return 0 < cb.size();});
            auto iter = cb.begin();
            entry = *iter;
            cb.pop_front(); 
            std::cout << "Read notify_one" << std::endl;
        }
        cv.notify_one();
    } 

    void write(const T& entry) {
        {
            std::unique_lock<std::mutex> lk(cv_mutex);
            //std::cout << "Write wait" << std::endl;
            cv.wait(lk, [this] {
                    std::cout << "write woke up, test=" << (cb.size() < cb.capacity()) << std::endl; 
                    return cb.size() < cb.capacity();});
            cb.push_back(entry);
            std::cout << "Write notify_one" << std::endl;
        }
        cv.notify_one();
    }

    size_t get_number_entries() {
        std::unique_lock<std::mutex> lk(cv_mutex);
        return cb.size();
    }

  private:

    boost::circular_buffer<T> cb;
    std::condition_variable cv;
    std::mutex cv_mutex;
};

void write_loop(ring_buffer<int> *buffer) {

    for (int i = 0; i < 100000; ++i) {
        buffer->write(i);
    }
}

void read_loop(ring_buffer<int> *buffer) {

    for (int i = 0; i < 50000; ++i) {
        int val;
        buffer->read(val);
    }

}

int main() {

    ring_buffer<int> buffer(1000); 
    std::thread writer(write_loop, &buffer);
    std::thread reader(read_loop, &buffer);
    std::thread reader2(read_loop, &buffer);

    writer.join();
    reader.join();
    reader2.join();

    return 0;
}

I see the following in the output, where several threads are woken in turn because the predicate is false for the first ones.

read woke up, test=0 
read woke up, test=0 
write woke up, test=1 
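For comparison, here is a minimal sketch of a common variant that uses two condition variables, one per kind of waiter. With a single shared condition variable, notify_one() can in general wake a thread of the same kind whose predicate is still false, and that thread goes back to waiting without passing the notification on. Separating the two wait conditions means each notification can only reach a thread that is able to make progress. The class name two_cv_ring_buffer and its members are hypothetical, and std::deque stands in for boost::circular_buffer only to keep the example self-contained.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical variant: readers and writers wait on separate condition variables.
template <typename T>
class two_cv_ring_buffer {
  public:
    explicit two_cv_ring_buffer(std::size_t capacity) : capacity_(capacity) {}

    // Blocks until at least one entry is available, then removes and returns it.
    T read() {
        std::unique_lock<std::mutex> lk(mutex_);
        not_empty_.wait(lk, [this] { return !items_.empty(); });
        T entry = items_.front();
        items_.pop_front();
        lk.unlock();
        not_full_.notify_one();  // only a blocked writer waits on not_full_
        return entry;
    }

    // Blocks until there is free space, then appends the entry.
    void write(const T& entry) {
        std::unique_lock<std::mutex> lk(mutex_);
        not_full_.wait(lk, [this] { return items_.size() < capacity_; });
        items_.push_back(entry);
        lk.unlock();
        not_empty_.notify_one();  // only a blocked reader waits on not_empty_
    }

    std::size_t size() {
        std::lock_guard<std::mutex> lk(mutex_);
        return items_.size();
    }

  private:
    std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
};
```

It would be used the same way as the ring_buffer above: writer threads call write() and reader threads call read(), and entries come out in the order they were written.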
