I am trying to implement a thread pool, and I found two implementations. Both use a queue to manage tasks, but the element types of the queues differ. I want to know which one is better and the rationale behind each implementation. Why does the second version need a function_wrapper?
The first version uses std::queue<std::packaged_task<void()> > tasks;
// Thread pool whose pending work is held as type-erased
// std::packaged_task<void()> entries in a single mutex-protected queue.
// The constructor and destructor are only declared here; their definitions
// are not part of this excerpt.
class ThreadPool {
 public:
  explicit ThreadPool(size_t);

  // Queues the call f(args...) and returns a std::future for its result.
  // Throws std::runtime_error if the pool has already been stopped.
  template <class F, class... Args>
  decltype(auto) enqueue(F&& f, Args&&... args);

  ~ThreadPool();

 private:
  // need to keep track of threads so we can join them
  std::vector<std::thread> workers;
  // the task queue; every entry is callable as void() regardless of the
  // result type of the work it wraps
  std::queue<std::packaged_task<void()>> tasks;
  // synchronization
  std::mutex queue_mutex;
  std::condition_variable condition;
  // true once the pool has been stopped; enqueue() rejects new work then.
  // Defaulted so the flag can never be read uninitialized.
  bool stop = false;
};

// add new work item to the pool
//
// f and args are bound together with std::bind (copied or moved into the
// task), wrapped in a std::packaged_task, and pushed onto the queue under
// queue_mutex. One waiter on `condition` is notified afterwards.
// Returns the std::future tied to the packaged task.
template <class F, class... Args>
decltype(auto) ThreadPool::enqueue(F&& f, Args&&... args) {
  auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
  // Ask the bound object itself what it returns: this is the call the task
  // will actually make, and it also covers pointer-to-member callables,
  // which plain f(args...) syntax cannot express.
  using return_type = decltype(bound());

  std::packaged_task<return_type()> task(std::move(bound));
  std::future<return_type> res = task.get_future();
  {
    std::lock_guard<std::mutex> lock(queue_mutex);
    // don't allow enqueueing after stopping the pool
    if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
    // packaged_task<return_type()> is itself callable as void(), so it is
    // moved into a packaged_task<void()> queue entry.
    tasks.emplace(std::move(task));
  }
  // Notify outside the lock so the woken thread does not immediately block
  // on queue_mutex.
  condition.notify_one();
  return res;
}
The second version uses thread_safe_queue<function_wrapper> work_queue;
// Move-only, type-erased holder for any callable that can be invoked with no
// arguments. Because the wrapper itself is never copied, the stored callable
// does not have to be copyable either (e.g. a std::packaged_task).
//
// Invoking a default-constructed or moved-from wrapper dereferences a null
// pointer; callers must only invoke wrappers that hold a callable.
class function_wrapper {
  // Type-erasure interface: one virtual call, virtual destruction.
  struct impl_base {
    virtual void call() = 0;
    virtual ~impl_base() = default;
  };

  std::unique_ptr<impl_base> impl;

  // Concrete holder for a callable of (decayed) type F.
  template <typename F>
  struct impl_type final : impl_base {
    F f;
    // Forwarding constructor: copies from lvalues, moves from rvalues.
    template <typename U>
    explicit impl_type(U&& f_) : f(std::forward<U>(f_)) {}
    void call() override { f(); }
  };

 public:
  // Wraps any callable. The stored type is std::decay_t<F>, so passing an
  // lvalue stores a copy instead of instantiating the holder with a reference
  // type. The constraint keeps this template from competing with the
  // copy/move constructors for function_wrapper arguments.
  template <typename F,
            typename = std::enable_if_t<
                !std::is_same<std::decay_t<F>, function_wrapper>::value>>
  function_wrapper(F&& f)
      : impl(std::make_unique<impl_type<std::decay_t<F>>>(std::forward<F>(f))) {}

  // Invokes the stored callable. Precondition: the wrapper is not empty.
  void operator()() { impl->call(); }

  function_wrapper() = default;

  // Moves transfer ownership of the stored callable and leave `other` empty.
  // noexcept so containers can move elements instead of refusing to relocate.
  function_wrapper(function_wrapper&& other) noexcept
      : impl(std::move(other.impl)) {}
  function_wrapper& operator=(function_wrapper&& other) noexcept {
    impl = std::move(other.impl);
    return *this;
  }

  // Copying is disabled; the non-const overload is deleted as well so that a
  // non-const lvalue wrapper is rejected explicitly.
  function_wrapper(const function_wrapper&) = delete;
  function_wrapper(function_wrapper&) = delete;
  function_wrapper& operator=(const function_wrapper&) = delete;
};
// Thread pool front-end: submit() pushes work onto a shared queue and
// worker_thread() is the loop that drains it.
//
// NOTE(review): `done` is read below but is not declared in this excerpt, and
// nothing visible here starts threads running worker_thread() or sets `done`.
// Presumably those members were elided from the snippet - confirm against the
// full class before use.
class thread_pool {
  // Pending tasks. function_wrapper is used as the element type because
  // submit() pushes a std::packaged_task, which is move-only.
  thread_safe_queue<function_wrapper> work_queue;

  // Worker loop: runs until `done` becomes true. Each iteration tries to take
  // one task; when try_pop() reports failure the thread yields instead of
  // blocking, so idle workers keep polling.
  void worker_thread() {
    while (!done) {
      function_wrapper task;
      if (work_queue.try_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 public:
  // Queues `f` (a callable taking no arguments) and returns a std::future for
  // its result. The result type is spelled with decltype/declval, which names
  // the same type as std::result_of<FunctionType()> without relying on a
  // trait that was deprecated in C++17 and removed in C++20.
  template <typename FunctionType>
  std::future<decltype(std::declval<FunctionType>()())> submit(
      FunctionType f) {
    using result_type = decltype(std::declval<FunctionType>()());
    std::packaged_task<result_type()> task(std::move(f));
    // Fetch the future before the task is moved into the queue.
    std::future<result_type> res(task.get_future());
    work_queue.push(std::move(task));
    return res;
  }
};
Aucun commentaire:
Enregistrer un commentaire