jeudi 26 mai 2016

Thread pool queue is not getting updated with pushed items, though push is successful

I have a simple thread pool implementation. and the implementation is below-

The problem is when i insert an item in the queue it does not get reflected in thread pool function void worker_thread() i.e. the check if (work_queue.try_pop(task)) in worker_thread() fucntion is always failing and tasks are not picked up by the threads. Can you please point out what am i doing wrong?

Thanks in advance.

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty(); });
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

class thread_pool
{
    threadsafe_queue<std::function<void()>> work_queue;
    std::atomic<bool> done;
    int thread_count;
    std::vector<std::thread> threads;

    void worker_thread()
    {
        while (!done)
        {
            std::function<void()> task;
            if (work_queue.try_pop(task))
            {
                task();
            }
            else
            {
                std::this_thread::yield();
            }
        }
    }
public:
    thread_pool(unsigned thread_count = 5) : done(false), thread_count(thread_count)
    {
        try
        {
            for (unsigned i = 0; i < thread_count; ++i)
            {
                threads.emplace_back(std::thread(&thread_pool::worker_thread, this));
            }
        }
        catch (...)
        {
            done = true;
            throw;
        }
    }

    ~thread_pool()
    {
        done = true;
        for (unsigned i = 0; i < thread_count; ++i)
        {
            threads[i].join();
        }
    }
    template <typename FunctionType>
    void submit(FunctionType f)
    {
        work_queue.push(std::function<void()>(f)); // how
    }
};

void fun()
 {
    cout << "hi"<<this_thread::get_id();
}
template<class T>
class A
 {
  private:
     int x{ 3 };
public:
      void fun(vector<string> &v)
      {
         std::cout << v[0].c_str() << endl;
          x = 5;
      }

};

     int main()
    {
        thread_pool tp(2);
         vector<string> v{ "1", "2" };
          A<int> a;
          tp.submit([&] { a.fun(std::ref(v)); });
         tp.submit < void()>(fun);
         std::this_thread::sleep_for(std::chrono::seconds(10));
         return 0;
    }

Aucun commentaire:

Enregistrer un commentaire