So I am trying to implement a double buffer for a typical producer and consumer problem.
1.get_items() basically produces 10 items at a time.
2.producer basically push 10 items onto a write queue. Assume that currently we only have one producer.
3.consumers will consume one item from the queue. There are many consumers.
So I am sharing my code as the following. The implementation idea is simple, consume from the readq until it is empty and then swap the queue pointer, which the readq would point to the writeq now and writeq would now points to the emptied queue and would starts to fill it again. So producer and consumer can work independently without halting each other. This sort of swaps space for time.
However, my code does not work in multiple consumer cases. In my code, I initiated 10 consumer threads, and it always stuck at the .join().
So I am thinking that my code is definitely buggy. However, by examine carefully, I did not find where that bug is. And it seems the code stuck after lk1.unlock(), so it is not stuck in a while or something obvious.
mutex m1;
mutex m2; // using 2 mutex, so when producer is locked, consumer can still run
condition_variable put;
condition_variable fetch;
queue<int> q1;
queue<int> q2;
queue<int>* readq = &q1;
queue<int>* writeq = &q2;
bool flag{ true };
vector<int> get_items() {
vector<int> res;
for (int i = 0; i < 10; i++) {
res.push_back(i);
}
return res;
}
void producer_mul() {
unique_lock<mutex> lk2(m2);
put.wait(lk2, [&]() {return flag == false; }); //producer waits for consumer signal
vector<int> items = get_items();
for (auto it : items) {
writeq->push(it);
}
flag = true; //signal queue is filled
fetch.notify_one();
lk2.unlock();
}
int get_one_item_mul() {
unique_lock<mutex> lk1(m1);
int res;
if (!(*readq).empty()) {
res = (*readq).front(); (*readq).pop();
if ((*writeq).empty() && flag == true) { //if writeq is empty
flag = false;
put.notify_one();
}
}
else {
readq = writeq; // swap queue pointer
while ((*readq).empty()) { // not yet write
if (flag) {
flag = false;
put.notify_one();//start filling process
}
if (readq->empty()) { //readq now points to writeq, so if producer finished, readq is not empty and flag = true.
fetch.wait(lk1, [&]() {return flag == true; });
}
}
if (flag) {
writeq = writeq == &q1 ? &q2 : &q1; //swap the writeq to the alternative queue and fill it again
flag = false;
//put.notify_one(); //fill that queue again if needed. but in my case, 10 item is produced and consumed, so no need to use the 2nd round, plus the code does not working in this simple case..so commented out for now.
}
res = readq->front(); readq->pop();
}
lk1.unlock();
this_thread::sleep_for(10ms);
return res;
}
int main()
{
std::vector<std::thread> threads;
std::packaged_task<void(void)> job1(producer_mul);
vector<std::future<int>> res;
for (int i = 0; i < 10; i++) {
std::packaged_task<int(void)> job2(get_one_item_mul);
res.push_back(job2.get_future());
threads.push_back(std::thread(std::move(job2)));
}
threads.push_back(std::thread(std::move(job1)));
for (auto& t : threads) {
t.join();
}
for (auto& a : res) {
cout << a.get() << endl;
}
return 0;
}
I added some comments, but the idea and code is pretty simple and self-explanatory.
I am trying to figure out where the problem is in my code. Does it work for multiple consumer? Further more, if there are multiple producers here, does it work? I do not see a problem since basically in the code the lock is not fine grained. Producer and Consumer both are locked from the beginning till the end.
Looking forward to discussion and any help is appreciated.
Aucun commentaire:
Enregistrer un commentaire