I am trying to create a monitor class to get sum of a data[]. In the class, I use a monitor a do the reading/writer working. However, the condition_variable, unique_lock and mutex confuse me. When the datasize is not over 56, my code work correctly but with a bigger datasize the code fails and the condition_variable.wait(cond_lock) cannot work when debugging in lldb. The error: type std::__1::system_error: unique_lock::unlock: not locked: Operation not permitted does not help me to understand the problem.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
const int datasize = 58;//*****datasize cannot be bigger than 56*****//
int *data = new int[datasize];
void data_init(){
for (int i = 0; i < datasize; ++i) {
data[i] = random() % datasize + 1;
}
int sum = 0;
for (int i = 0; i < datasize; ++i) {
sum += data[i];
}
std::cout << "true answer: " << sum << std::endl;
}
class monitor{
public:
std::mutex the_mutex;//the mutex to lock the function
std::unique_lock<std::mutex> cond_mutex;//trying to use this for condition_variable
std::condition_variable read_to_go, write_to_go;
int active_reader, active_writer, waiting_reader, waiting_writer;
bool write_flag;
void getTask(int Rank, int& task_one, int& task_two, int& second_rank);//**reader**//
void putResult(int Rank, int the_answer, int next_Rank);//**writer**//
explicit monitor(){
write_flag = true;
active_reader = active_writer = waiting_reader = waiting_writer = 0;
}
private:
inline void startRead();
inline void endRead();
inline void startWrite();
inline void endWrite();
};
monitor imonitor;
inline void monitor::startRead() {
the_mutex.lock();//lock the function code
while((active_writer + active_reader) > 0){//if there are working reader and writer
waiting_reader++;//add one
read_to_go.wait(cond_mutex);//wait the thread
/*****when debugging with lldb, error appears here*****/
waiting_reader--;//one less reader waiting when notified
}
active_reader++;//one more working reader
the_mutex.unlock();
}
inline void monitor::endRead() {
the_mutex.lock();
active_reader--;//one less reader working
if(active_reader == 0 && waiting_writer > 0){//if there is not any reader working and there are some writer waiting
write_to_go.notify_one();//notify one writer
}//else get out directly
the_mutex.unlock();
}
inline void monitor::startWrite() {
the_mutex.lock();
while((active_writer + active_reader) > 0){//if any reader or writer is working
waiting_writer++;//one more writer waiting
write_to_go.wait(cond_mutex);//block this thread
waiting_writer--;//when notfied, the number of waiting writer become less
}
active_writer++;//one more active writer
cond_mutex.unlock();
}
inline void monitor::endWrite() {//write is over
the_mutex.lock();
active_writer--;//one less writer working
if(waiting_writer > 0){//if any writer waiting
write_to_go.notify_one();//notify one of them
}
else if(waiting_reader > 0){//if any reader waiting
read_to_go.notify_all();//notify all of them
}
the_mutex.unlock();
}
void monitor::getTask(int Rank, int &task_one, int &task_two, int &second_rank) {
startRead();
task_one = data[Rank];
while(Rank < (datasize - 1) && data[++Rank] == 0);
task_two = data[Rank];
second_rank = Rank;
//std::cout << "the second Rank is " << Rank << std::endl;
endRead();
}
void monitor::putResult(int Rank, int the_answer, int next_Rank) {
startWrite();
data[Rank] = the_answer;
data[next_Rank] = 0;
endWrite();
}
void reducer(int Rank){
//std::cout << "a reducer begins" << Rank << std::endl;
do {
int myTask1, myTask2, secondRank;
imonitor.getTask(Rank, myTask1, myTask2, secondRank);
if(myTask2 == 0) return;
//std::cout << "the second value Rank: " << secondRank << std::endl;
int answer = myTask1 + myTask2;
imonitor.putResult(Rank, answer, secondRank);
}while (true);
}
int main() {
std::cout << "Hello, World!" << std::endl;
data_init();
std::thread Reduce1(reducer, 0);
std::thread Reduce2(reducer, datasize/2);
/*std::thread Reduce3(reducer, 4);
std::thread Reduce4(reducer, 6);
std::thread Reduce5(reducer, 8);
std::thread Reduce6(reducer, 10);
std::thread Reduce7(reducer, 12);
std::thread Reduce8(reducer, 14);*/
Reduce1.join(); //std::cout << "A reducer in" <<std::endl;
Reduce2.join();
/*Reduce3.join();
Reduce4.join();
Reduce5.join();
Reduce6.join();
Reduce7.join();
Reduce8.join();*/
std::cout << data[0] << std::endl;
return 0;
}
My goal used to use 8 threads, but now the code can work with only one thread. Some cout for debugging are left in the code. Thank you for any help!
Aucun commentaire:
Enregistrer un commentaire