mercredi 29 avril 2020

Wait for thread queue to be empty

I am new to C++ and multithreading applications. I want to process a long list of data (potentially several thousands of entries) by dividing its entries among a few threads. I have retrieved a ThreadPool class and a Queue class from the web (it is my first time tackling the subject). I construct the threads and populate the queue in the following way (definitions at the end of the post):

ThreadPool *pool = new ThreadPool(8);
std::vector<std::function<void(int)>> *caller = 
    new std::vector<std::function<void(int)>>;
for (size_t i = 0; i < Nentries; ++i)
{
    caller->push_back( 
        [=](int j){func(entries[i], j);});
    pool->PushTask((*caller)[i]);
}
delete pool;

The problem is that only a number of entries equaling the number of created threads are processed, as if the program does not wait for the queue to be empty. Indeed, if I put

while (pool->GetWorkQueueLength()) {}

just before the pool destructor, the whole list is correctly processed. However, I am afraid I am consuming too many resources by using a while loop. Moreover, I have not found anyone doing anything like it, so I think this is the wrong approach and the classes I use have some error. Can anyone find the error (if present) or suggest another solution?

Here are the classes I use. I suppose the problem is in the implementation of the destructor, but I am not sure.

SynchronizeQueue.hh

#ifndef SYNCQUEUE_H
#define SYNCQUEUE_H
#include <list>
#include <mutex>
#include <condition_variable>

template<typename T>
class SynchronizedQueue
{
    public:
        SynchronizedQueue();

        void Put(T const & data);
        T Get();
        size_t Size();

    private:
        SynchronizedQueue(SynchronizedQueue const &)=delete;
        SynchronizedQueue & operator=(SynchronizedQueue const &)=delete;

        std::list<T> queue;
        std::mutex mut;
        std::condition_variable condvar;
};

template<typename T>
SynchronizedQueue<T>::SynchronizedQueue()
{}


template<typename T>
void SynchronizedQueue<T>::Put(T const & data)
{
    std::unique_lock<std::mutex> lck(mut);
    queue.push_back(data);
    condvar.notify_one();
}

template<typename T>
T SynchronizedQueue<T>::Get()
{
    std::unique_lock<std::mutex> lck(mut);
    while (queue.empty())
    {
        condvar.wait(lck);
    }
    T result = queue.front();
    queue.pop_front();
    return result;
}

template<typename T>
size_t SynchronizedQueue<T>::Size()
{
    std::unique_lock<std::mutex> lck(mut);
    return queue.size();
}

#endif

ThreadPool.hh

#ifndef THREADPOOL_H
#define THREADPOOL_H    
#include "SynchronizedQueue.hh"
#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class ThreadPool
{
    public:
        ThreadPool(int nThreads = 0);
        virtual ~ThreadPool();

        void PushTask(std::function<void(int)> func);

        size_t GetWorkQueueLength();

    private:
        void WorkerThread(int i);

        std::atomic<bool> done;
        unsigned int threadCount;
        SynchronizedQueue<std::function<void(int)>> workQueue;
        std::vector<std::thread> threads;
};

#endif

ThreadPool.cc

#include "ThreadPool.hh"
#include "SynchronizedQueue.hh"

void doNothing(int i)
{}

ThreadPool::ThreadPool(int nThreads)
    : done(false)
{
    if (nThreads <= 0)
    {
        threadCount = std::thread::hardware_concurrency();
    } 
    else
    {
        threadCount = nThreads;
    }

    for (unsigned int i = 0; i < threadCount; ++i)
    {
        threads.push_back(std::thread(&ThreadPool::WorkerThread, this, i));
    }
}

ThreadPool::~ThreadPool()
{
    done = true;
    for (unsigned int i = 0; i < threadCount; ++i)
    {
        PushTask(&doNothing);
    }
    for (auto& th : threads)
    {
        if (th.joinable())
        {
            th.join();
        }
    }
}

void ThreadPool::PushTask(std::function<void(int)> func)
{
    workQueue.Put(func);
}

void ThreadPool::WorkerThread(int i)
{
    while (!done)
    {
        workQueue.Get()(i);
    }
}

size_t ThreadPool::GetWorkQueueLength()
{
    return workQueue.Size();
}

Aucun commentaire:

Enregistrer un commentaire