lundi 19 juin 2023

Single producer multiple consumers C

I am trying to implement a program that consists of a producer thread adding objects to a std::vector and multiple consumer threads removing objects from the same vector until it's empty. I am using a condition_variable to let the consumers know that new objects have been produced. Problem is that in last iteration (n items left in storage where n is number of consumer threads), consumer threads get stuck waiting on a conditional variable, even though that condition should not be met (storage is not empty -> at least that's what I figured with some debug logs).

#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

#define CONSUMER_COUNT 4
#define STORAGE_SIZE CONSUMER_COUNT * 10000

class Foo {
private:
  int _id;

public:
  Foo(int id) : _id(id) {}
  int getId() const { return _id; }
};

std::vector<Foo> storage;
std::mutex storageMutex;
std::condition_variable storageCV;

void Producer(int limit) {
  for (int i = 0; i < limit; ++i) {
    std::lock_guard<std::mutex> lg{storageMutex};
    storage.emplace_back(Foo(i));
    storageCV.notify_one();
  }
  storageCV.notify_all();
}

void Consumer(int id) {
  while (true) {
    std::unique_lock<std::mutex> ul{storageMutex};
    storageCV.wait(ul, []() { return !storage.empty(); });
    if (storage.empty())
      return;
    storage.pop_back();
  }
}

int main(int argc, char *argv[]) {
  std::vector<std::thread> consumers;
  consumers.reserve(CONSUMER_COUNT);

  auto producer = std::thread(Producer, STORAGE_SIZE);

  for (int i = 0; i < CONSUMER_COUNT; ++i) {
    consumers.emplace_back(std::thread(Consumer, i));
  }

  producer.join();
  for (auto &consumer : consumers)
    consumer.join();

  storageCV.notify_all();

  std::cout << "[MAIN] Done!" << std::endl;
  std::cout << "Storage is left with " << storage.size() << " items!"
            << std::endl;

  return 0;
}

I have tried adding a simple boolean flag that producer will toggle once it's done with adding all of the items, but then I am not sure (logically) how should I set the condition in consumer threads. Simply adding that check on top of current one is not enough because then a thread might stop running even though there are still some items in the storage.

Aucun commentaire:

Enregistrer un commentaire