lundi 29 juillet 2019

Condition variable should be used or not to reduce missed wakeups

I have two threads, one is the producer and other is consumer. My consumer is always late (due to some costly function call, simulated in below code using sleeps) so I have used ring buffer as I can afford to loose some events.

Questions: I am wondering if it would be better to use condition variable instead of what I currently have : continuous monitoring of the ring buffer size to see if the events got generated. I know that the current while loop of checking the ring buffer size is expensive, so I can probably add some yield calls to reduce the tight loop.

Can I get rid of pointers? In my current code I am passing pointers to my ring buffer from main function to the threads. Wondering if there is any fancy or better way to do the same?

#include <iostream>
#include <thread>
#include <chrono>
#include <vector>
#include <atomic>
#include <boost/circular_buffer.hpp>
#include <condition_variable>
#include <functional>

std::atomic<bool> mRunning;
std::mutex m_mutex;
std::condition_variable m_condVar;
long int data = 0;

class Detacher {
    public:
    template<typename Function, typename ... Args>
    void createTask(Function &&func, Args&& ... args) {
        m_threads.emplace_back(std::forward<Function>(func), std::forward<Args>(args)...);
    }

    Detacher() = default;
    Detacher(const Detacher&) = delete;
    Detacher & operator=(const Detacher&) = delete;
    Detacher(Detacher&&) = default;
    Detacher& operator=(Detacher&&) = default;

    ~Detacher() {
        for (auto& thread : m_threads) {
            thread.join();
        }
    }

    private:
    std::vector<std::thread> m_threads;
};

void foo_1(boost::circular_buffer<int> *cb)
{
    while (mRunning) {
        std::unique_lock<std::mutex> mlock(m_mutex);
        if (!cb->size())
            continue;
        int data = cb[0][0];
        cb->pop_front();
        mlock.unlock();
        if (!mRunning) {
            break;  
        }
        //simulate time consuming function call
        std::this_thread::sleep_for(std::chrono::milliseconds(16));
    }
}

void foo_2(boost::circular_buffer<int> *cb)
{
    while (mRunning) {
        std::unique_lock<std::mutex> mlock(m_mutex);
        cb->push_back(data);
        data++;
        mlock.unlock();
        //simulate time consuming function call
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}

int main()
{
    mRunning = true;
    boost::circular_buffer<int> cb(100);
    Detacher thread_1;
    thread_1.createTask(foo_1, &cb);
    Detacher thread_2;
    thread_2.createTask(foo_2, &cb);
    std::this_thread::sleep_for(std::chrono::milliseconds(20000));
    mRunning = false;
}

Aucun commentaire:

Enregistrer un commentaire