Sunday, 31 December 2017

Thread pool on a queue in C++

I've been trying to parallelize a problem that fits the thread pool pattern very nicely. Here is a minimal representative example.

Say we have a pseudo-program like this:

Q : collection<int>
while (!Q.empty()) {
    for each q in Q {
        // perform some computation
    }
    // assign a new value to Q
    Q = something_completely_new();
}
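As a single-threaded baseline, the pseudo-program above could be written roughly as follows. This is only a sketch: `run_sequential` and `next_batch` are hypothetical names, and `next_batch` is a made-up stand-in for `something_completely_new()` that returns one item fewer each round so that the outer loop terminates.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical stand-in for something_completely_new():
// returns a batch with one item fewer than the previous one.
std::vector<int> next_batch(std::size_t previous_size) {
    std::vector<int> batch;
    for (std::size_t k = 0; k + 1 < previous_size; ++k) {
        batch.push_back(static_cast<int>(k));
    }
    return batch;
}

// Processes batches until an empty one is produced and
// returns the total number of items visited.
int run_sequential(std::vector<int> q) {
    int processed = 0;
    while (!q.empty()) {
        for (int x : q) {
            (void)x;  // placeholder for the real computation on x
            ++processed;
        }
        q = next_batch(q.size());  // assign a completely new value to Q
    }
    return processed;
}
```

The inner `for` loop is the part that the worker threads are meant to share; the reassignment of `q` stays on the main thread.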

I'm trying to implement that in a parallel way, with n-1 workers and one main thread. The workers will perform the computation in the inner loop by grabbing elements from Q.

I tried to solve this using two condition variables: work, on which the master thread notifies the workers that Q has been assigned to, and work_done, on which the workers notify the master that the entire computation might be done.

Here's my C++ code:

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <thread>
#include <vector>

using namespace std;

std::queue<int> Q;
std::mutex mut;
std::condition_variable work;
std::condition_variable work_done;

void run_thread() {
    for (;;) {
        std::unique_lock<std::mutex> lock(mut);
        work.wait(lock, [&] { return Q.size() > 0; });

        // there is work to be done - pretend we're working on something
        int x = Q.front(); Q.pop();
        std::cout << "Working on " << x << std::endl;

        work_done.notify_one();
    }
}

int main() {
    // spawn three worker threads
    std::vector<std::thread *> workers(3);

    for (size_t i = 0; i < 3; i++) {
        workers[i] = new std::thread{
            [&] { run_thread(); }
        };
    }

    for (int i = 4; i > 0; --i) {
        std::unique_lock<std::mutex> lock(mut);
        Q = std::queue<int>();
        for (int k = 0; k < i; k++) {
            Q.push(k);
        }
        work.notify_all();
        work_done.wait(lock, [&] { return Q.size() == 0; });
    }

    for (size_t i = 0; i < 3; i++) {
        delete workers[i];
    }

    return 0;
}

Unfortunately, after compiling it on OS X with g++ -std=c++11 -Wall -o main main.cpp I get the following output:

Working on 0
Working on 1
Working on 2
Working on 3
Working on 0
Working on 1
Working on 2
Working on 0
Working on 1
Working on 0
libc++abi.dylib: terminating
Abort trap: 6

After some googling, it looks like a segmentation fault. It probably has to do with me misusing condition variables. I would appreciate some insight, both architectural (how to approach this type of problem) and specific (what exactly I'm doing wrong here).
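One likely explanation, with a sketch of a possible fix: `Abort trap: 6` is a SIGABRT rather than a segmentation fault. Destroying a `std::thread` that is still joinable calls `std::terminate`, and that is what `delete workers[i]` does in the code above, because the workers loop forever and are never joined. The code also holds the mutex while "working", which serializes the workers. The sketch below is one way to restructure the pool, not the only one. The names `process_rounds`, `in_flight`, `stop` and `processed` are invented for the illustration, and it assumes at least one worker.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical helper: runs `rounds` batches (of sizes rounds, rounds-1, ..., 1)
// on `n_workers` threads and returns the total number of items processed.
// Assumes n_workers >= 1; with zero workers the master would wait forever.
int process_rounds(std::size_t n_workers, int rounds) {
    std::queue<int> q;
    std::mutex mut;
    std::condition_variable work;       // master -> workers: new items or shutdown
    std::condition_variable work_done;  // workers -> master: batch fully finished
    int in_flight = 0;                  // items popped but not yet finished
    int processed = 0;
    bool stop = false;

    auto worker = [&] {
        for (;;) {
            std::unique_lock<std::mutex> lock(mut);
            work.wait(lock, [&] { return stop || !q.empty(); });
            if (q.empty()) return;  // woken for shutdown and nothing is left
            int x = q.front();
            q.pop();
            ++in_flight;
            lock.unlock();  // do the actual work without holding the mutex
            (void)x;        // placeholder for the real computation on x
            lock.lock();
            ++processed;
            --in_flight;
            if (q.empty() && in_flight == 0) work_done.notify_one();
        }
    };

    std::vector<std::thread> workers;  // owned by value, no new/delete
    for (std::size_t i = 0; i < n_workers; ++i) workers.emplace_back(worker);

    for (int i = rounds; i > 0; --i) {
        std::unique_lock<std::mutex> lock(mut);
        for (int k = 0; k < i; ++k) q.push(k);
        work.notify_all();
        // Wait until the queue is drained AND every popped item is finished.
        work_done.wait(lock, [&] { return q.empty() && in_flight == 0; });
    }

    {
        std::lock_guard<std::mutex> lock(mut);
        stop = true;  // ask the workers to exit
    }
    work.notify_all();
    for (auto& t : workers) t.join();  // join before the threads are destroyed
    return processed;
}
```

Calling `process_rounds(3, 4)` from `main` mirrors the original program: three workers and batches of 4, 3, 2 and 1 items. The two changes that matter are the `stop` flag together with `join()`, so that no joinable thread is ever destroyed, and the `in_flight` counter, so that the master waits for the work to finish and not merely for the queue to become empty.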

I appreciate the help
