I am new to C++ and multithreading applications. I want to process a long list of data (potentially several thousands of entries) by dividing its entries among a few threads. I have retrieved a ThreadPool class and a Queue class from the web (it is my first time tackling the subject). I construct the threads and populate the queue in the following way (definitions at the end of the post):
ThreadPool *pool = new ThreadPool(8);
std::vector<std::function<void(int)>> *caller =
new std::vector<std::function<void(int)>>;
for (size_t i = 0; i < Nentries; ++i)
{
caller->push_back(
[=](int j){func(entries[i], j);});
pool->PushTask((*caller)[i]);
}
delete pool;
The problem is that only a number of entries equaling the number of created threads are processed, as if the program does not wait for the queue to be empty. Indeed, if I put
while (pool->GetWorkQueueLength()) {}
just before the pool destructor, the whole list is correctly processed. However, I am afraid I am consuming too many resources by using a while loop. Moreover, I have not found anyone doing anything like it, so I think this is the wrong approach and the classes I use have some error. Can anyone find the error (if present) or suggest another solution?
Here are the classes I use. I suppose the problem is in the implementation of the destructor, but I am not sure.
SynchronizeQueue.hh
#ifndef SYNCQUEUE_H
#define SYNCQUEUE_H
#include <list>
#include <mutex>
#include <condition_variable>
template<typename T>
class SynchronizedQueue
{
public:
SynchronizedQueue();
void Put(T const & data);
T Get();
size_t Size();
private:
SynchronizedQueue(SynchronizedQueue const &)=delete;
SynchronizedQueue & operator=(SynchronizedQueue const &)=delete;
std::list<T> queue;
std::mutex mut;
std::condition_variable condvar;
};
template<typename T>
SynchronizedQueue<T>::SynchronizedQueue()
{}
template<typename T>
void SynchronizedQueue<T>::Put(T const & data)
{
std::unique_lock<std::mutex> lck(mut);
queue.push_back(data);
condvar.notify_one();
}
template<typename T>
T SynchronizedQueue<T>::Get()
{
std::unique_lock<std::mutex> lck(mut);
while (queue.empty())
{
condvar.wait(lck);
}
T result = queue.front();
queue.pop_front();
return result;
}
template<typename T>
size_t SynchronizedQueue<T>::Size()
{
std::unique_lock<std::mutex> lck(mut);
return queue.size();
}
#endif
ThreadPool.hh
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include "SynchronizedQueue.hh"
#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
class ThreadPool
{
public:
ThreadPool(int nThreads = 0);
virtual ~ThreadPool();
void PushTask(std::function<void(int)> func);
size_t GetWorkQueueLength();
private:
void WorkerThread(int i);
std::atomic<bool> done;
unsigned int threadCount;
SynchronizedQueue<std::function<void(int)>> workQueue;
std::vector<std::thread> threads;
};
#endif
ThreadPool.cc
#include "ThreadPool.hh"
#include "SynchronizedQueue.hh"
void doNothing(int i)
{}
ThreadPool::ThreadPool(int nThreads)
: done(false)
{
if (nThreads <= 0)
{
threadCount = std::thread::hardware_concurrency();
}
else
{
threadCount = nThreads;
}
for (unsigned int i = 0; i < threadCount; ++i)
{
threads.push_back(std::thread(&ThreadPool::WorkerThread, this, i));
}
}
ThreadPool::~ThreadPool()
{
done = true;
for (unsigned int i = 0; i < threadCount; ++i)
{
PushTask(&doNothing);
}
for (auto& th : threads)
{
if (th.joinable())
{
th.join();
}
}
}
void ThreadPool::PushTask(std::function<void(int)> func)
{
workQueue.Put(func);
}
void ThreadPool::WorkerThread(int i)
{
while (!done)
{
workQueue.Get()(i);
}
}
size_t ThreadPool::GetWorkQueueLength()
{
return workQueue.Size();
}
Aucun commentaire:
Enregistrer un commentaire