mercredi 31 janvier 2018

Weird hang in generic multiple producers / consumers in C++11 (or above)

I wrote a "generic" multiple producer / consumer using C++11 (or higher) multithreading. The code (below) sort of works, but it hangs / crashes if too many producers / consumers threads are created.

The idea is to neatly separate the concerns: the MultiProducerConsumer object takes care of the protocols (thread maintenance, mutex, condvar) while the "user" injects the relevant functors doing the concrete work (producer, consumer, termination predicate) into the object.

Tested with VS 2017 and cygwin g++. The situation is worse on cygwin (why?). I cannot figure out what the problem is, and I could use a hint. Thanks in advance.

The header, multi_producer_consumer.hpp:

#pragma once
#include <algorithm>
#include <functional>
#include <iterator>
#include <thread>
#include <mutex>
#include <condition_variable>
//#include <cassert>

template<typename Container>
struct MultiProducerConsumer
{
    using Type = typename Container::value_type;
    using ModifierFct = std::function<void(Container&)>;
    using DoneFctr = std::function<bool(const Container&)>;

    MultiProducerConsumer(const Container& q, 
                          ModifierFct producer,
                          ModifierFct consumer,
                          DoneFctr donef,
                          size_t n_producers,
                          size_t n_consumers):
        m_queue(q),
        m_pf(producer),
        m_cf(consumer),
        m_producers(n_producers),
        m_consumers(n_consumers),
        m_done(donef),
        m_joined(false)
    {
        ///std::lock_guard<std::mutex> lk(m_mutex);//why? to prevent the producers to start before consumers are created. So what, if they do?

        for (auto i = 0; i < n_producers; ++i)
        {
            m_producers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::produce), this, i);
        }

        for (int i = 0; i < n_consumers; ++i)
        {
            m_consumers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::consume), this, i);
        }
    }

    virtual ~MultiProducerConsumer(void)
    {
        if (!m_joined)
            join();
    }

    virtual bool done(void) const
    {
        std::lock_guard<std::mutex> lk(m_mutex);
        return m_done(m_queue);
    }

    void join(void)
    {
        std::for_each(m_producers.begin(), m_producers.end(), std::mem_fn(&std::thread::join));
        std::for_each(m_consumers.begin(), m_consumers.end(), std::mem_fn(&std::thread::join));
        m_joined = true;
    }

protected:
    virtual void produce(size_t i)
    {
        while (!done())
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_pf(m_queue);
            ///if (i == 0)//should only only one thread notify all the consumers...? nope
            m_condvar.notify_all();//notifies all...not one
        }
    }

    virtual void consume(size_t i)
    {
        while (!done())
        {
            std::unique_lock<std::mutex> lk(m_mutex);
            m_condvar.wait(lk, [this]() {
                return !m_queue.empty();
            });
            m_cf(m_queue);
        }
    }
private:
    Container m_queue;
    ModifierFct m_pf;
    ModifierFct m_cf;
    DoneFctr m_done;

    mutable std::mutex m_mutex;
    std::condition_variable m_condvar;

    std::vector<std::thread> m_producers;
    std::vector<std::thread> m_consumers;

    bool m_joined;
};

The tester, below, uses a queue of vectors that are being "produced" (simply moved from an "outside" queue, matrix, into the producer / consumer queue). The consumers "consume" the vectors by summing each of them and storing the sum into another "outside" container (sums). The whole process terminates when the first vector summing up to zero is encountered. Below is the code:

#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <queue>
#include <numeric>
#include <iterator>
#include <cassert>

#include "multi_producer_consumer.hpp"

template<typename T>
using QVec = std::queue<std::vector<T>>;

template<typename T>
inline
T sum(const std::vector<T>& v)
{
    return std::accumulate(v.begin(), v.end(), 0);
}

template<typename T>
T from_string(std::string&& str)
{
    T ret;
    std::stringstream ss(str);
    ss >> ret;

    return ret;
}

int main(int argc, char* argv[])
{
    int n_p = 1;
    int n_c = 1;
    if (argc == 3)
    {
        n_p = from_string<int>(argv[1]);
        n_c = from_string<int>(argv[2]);
    }

    const unsigned long max_n_threads = std::thread::hardware_concurrency();
    std::cout << "max # threads: " << max_n_threads << "\n";
    std::cout << "n_producers: " << n_p << ", n_consumers: " << n_c << "\n";

    try {
        std::vector<int> vstart(1, 1);
        std::vector<int> vstop(1, 0);

        std::queue<std::vector<int>> matrix;
        matrix.push(vstart);
        matrix.push(std::vector<int>{ 1, 2, 3, 4, 5 });
        matrix.push(std::vector<int>{ 6, 7, 8, 9 });
        matrix.push(std::vector<int>{ 10, 11, 12, 13 });
        matrix.push(vstop);
        matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
        std::vector<long> sums;
        QVec<int> qqv;

        //multi-producer-consumer that feeds vector from a queue
        //to a consumer that sums them up, until sum is zero:
        //
        MultiProducerConsumer<QVec<int>> mpc(qqv, 
            [&matrix](QVec<int>& qv) { //producer function: move elements from matrix into qv
            if (!matrix.empty())
            {
                auto v = matrix.front();
                matrix.pop();
                qv.push(v);
            }
        },
            [&sums](QVec<int>& qv) {  //consumer function: pop from qv and sum up elements
            //if (!qv.empty())//this test is superfluous
            //{
                auto v = qv.front();
                qv.pop();
                sums.push_back(sum(v));
            //}
        },
            [](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
            if (!qv.empty())
            {
                auto v = qv.front();
                return (sum(v) == 0);
            }
            return false;
        }, n_p, n_c);//1,1 => okay; 1,2 => okay; 2,2 => okay; 5,5 => okay on Win64; hangs on cygwin; 5,10 => it can hang

        //need main thread to block until producers/consumers are done,
        //so that matrix/sums are not destructed while 
        //producers/consumers are still trying to use them:
        //
        mpc.join();

        std::cout << "sums:\n";
        std::copy(std::begin(sums), std::end(sums), std::ostream_iterator<int>(std::cout, "\n"));
    }
    catch (std::exception& ex)
    {
        std::cerr << ex.what() << "\n";
        return 1;
    }
    catch (...)
    {
        std::cerr << "Unknown exception.\n";
        return 1;
    }

    std::cout << "Done!" << std::endl;
    return 0;
}

Something is wrong with it. Just cannot figure what.

Aucun commentaire:

Enregistrer un commentaire