I am trying to implement a program that consists of a producer thread adding objects to a std::vector
and multiple consumer threads removing objects from the same vector until it's empty. I am using a condition_variable
to let the consumers know that new objects have been produced. The problem is that in the last iteration (n items left in storage, where n is the number of consumer threads), the consumer threads get stuck waiting on the condition variable, even though they should not have to wait (storage
is not empty; at least that's what I figured from some debug logs).

#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#define CONSUMER_COUNT 4
#define STORAGE_SIZE CONSUMER_COUNT * 10000

class Foo {
private:
    int _id;

public:
    Foo(int id) : _id(id) {}
    int getId() const { return _id; }
};

std::vector<Foo> storage;
std::mutex storageMutex;
std::condition_variable storageCV;

void Producer(int limit) {
    for (int i = 0; i < limit; ++i) {
        std::lock_guard<std::mutex> lg{storageMutex};
        storage.emplace_back(Foo(i));
        storageCV.notify_one();
    }
    storageCV.notify_all();
}

void Consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> ul{storageMutex};
        storageCV.wait(ul, []() { return !storage.empty(); });
        if (storage.empty())
            return;
        storage.pop_back();
    }
}

int main(int argc, char *argv[]) {
    std::vector<std::thread> consumers;
    consumers.reserve(CONSUMER_COUNT);
    auto producer = std::thread(Producer, STORAGE_SIZE);
    for (int i = 0; i < CONSUMER_COUNT; ++i) {
        consumers.emplace_back(std::thread(Consumer, i));
    }
    producer.join();
    for (auto &consumer : consumers)
        consumer.join();
    storageCV.notify_all();
    std::cout << "[MAIN] Done!" << std::endl;
    std::cout << "Storage is left with " << storage.size() << " items!"
              << std::endl;
    return 0;
}

I have tried adding a simple boolean flag that the producer sets once it has finished adding all of the items, but I am not sure how (logically) I should then write the condition in the consumer threads. Simply adding that check on top of the current one is not enough, because a thread might then stop running even though there are still some items in the storage.
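
For reference, here is a minimal sketch of one way the flag could be folded into the wait predicate. It is an illustration under stated assumptions, not a verified fix for the exact code above. The names `done`, `consumedTotal` and `runDemo` are hypothetical and not part of the original program, and `int` stands in for `Foo` to keep the sketch short. The idea is that a consumer wakes up when there is an item to take or when the producer has finished, and it only exits when the storage is empty, which the predicate guarantees can only happen once `done` is set.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

std::vector<int> storage;          // int stands in for Foo (assumption)
std::mutex storageMutex;
std::condition_variable storageCV;
bool done = false;                 // hypothetical flag, guarded by storageMutex
int consumedTotal = 0;             // hypothetical counter, guarded by storageMutex

void Producer(int limit) {
    for (int i = 0; i < limit; ++i) {
        {
            std::lock_guard<std::mutex> lg{storageMutex};
            storage.emplace_back(i);
        }
        storageCV.notify_one();    // notify after releasing the lock
    }
    {
        std::lock_guard<std::mutex> lg{storageMutex};
        done = true;               // set under the same mutex the predicate reads it with
    }
    storageCV.notify_all();        // wake every consumer so each can observe `done`
}

void Consumer() {
    while (true) {
        std::unique_lock<std::mutex> ul{storageMutex};
        // Wake up when there is work to do OR when no more work will arrive.
        storageCV.wait(ul, [] { return !storage.empty() || done; });
        if (storage.empty()) {
            assert(done);          // empty storage after the wait implies the producer finished
            return;
        }
        storage.pop_back();
        ++consumedTotal;
    }
}

// Hypothetical driver: runs one producer and `consumerCount` consumers,
// then returns how many items were consumed in total.
int runDemo(int consumerCount, int itemCount) {
    storage.clear();
    done = false;
    consumedTotal = 0;
    std::thread producer(Producer, itemCount);
    std::vector<std::thread> consumers;
    consumers.reserve(consumerCount);
    for (int i = 0; i < consumerCount; ++i)
        consumers.emplace_back(Consumer);
    producer.join();
    for (auto &consumer : consumers)
        consumer.join();
    return consumedTotal;
}
```

Calling `runDemo(4, 40000)` from `main` should return once every item has been popped. Checking `storage.empty()` before looking at `done` is what keeps a consumer from quitting while items remain. With the predicate `!storage.empty()` alone, a consumer that starts waiting after the last item has been popped has nothing left that could ever make the predicate true, so it waits forever.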