mercredi 18 novembre 2020

Failure to create a monitor with mutex and condition_variable in C++11

I am trying to create a monitor class to get sum of a data[]. In the class, I use a monitor a do the reading/writer working. However, the condition_variable, unique_lock and mutex confuse me. When the datasize is not over 56, my code work correctly but with a bigger datasize the code fails and the condition_variable.wait(cond_lock) cannot work when debugging in lldb. The error: type std::__1::system_error: unique_lock::unlock: not locked: Operation not permitted does not help me to understand the problem.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

const int datasize = 58;//*****datasize cannot be bigger than 56*****//
int *data = new int[datasize];

void data_init(){
    for (int i = 0; i < datasize; ++i) {
        data[i] = random() % datasize + 1;
    }
    int sum = 0;
    for (int i = 0; i < datasize; ++i) {
        sum += data[i];
    }
    std::cout << "true answer: " << sum << std::endl;
}
class monitor{
public:
    std::mutex the_mutex;//the mutex to lock the function
    std::unique_lock<std::mutex> cond_mutex;//trying to use this for condition_variable
    std::condition_variable read_to_go, write_to_go;
    int active_reader, active_writer, waiting_reader, waiting_writer;
    bool write_flag;
    void getTask(int Rank, int& task_one, int& task_two, int& second_rank);//**reader**//
    void putResult(int Rank, int the_answer, int next_Rank);//**writer**//
    explicit monitor(){
        write_flag = true;
        active_reader = active_writer = waiting_reader = waiting_writer = 0;
    }
private:
    inline void startRead();
    inline void endRead();
    inline void startWrite();
    inline void endWrite();
};

monitor imonitor;

inline void monitor::startRead() {
    the_mutex.lock();//lock the function code
    while((active_writer + active_reader) > 0){//if there are working reader and writer
        waiting_reader++;//add one 
        read_to_go.wait(cond_mutex);//wait the thread
        /*****when debugging with lldb, error appears here*****/
        waiting_reader--;//one less reader waiting when notified
    }
    active_reader++;//one more working reader
    the_mutex.unlock();
}

inline void monitor::endRead() {
    the_mutex.lock();
    active_reader--;//one less reader working
    if(active_reader == 0 && waiting_writer > 0){//if there is not any reader working and there are some writer waiting
        write_to_go.notify_one();//notify one writer
    }//else get out directly
    the_mutex.unlock();
}

inline void monitor::startWrite() {
    the_mutex.lock();
    while((active_writer + active_reader) > 0){//if any reader or writer is working
        waiting_writer++;//one more writer waiting
        write_to_go.wait(cond_mutex);//block this thread
        waiting_writer--;//when notfied, the number of waiting writer become less
    }
    active_writer++;//one more active writer
    cond_mutex.unlock();
}

inline void monitor::endWrite() {//write is over
    the_mutex.lock();
    active_writer--;//one less writer working
    if(waiting_writer > 0){//if any writer waiting
        write_to_go.notify_one();//notify one of them
    }
    else if(waiting_reader > 0){//if any reader waiting
        read_to_go.notify_all();//notify all of them
    }
    the_mutex.unlock();
}

void monitor::getTask(int Rank, int &task_one, int &task_two, int &second_rank) {
    startRead();
    task_one = data[Rank];
    while(Rank < (datasize - 1) && data[++Rank] == 0);
    task_two = data[Rank];
    second_rank = Rank;
    //std::cout << "the second Rank is " << Rank << std::endl;
    endRead();
}

void monitor::putResult(int Rank, int the_answer, int next_Rank) {
    startWrite();
    data[Rank] = the_answer;
    data[next_Rank] = 0;
    endWrite();
}

void reducer(int Rank){
    //std::cout << "a reducer begins" << Rank << std::endl;
    do {
        int myTask1, myTask2, secondRank;
        imonitor.getTask(Rank, myTask1, myTask2, secondRank);
        if(myTask2 == 0) return;
        //std::cout << "the second value Rank: " << secondRank << std::endl;
        int answer = myTask1 + myTask2;
        imonitor.putResult(Rank, answer, secondRank);
    }while (true);
}

int main() {
    std::cout << "Hello, World!" << std::endl;
    data_init();

    std::thread Reduce1(reducer, 0);
    std::thread Reduce2(reducer, datasize/2);
    /*std::thread Reduce3(reducer, 4);
    std::thread Reduce4(reducer, 6);
    std::thread Reduce5(reducer, 8);
    std::thread Reduce6(reducer, 10);
    std::thread Reduce7(reducer, 12);
    std::thread Reduce8(reducer, 14);*/

    Reduce1.join(); //std::cout << "A reducer in" <<std::endl;
    Reduce2.join();
    /*Reduce3.join();
    Reduce4.join();
    Reduce5.join();
    Reduce6.join();
    Reduce7.join();
    Reduce8.join();*/

    std::cout << data[0] << std::endl;
    return 0;
}

My goal used to use 8 threads, but now the code can work with only one thread. Some cout for debugging are left in the code. Thank you for any help!

Aucun commentaire:

Enregistrer un commentaire