mercredi 31 août 2016

C++11 condtional variable


I am trying make a lot of mistakes to learn Concurrency in C++11. I have to ask this,

Here is what this one is supposed to do: One queue, and three threads, one is suppose to put an integer into the queue, the other twos are suppose to correspondingly increase s1, s2 by popping the queue so that I can get total sum of numbers that were in the queue. To make it simpler I put 1 through 10 numbers into the queue.

But sometimes it works and sometimes it seems like there is an infinite loop:: what would be the reason?

#include <queue>
#include <memory>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
#include <string>

class threadsafe_queue {
private:
    mutable std::mutex mut;
    std::queue<int> data_queue;
    std::condition_variable data_cond;
    std::string log; //just to see what is going on behind
    bool done;

public:
    threadsafe_queue(){
        log = "initializing queue\n";
        done = false;
    }
    threadsafe_queue(threadsafe_queue const& other) {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }
    void set_done(bool const s) {
        std::lock_guard<std::mutex> lk(mut);
        done = s;
    }
    bool get_done() {
        std::lock_guard<std::mutex> lk(mut);
        return done;
    }
    void push(int new_value) {
        std::lock_guard<std::mutex> lk(mut);
        log += "+pushing " + std::to_string(new_value) + "\n";
        data_queue.push(new_value);
        data_cond.notify_one();
    }
    void wait_and_pop(int& value) {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty();});
        value = data_queue.front();
        log += "-poping " + std::to_string(value) + "\n";
        data_queue.pop();
    }
    std::shared_ptr<int> wait_and_pop() {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty();});
        std::shared_ptr<int> res(std::make_shared<int>(data_queue.front()));
        log += "- popping " + std::to_string(*res) + "\n";
        data_queue.pop();
        return res;
    }
    bool try_pop(int& value) {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) {
            log += "tried to pop but it was empty\n";
            return false;
        }
        value = data_queue.front();
        log += "-popping " + std::to_string(value) + "\n";
        data_queue.pop();
        return true;
    }
    std::shared_ptr<int> try_pop() {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty()) {
            log += "tried to pop but it was empty\n";
            return std::shared_ptr<int>();
        }
        std::shared_ptr<int> res(std::make_shared<int>(data_queue.front()));
        log += "-popping " + std::to_string(*res) + "\n";
        data_queue.pop();
        return res;
    }
    bool empty() const {
        std::lock_guard<std::mutex> lk(mut);
        //log += "checking the queue if it is empty\n";
        return data_queue.empty();
    }

    std::string get_log() {
        return log;
    }

};

threadsafe_queue tq;
int s1, s2;

void prepare() {
    for (int i = 1; i <= 10; i++)
        tq.push(i);
    tq.set_done(true);
}

void p1() {
    while (true) {
        int data;
        tq.wait_and_pop(data);
        s1 += data;
        if (tq.get_done() && tq.empty()) break;
    }
}

void p2() {
    while (true) {
        int data;
        tq.wait_and_pop(data);
        s2 += data;
        if (tq.get_done() && tq.empty()) break;
    }
}

int main(int argc, char *argv[]) {
    std::thread pp(prepare);
    std::thread worker(p1);
    std::thread worker2(p2);
    pp.join();
    worker.join();
    worker2.join();

    std::cout << tq.get_log() << std::endl;
    std::cout << s1 << " " << s2 << std::endl;
    return 0;
}

Aucun commentaire:

Enregistrer un commentaire