I am playing with the Thread Pool that is listed here to improve my understanding of the C++11-17 threading primitives. At the same time, I am trying to learn how to use Helgrind effectively to detect data races.
When I run a very simple minimal example, with just a single job submitted to an 8-thread pool, Helgrind reports a staggering number of data races, and I am trying to figure out why.
The code for the thread pool and the minimal working example is listed below:
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
template <typename T>
class ThreadSafeQueue
{
public:
    ~ThreadSafeQueue() { invalidate(); }

    bool try_pop(T & out)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty() || !m_valid.load())
        {
            return false;
        }
        out = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }
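
    // Blocks until an item is available or the queue has been invalidated.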
    bool wait_pop(T & out)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this]()
        {
            return !m_queue.empty() || !m_valid.load();
        });
        if (!m_valid.load())
        {
            return false;
        }
        out = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }

    void push(T value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(std::move(value));
        m_condition.notify_one();
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

    void clear()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        while (!m_queue.empty())
        {
            m_queue.pop();
        }
        m_condition.notify_all();
    }
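
    // Wakes all waiting threads and makes every subsequent pop fail; used during shutdown.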
    void invalidate()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_valid.exchange(false);
        m_condition.notify_all();
    }

    bool is_valid() const
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_valid.load();
    }

private:
    std::atomic_bool m_valid{true};
    mutable std::mutex m_mutex;
    std::queue<T> m_queue;
    std::condition_variable m_condition;
}; // class ThreadSafeQueue
class ThreadPool
{
private:
    class IThreadTask
    {
    public:
        IThreadTask() = default;
        virtual ~IThreadTask() = default;
        IThreadTask(const IThreadTask & rhs) = delete;
        IThreadTask & operator=(const IThreadTask & rhs) = delete;
        IThreadTask(IThreadTask && other) = default;
        IThreadTask & operator=(IThreadTask && other) = default;
        virtual void execute() = 0;
    }; // class IThreadTask
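
    // Wraps an arbitrary callable in the IThreadTask interface so it can be stored in the work queue.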
    template <typename Func>
    class ThreadTask : public IThreadTask
    {
    public:
        ThreadTask(Func && func)
            : m_func(std::move(func)) {}
        ~ThreadTask() override = default;
        ThreadTask(const ThreadTask & rhs) = delete;
        ThreadTask & operator=(const ThreadTask & rhs) = delete;
        ThreadTask(ThreadTask && other) = default;
        ThreadTask & operator=(ThreadTask && other) = default;

        void execute() override
        {
            m_func();
        }

    private:
        Func m_func;
    }; // class ThreadTask
public:
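    // A wrapper around std::future whose destructor blocks until the associated task has finished.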
    template <typename T>
    class TaskFuture
    {
    public:
        TaskFuture(std::future<T> && future)
            : m_future(std::move(future)) {}
        TaskFuture(const TaskFuture & rhs) = delete;
        TaskFuture & operator=(const TaskFuture & rhs) = delete;
        TaskFuture(TaskFuture && rhs) = default;
        TaskFuture & operator=(TaskFuture && rhs) = default;

        ~TaskFuture()
        {
            if (m_future.valid())
            {
                m_future.get();
            }
        }

        auto get()
        {
            return m_future.get();
        }

    private:
        std::future<T> m_future;
    }; // class TaskFuture

    ThreadPool() : ThreadPool(std::max(std::thread::hardware_concurrency(), 1u)) {}

    explicit ThreadPool(const std::uint32_t num_threads)
    {
        try
        {
            for (std::uint32_t i = 0u; i < num_threads; ++i)
            {
                m_threads.emplace_back(&ThreadPool::worker, this);
            }
        }
        catch (...)
        {
            destroy();
            throw;
        }
        std::cout << "Started thread pool with " << m_threads.size() << " threads" << std::endl;
    }

    ThreadPool(const ThreadPool & rhs) = delete;
    ThreadPool & operator=(const ThreadPool & rhs) = delete;

    ~ThreadPool()
    {
        destroy();
    }
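
    // Wraps the callable and its arguments in a packaged_task, enqueues it, and returns a TaskFuture for the result.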
    template <typename Func, typename... Args>
    auto submit(Func && func, Args && ... args)
    {
        auto bound_task = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
        using ResultType = std::result_of_t<decltype(bound_task)()>;
        using PackagedTask = std::packaged_task<ResultType()>;
        using TaskType = ThreadTask<PackagedTask>;
        PackagedTask task(std::move(bound_task));
        TaskFuture<ResultType> result(task.get_future());
        m_work_queue.push(std::make_unique<TaskType>(std::move(task)));
        return result;
    }
private:
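    // Each worker thread runs this loop: block until a task is available, execute it, repeat until shutdown.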
    void worker()
    {
        while (!m_done.load())
        {
            std::unique_ptr<IThreadTask> p_task(nullptr);
            if (m_work_queue.wait_pop(p_task))
            {
                p_task->execute();
            }
        }
    }
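
    // Signals shutdown, invalidates the queue to wake blocked workers, and joins all threads.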
    void destroy()
    {
        m_done.exchange(true);
        m_work_queue.invalidate();
        for (auto & thread : m_threads)
        {
            if (thread.joinable())
            {
                thread.join();
            }
        }
    }

    std::atomic_bool m_done{false};
    ThreadSafeQueue<std::unique_ptr<IThreadTask>> m_work_queue;
    std::vector<std::thread> m_threads;
}; // class ThreadPool
namespace DefaultThreadPool
{
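    // Lazily constructs a singleton pool with 8 threads that is shared by every submit_job call.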
    inline ThreadPool & get_thread_pool()
    {
        static ThreadPool default_pool(8);
        return default_pool;
    }

    template <typename Func, typename... Args>
    inline auto submit_job(Func && func, Args && ... args)
    {
        return get_thread_pool().submit(std::forward<Func>(func), std::forward<Args>(args)...);
    }
} // namespace DefaultThreadPool
int main(int argc, char * argv[])
{
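    // The returned TaskFuture blocks in its destructor, so main waits here until the job has run.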
    auto future = DefaultThreadPool::submit_job([](int x) { std::cout << x * 5 << std::endl; }, 5);
}
When compiled with GCC 6.2.0 (with debug info and -O0) and run through Helgrind with valgrind --tool=helgrind ./mwe, I get more than 8000 lines of errors, such as:
==11453== Possible data race during read of size 8 at 0x3632E0 by thread #1
==11453== Locks held: none
==11453== at 0x13A484: ThreadPool*&& std::forward<ThreadPool*>(std::remove_reference<ThreadPool*>::type&) (move.h:77)
==11453== by 0x13A57C: void std::vector<std::thread, std::allocator<std::thread> >::emplace_back<void (ThreadPool::*)(), ThreadPool*>(void (ThreadPool::*&&)(), ThreadPool*&&) (vector.tcc:101)
==11453== by 0x137135: ThreadPool::ThreadPool(unsigned int) (mwe.cpp:174)
==11453== by 0x137828: DefaultThreadPool::get_thread_pool() (mwe.cpp:243)
==11453== by 0x1311E9: auto DefaultThreadPool::submit_job<main::{lambda(int)#1}, int>(main::{lambda(int)#1}&&, int&&) (mwe.cpp:250)
==11453== by 0x131297: main (mwe.cpp:257)
==11453==
==11453== This conflicts with a previous write of size 8 by thread #2
==11453== Locks held: none
==11453== at 0x13A48F: ThreadPool*&& std::forward<ThreadPool*>(std::remove_reference<ThreadPool*>::type&) (move.h:77)
==11453== by 0x146249: void std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (functional:1399)
==11453== by 0x145FD7: std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::operator()() (functional:1389)
==11453== by 0x145F33: std::thread::_State_impl<std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)> >::_M_run() (thread:196)
==11453== by 0x4EFB50E: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==11453== by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==11453== by 0x53E66C9: start_thread (pthread_create.c:333)
==11453== by 0x5705CAE: clone (clone.S:105)
==11453== Address 0x3632e0 is 0 bytes inside data symbol "__gcov0._ZSt7forwardIP10ThreadPoolEOT_RNSt16remove_referenceIS2_E4typeE"
and
==11453== Possible data race during read of size 8 at 0x35FCF0 by thread #3
==11453== Locks held: none
==11453== at 0x146130: std::tuple_element<0ul, std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*> >::type& std::get<0ul, std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>(std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>&) (tuple:1254)
==11453== by 0x14626A: void std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (functional:1399)
==11453== by 0x145FD7: std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::operator()() (functional:1389)
==11453== by 0x145F33: std::thread::_State_impl<std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)> >::_M_run() (thread:196)
==11453== by 0x4EFB50E: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==11453== by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==11453== by 0x53E66C9: start_thread (pthread_create.c:333)
==11453== by 0x5705CAE: clone (clone.S:105)
==11453==
==11453== This conflicts with a previous write of size 8 by thread #2
==11453== Locks held: none
==11453== at 0x14613B: std::tuple_element<0ul, std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*> >::type& std::get<0ul, std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>(std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>&) (tuple:1254)
==11453== by 0x14626A: void std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (functional:1399)
==11453== by 0x145FD7: std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::operator()() (functional:1389)
==11453== by 0x145F33: std::thread::_State_impl<std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)> >::_M_run() (thread:196)
==11453== by 0x4EFB50E: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==11453== by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==11453== by 0x53E66C9: start_thread (pthread_create.c:333)
==11453== by 0x5705CAE: clone (clone.S:105)
==11453== Address 0x35fcf0 is 0 bytes inside data symbol "__gcov0._ZSt3getILm0EJSt7_Mem_fnIM10ThreadPoolFvvEEPS1_EERNSt13tuple_elementIXT_ESt5tupleIJDpT0_EEE4typeERSA_"
==11453==
These errors are quite opaque to me. Can someone clarify what is happening here, and why so many data races are being detected?