I have a ring buffer that is shared between readers and writers. I keep track of the number of entries in the ring buffer and do not allow overwriting entries that have not been read. I use std::condition_variable wait() and notify_one() to synchronize the readers and writers. The condition for a reader is that the number of entries > 0. The condition for a writer is that the number of entries < capacity.
It all seems to work, but there is one thing I don't understand. When a reader or writer calls notify_one(), it does not cause a context switch. I've read about this and understand that it works this way. However, consider the case where a writer writes the entry that fills the buffer: the writer calls notify_one() and goes on to write another entry, at which point its predicate fails in its wait(). In this case I see that another writer may wake up, and its predicate fails as well. Then a reader wakes up, its predicate succeeds, and it can begin reading.
What I don't understand is why multiple threads are being unblocked by a single notify_one(). Does a wait() with a failed predicate not consume the notification? I can't find anything that states this is the case.
I could call notify_all() just to be sure but it seems to be working with notify_one().
Here's the code.
#include <iostream>
#include <stdint.h>
#include <cstddef>
#include <boost/circular_buffer.hpp>
#include <condition_variable>
#include <mutex>
#include <thread>

// ring buffer with protection for overwrites
template <typename T>
class ring_buffer {
public:
    ring_buffer(size_t size) {
        cb.set_capacity(size);
    }

    // Blocks until at least one entry is available, then pops it.
    void read(T& entry) {
        {
            std::unique_lock<std::mutex> lk(cv_mutex);
            cv.wait(lk, [this] {
                std::cout << "read woke up, test=" << (cb.size() > 0) << std::endl;
                return 0 < cb.size();});
            auto iter = cb.begin();
            entry = *iter;
            cb.pop_front();
            std::cout << "Read notify_one" << std::endl;
        }
        // Notify after releasing the lock.
        cv.notify_one();
    }

    // Blocks until there is room for one more entry, then pushes it.
    void write(const T& entry) {
        {
            std::unique_lock<std::mutex> lk(cv_mutex);
            //std::cout << "Write wait" << std::endl;
            cv.wait(lk, [this] {
                std::cout << "write woke up, test=" << (cb.size() < cb.capacity()) << std::endl;
                return cb.size() < cb.capacity();});
            cb.push_back(entry);
            std::cout << "Write notify_one" << std::endl;
        }
        // Notify after releasing the lock.
        cv.notify_one();
    }

    size_t get_number_entries() {
        std::unique_lock<std::mutex> lk(cv_mutex);
        return cb.size();
    }

private:
    boost::circular_buffer<T> cb;
    std::condition_variable cv;
    std::mutex cv_mutex;
};

void write_loop(ring_buffer<int> *buffer) {
    for (int i = 0; i < 100000; ++i) {
        buffer->write(i);
    }
}

void read_loop(ring_buffer<int> *buffer) {
    for (int i = 0; i < 50000; ++i) {
        int val;
        buffer->read(val);
    }
}

int main() {
    ring_buffer<int> buffer(1000);
    std::thread writer(write_loop, &buffer);
    std::thread reader(read_loop, &buffer);
    std::thread reader2(read_loop, &buffer);
    writer.join();
    reader.join();
    reader2.join();
    return 0;
}
I see the following in the output, where multiple threads appear to wake up even though their predicate is false.
read woke up, test=0
read woke up, test=0
write woke up, test=1