mardi 26 avril 2022

C++ Multithreading processes a queue, and the processed results are output to another queue in the order of the original queue

I just started learning about multithreading.I want to design multithreaded module for a problem. I have two queue,there is one thread which is putting messages in the messages queue,and there is many thread to process the messages from the message queue and put the processed messages to the output queue.And there is one thread to output from the output queue.The problem is that the order of the output queue is not same as the messages queue may because the processing threads take different time .Thanks for your help.

#include <thread>
#include <mutex>
#include<queue>
#include <string>
#include<time.h>

using namespace std;
std::mutex mut;
std::queue<int> data_queue; 
std::condition_variable data_cond;
std::queue <int> data_queue2;
std::condition_variable data_cond2;

void process(int& a)
{
    srand((unsigned)time(NULL));
    int b = 10, c = 200;
    int time= (rand() % (c - b)) + b ;
    std::this_thread::sleep_for(std::chrono::milliseconds(time));
    a=2*a;
}
void process2(int a)
{
    cout <<a << endl;

}
void data_preparation_thread()
{
    int i = 0;
    while (i<10)
    {
        int  data = i;
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data); 
        data_cond.notify_all(); 
        i++;
    }
}
void data_processing_thread()
{
    while (true)
    {
    std::unique_lock<std::mutex> lk(mut); 
    data_cond.wait(
        lk, [] {return !data_queue.empty(); }); 
    int data = data_queue.front();
    data_queue.pop();
    lk.unlock(); 
    process(data);
    std::lock_guard<std::mutex> lk2(mut);
    data_queue2.push(data);
    data_cond2.notify_one();
    }
}
void data_processing_thread2()
{
    while (true)
    {
       
        std::unique_lock<std::mutex> lk(mut); // 4
        data_cond2.wait(
            lk, [] {return !data_queue2.empty(); }); // 5
        int data = data_queue2.front();                                                            
        data_queue2.pop();
        lk.unlock(); // 6
        process2(data);
    }
}
int main()
{
    thread first(data_preparation_thread);
    thread second(data_processing_thread);
    thread second2(data_processing_thread);
    thread second3(data_processing_thread);
    thread third(data_processing_thread2);
    first.detach();
    second.detach();
    second2.detach();
    second3.detach();
    third.detach();                                             
    while (1) 
    {
        int j = 0;
    }
    return 0;


}

Aucun commentaire:

Enregistrer un commentaire