I have a simple thread pool implementation. and the implementation is below-
The problem is when i insert an item in the queue it does not get reflected in thread pool function void worker_thread() i.e. the check if (work_queue.try_pop(task)) in worker_thread() fucntion is always failing and tasks are not picked up by the threads. Can you please point out what am i doing wrong?
Thanks in advance.
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(new_value));
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty(); });
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty(); });
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
class thread_pool
{
threadsafe_queue<std::function<void()>> work_queue;
std::atomic<bool> done;
int thread_count;
std::vector<std::thread> threads;
void worker_thread()
{
while (!done)
{
std::function<void()> task;
if (work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
}
public:
thread_pool(unsigned thread_count = 5) : done(false), thread_count(thread_count)
{
try
{
for (unsigned i = 0; i < thread_count; ++i)
{
threads.emplace_back(std::thread(&thread_pool::worker_thread, this));
}
}
catch (...)
{
done = true;
throw;
}
}
~thread_pool()
{
done = true;
for (unsigned i = 0; i < thread_count; ++i)
{
threads[i].join();
}
}
template <typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std::function<void()>(f)); // how
}
};
void fun()
{
cout << "hi"<<this_thread::get_id();
}
template<class T>
class A
{
private:
int x{ 3 };
public:
void fun(vector<string> &v)
{
std::cout << v[0].c_str() << endl;
x = 5;
}
};
int main()
{
thread_pool tp(2);
vector<string> v{ "1", "2" };
A<int> a;
tp.submit([&] { a.fun(std::ref(v)); });
tp.submit < void()>(fun);
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
Aucun commentaire:
Enregistrer un commentaire