lundi 2 avril 2018

C++ STL Producer multiple consumer where producer waits for free consumer before producing next value

My little consumer-producer problem had me stumped for some time. I didn't want an implementation where one producer pushes some data round-robin to the consumers, filling up their queues of data respectively.

I wanted to have one producer, x consumers, but the producer waits with producing new data until a consumer is free again. In my example there are 3 consumers so the producer creates a maximum of 3 objects of data at any given time. Since I don't like polling, the consumers were supposed to notify the producer when they are done. Sounds simple, but the solution I found doesn't please me. First the code.

#include "stdafx.h"
#include <mutex>
#include <iostream>
#include <future>
#include <map>
#include <atomic>

std::atomic_int totalconsumed;

class producer {
    using runningmap_t = std::map<int, std::pair<std::future<void>, bool>>;

    // Secure the map of futures.
    std::mutex mutex_;
    runningmap_t running_;

    // Used for finished notification
    std::mutex waitermutex_;
    std::condition_variable waiter_;

    // The magic number to limit the producer.
    std::atomic<int> count_;

    bool can_run();
    void clean();

    // Fake a source, e.g. filesystem scan.
    int fakeiter;
    int next();
    bool has_next() const;

public:
    producer() : fakeiter(50) {}
    void run();
    void notify(int value);
    void wait();
};

class consumer {
    producer& producer_;
public:
    consumer(producer& producer) : producer_(producer) {}
    void run(int value) {
        std::this_thread::sleep_for(std::chrono::milliseconds(42));
        std::cout << "Consumed " << value << " on (" << std::this_thread::get_id() << ")" << std::endl;
        totalconsumed++;
        producer_.notify(value);
    }
};


// Only if less than three threads are active, another gets to run.
bool producer::can_run() { return count_.load() < 3; }

// Verify if there's something to consume
bool producer::has_next() const { return 0 != fakeiter; }

// Produce the next value for consumption.
int producer::next() { return --fakeiter; }

// Remove the futures that have reported to be finished.
void producer::clean()
{
    for (auto it = running_.begin(); it != running_.end(); ) {
        if (it->second.second) {
            it = running_.erase(it);
        }
        else { 
            ++it;
        }
    }
}

// Runs the producer. Creates a new consumer for every produced value. Max 3 at a time.
void producer::run()
{
    while (has_next()) {
        if (can_run()) {
            auto c = next();

            count_++;
            auto future = std::async(&consumer::run, consumer(*this), c);

            std::unique_lock<std::mutex> lock(mutex_);
            running_[c] = std::make_pair(std::move(future), false);

            clean();
        }
        else {
            std::unique_lock<std::mutex> lock(waitermutex_);
            waiter_.wait(lock);
        }
    }
}

// Consumers diligently tell the producer that they are finished.
void producer::notify(int value)
{
    count_--;

    mutex_.lock();
    running_[value].second = true;
    mutex_.unlock();

    std::unique_lock<std::mutex> waiterlock(waitermutex_);
    waiter_.notify_all();
}

// Wait for all consumers to finish.
void producer::wait()
{
    while (!running_.empty()) {
        mutex_.lock();
        clean();
        mutex_.unlock();

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

// Looks like the application entry point.
int main()
{
    producer p;

    std::thread pthread(&producer::run, &p);
    pthread.join();
    p.wait();

    std::cout << std::endl << std::endl << "Total consumed " << totalconsumed.load() << std::endl;

    return 0;
}

The part I don't like is the list of values mapped to the futures, called running_. I need to keep the future around until the consumer is actually done. I can't remove the future from the map in the notify method or else I'll kill the thread that is currently calling notify.

Am I missing something that could simplify this construct?

Aucun commentaire:

Enregistrer un commentaire