lundi 28 décembre 2020

queue entries of threadpool in C++

I am trying to implement a thread pool, and I found two implementations. Both use a queue to manage tasks, but the queue entries have different types. I want to know which one is better and the rationale behind each implementation. Why does the second version need a function_wrapper?

The first version uses std::queue<std::packaged_task<void()> > tasks;

// Thread pool whose pending work is held in a queue of
// std::packaged_task<void()> entries.
//
// NOTE(review): the constructor and destructor are only declared here; the
// meaning of the size_t argument (presumably the number of workers) should be
// confirmed against their definitions.
class ThreadPool {
  // Worker threads, retained so that they can be joined.
  std::vector<std::thread> workers;
  // Queue of tasks waiting to be run.
  std::queue<std::packaged_task<void()>> tasks;

  // State guarding and signalling changes to the task queue.
  std::mutex queue_mutex;
  std::condition_variable condition;
  bool stop;

 public:
  explicit ThreadPool(size_t);
  ~ThreadPool();

  // Submits a callable together with its arguments; defined out of class.
  template <typename F, typename... Args>
  decltype(auto) enqueue(F&& f, Args&&... args);
};

// Adds a new work item to the pool.
//
// `f` and `args` are bound together with std::bind (so they are copied or
// moved into the task), the bound call is wrapped in a std::packaged_task,
// pushed onto the task queue under `queue_mutex`, and one thread waiting on
// `condition` is notified.
//
// Returns: a std::future for the result of the bound call.
// Throws:  std::runtime_error if `stop` is already set.
template <class F, class... Args>
decltype(auto) ThreadPool::enqueue(F&& f, Args&&... args) {
  // Bind first and take the result type from the bound object itself, so the
  // declared type always matches what is really invoked. This accepts
  // everything std::bind accepts (e.g. pointers to member functions), which a
  // separately computed `decltype(f(args...))` would reject.
  auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
  using return_type = decltype(bound());

  std::packaged_task<return_type()> task(std::move(bound));
  std::future<return_type> res = task.get_future();
  {
    // No waiting happens here, so a plain lock_guard is sufficient.
    std::lock_guard<std::mutex> lock(queue_mutex);

    // don't allow enqueueing after stopping the pool
    if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");

    // The queue's packaged_task<void()> is constructed from the move-only
    // packaged_task<return_type()>; the result still reaches `res`.
    tasks.emplace(std::move(task));
  }
  condition.notify_one();
  return res;
}

The second version uses thread_safe_queue<function_wrapper> work_queue;

// Move-only, type-erased holder for any callable that can be invoked as `f()`.
//
// std::function requires its target to be copy-constructible, so it cannot
// hold a move-only callable such as std::packaged_task. This wrapper only
// requires the callable to be move-constructible, which is why a queue of
// function_wrapper can store packaged tasks.
class function_wrapper {
  // Type-erasure interface: the wrapper only needs to call and destroy.
  struct impl_base {
    virtual void call() = 0;
    virtual ~impl_base() = default;
  };

  // Concrete holder that owns a callable of type F by value.
  template <typename F>
  struct impl_type : impl_base {
    F f;
    explicit impl_type(F&& f_) : f(std::move(f_)) {}
    void call() override { f(); }
  };

  std::unique_ptr<impl_base> impl;

 public:
  // Wraps `f`. The callable is taken by value (sink parameter) so that F is
  // always an object type: an lvalue argument is copied and an rvalue is
  // moved. With a forwarding reference, an lvalue would deduce F as a
  // reference type and the holder would keep a reference to the caller's
  // object instead of owning a callable.
  // Intentionally not explicit: callers rely on implicit conversion, e.g.
  // when pushing a std::packaged_task into a queue of function_wrapper.
  template <typename F>
  function_wrapper(F f) : impl(new impl_type<F>(std::move(f))) {}

  // Invokes the wrapped callable.
  // Precondition: the wrapper holds a callable (it was not default-constructed
  // and has not been moved from); otherwise the behavior is undefined.
  void operator()() { impl->call(); }

  // Creates an empty wrapper that holds no callable.
  function_wrapper() = default;

  // Moving transfers ownership of the callable and leaves `other` empty.
  function_wrapper(function_wrapper&& other) noexcept
      : impl(std::move(other.impl)) {}
  function_wrapper& operator=(function_wrapper&& other) noexcept {
    impl = std::move(other.impl);
    return *this;
  }

  // Copying is disabled; the wrapped callable may be move-only.
  function_wrapper(const function_wrapper&) = delete;
  function_wrapper(function_wrapper&) = delete;
  function_wrapper& operator=(const function_wrapper&) = delete;
};

// Thread pool whose work queue stores type-erased function_wrapper entries.
//
// NOTE(review): `done`, the worker threads themselves and the
// constructor/destructor are not declared in this excerpt; they are assumed
// to exist in the part of the class that is not shown — confirm.
class thread_pool {
  thread_safe_queue<function_wrapper> work_queue;

  // Worker loop: while `done` is false, try to take a task from the queue and
  // run it; when try_pop reports no task, yield the time slice.
  void worker_thread() {
    while (!done) {
      function_wrapper task;
      if (work_queue.try_pop(task)) {
        task();
      } else {
        std::this_thread::yield();
      }
    }
  }

 public:
  // Wraps `f` in a std::packaged_task, pushes it onto the work queue and
  // returns the future through which the result of `f()` can be obtained.
  //
  // The result type is spelled with decltype(f()) rather than std::result_of,
  // which is deprecated in C++17 and removed in C++20.
  template <typename FunctionType>
  auto submit(FunctionType f) -> std::future<decltype(f())> {
    using result_type = decltype(f());
    std::packaged_task<result_type()> task(std::move(f));
    std::future<result_type> res(task.get_future());
    // packaged_task is move-only; it is moved into the queue's entry type.
    work_queue.push(std::move(task));
    return res;
  }
};

Aucun commentaire:

Enregistrer un commentaire