vendredi 5 janvier 2018

Thread pool triggers data race in Helgrind

I am experimenting with the Thread Pool that is listed here to improve my understanding of the C++11 through C++17 threading primitives. At the same time, I am trying to learn how to use Helgrind effectively to detect data races.

When I run a minimal example that submits just a single job to the 8-thread pool, Helgrind reports a staggering number of possible data races. I am trying to figure out why.

The code for the Thread Pool and the minimal working example is listed here:

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

/// FIFO queue whose every operation runs under an internal mutex.
///
/// Consumers can block in wait_pop() until an element is pushed. Calling
/// invalidate() flips the queue into an "invalid" state in which both pop
/// functions refuse to hand out elements and all blocked consumers are woken.
template <typename T>
class ThreadSafeQueue
{
public:
    /// Invalidates the queue on destruction, waking any waiting consumer.
    ~ThreadSafeQueue() { invalidate(); }

    /// Non-blocking pop.
    /// @param out receives the front element on success; untouched otherwise.
    /// @return true when an element was moved into @p out, false when the
    ///         queue is empty or has been invalidated.
    bool try_pop(T & out)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        const bool can_pop = !m_queue.empty() && m_valid.load();
        if (can_pop)
        {
            out = std::move(m_queue.front());
            m_queue.pop();
        }
        return can_pop;
    }

    /// Blocking pop: waits until an element is available or the queue is
    /// invalidated.
    /// @param out receives the front element on success; untouched otherwise.
    /// @return false when the queue has been invalidated (even if elements
    ///         remain), true when an element was moved into @p out.
    bool wait_pop(T & out)
    {
        std::unique_lock<std::mutex> guard(m_mutex);

        // Sleep while there is nothing to take and the queue is still valid.
        while (m_queue.empty() && m_valid.load())
        {
            m_condition.wait(guard);
        }

        if (m_valid.load())
        {
            out = std::move(m_queue.front());
            m_queue.pop();
            return true;
        }
        return false;
    }

    /// Appends @p value and wakes one waiting consumer.
    void push(T value)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_queue.push(std::move(value));
        m_condition.notify_one();
    }

    /// @return true when the queue currently holds no elements.
    bool empty() const
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        return m_queue.empty();
    }

    /// Discards every queued element, then wakes all waiting consumers.
    void clear()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        for (; !m_queue.empty(); m_queue.pop())
        {
        }
        m_condition.notify_all();
    }

    /// Marks the queue invalid and wakes all waiting consumers, so that every
    /// pending and future wait_pop()/try_pop() call returns false.
    void invalidate()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        m_valid.store(false);
        m_condition.notify_all();
    }

    /// @return true until invalidate() has been called.
    bool is_valid() const
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        return m_valid.load();
    }

private:
    std::atomic_bool        m_valid{true};  // cleared once by invalidate()
    mutable std::mutex      m_mutex;        // guards m_queue; mutable for const readers
    std::queue<T>           m_queue;
    std::condition_variable m_condition;    // signalled on push/clear/invalidate

}; // class ThreadSafeQueue

/// Fixed-size pool of worker threads that run submitted callables.
///
/// Each worker blocks on a shared ThreadSafeQueue of type-erased tasks.
/// submit() wraps the callable in a std::packaged_task and returns a
/// TaskFuture through which the result (or exception) can be retrieved.
/// Destroying the pool invalidates the queue and joins every worker; tasks
/// still sitting in the queue at that point are discarded without running.
class ThreadPool
{
private:
    /// Type-erased, move-only unit of work stored in the queue.
    class IThreadTask
    {
    public:
        IThreadTask() = default;
        virtual ~IThreadTask() = default;
        IThreadTask(const IThreadTask & rhs) = delete;
        IThreadTask & operator=(const IThreadTask & rhs) = delete;
        IThreadTask(IThreadTask && other) = default;
        IThreadTask & operator=(IThreadTask && other) = default;

        /// Runs the task.
        virtual void execute() = 0;

    }; // class IThreadTask

    /// Concrete task that owns a callable of type Func and invokes it with
    /// no arguments.
    template <typename Func>
    class ThreadTask : public IThreadTask
    {
    public:
        explicit ThreadTask(Func && func)
            : m_func(std::move(func)) {}

        ~ThreadTask() override = default;
        ThreadTask(const ThreadTask & rhs) = delete;
        ThreadTask & operator=(const ThreadTask & rhs) = delete;
        ThreadTask(ThreadTask && other) = default;
        ThreadTask & operator=(ThreadTask && other) = default;

        void execute() override
        {
            m_func();
        }

    private:
        Func m_func;
    }; // class ThreadTask

public:
    /// Move-only wrapper around std::future<T> whose destructor blocks until
    /// the associated task has completed (if get() was not called first).
    template <typename T>
    class TaskFuture
    {
    public:
        TaskFuture(std::future<T> && future)
            : m_future(std::move(future)) {}

        TaskFuture(const TaskFuture & rhs) = delete;
        TaskFuture & operator=(const TaskFuture & rhs) = delete;
        TaskFuture(TaskFuture && rhs) = default;
        TaskFuture & operator=(TaskFuture && rhs) = default;
        ~TaskFuture()
        {
            // Block until the shared state is ready, but use wait() rather
            // than get(): get() rethrows whatever the task threw (or a
            // std::future_error when the task was destroyed unexecuted), and
            // an exception escaping this implicitly-noexcept destructor would
            // call std::terminate.
            if (m_future.valid())
            {
                m_future.wait();
            }
        }

        /// Waits for the task and returns its result, rethrowing any
        /// exception stored in the future. May be called at most once.
        auto get()
        {
            return m_future.get();
        }

    private:
        std::future<T> m_future;

    }; // class TaskFuture

    /// Creates a pool with one worker per hardware thread (at least one).
    ThreadPool() : ThreadPool(std::max(std::thread::hardware_concurrency(), 1u)) {}

    /// Creates a pool with @p num_threads workers.
    /// If starting a thread throws, the workers already started are shut
    /// down and joined before the exception is rethrown.
    explicit ThreadPool(const std::uint32_t num_threads)
    {
        try
        {
            for (std::uint32_t i = 0u; i < num_threads; ++i)
            {
                m_threads.emplace_back(&ThreadPool::worker, this);
            }
        } catch (...) {
            destroy();
            throw;
        }
        std::cout << "Started thread pool with " << m_threads.size() << " threads" << std::endl;
    }

    ThreadPool(const ThreadPool & rhs) = delete;

    ThreadPool & operator=(const ThreadPool & rhs) = delete;

    /// Stops and joins all workers.
    ~ThreadPool()
    {
        destroy();
    }

    /// Queues `func(args...)` for execution on a worker thread.
    /// @param func callable to run; it and @p args are bound with std::bind,
    ///             so arguments are copied/moved into the task.
    /// @return a TaskFuture for the callable's result.
    template <typename Func, typename... Args>
    auto submit(Func && func, Args && ... args)
    {
        auto bound_task = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
        // decltype on the call expression instead of std::result_of_t, which
        // is deprecated in C++17 and removed in C++20.
        using ResultType = decltype(bound_task());
        using PackagedTask = std::packaged_task<ResultType()>;
        using TaskType = ThreadTask<PackagedTask>;

        PackagedTask task(std::move(bound_task));
        // Obtain the future before the task is moved into the queue.
        TaskFuture<ResultType> result(task.get_future());
        m_work_queue.push(std::make_unique<TaskType>(std::move(task)));
        return result;
    }

private:
    /// Worker loop: pop and execute tasks until the pool is marked done.
    void worker()
    {
        while (!m_done.load())
        {
            std::unique_ptr<IThreadTask> p_task(nullptr);
            // wait_pop() returns false once the queue has been invalidated.
            if (m_work_queue.wait_pop(p_task))
            {
                p_task->execute();
            }
        }
    }

    /// Signals shutdown, wakes every blocked worker and joins all threads.
    void destroy()
    {
        // Set the flag before invalidating so that woken workers observe it
        // and leave their loop.
        m_done.exchange(true);
        m_work_queue.invalidate();
        for (auto & thread : m_threads)
        {
            if (thread.joinable())
            {
                thread.join();
            }
        }
    }

    // Declared before m_threads so they are constructed before any worker
    // starts and destroyed only after the threads have been joined.
    std::atomic_bool m_done{false};
    ThreadSafeQueue<std::unique_ptr<IThreadTask>> m_work_queue;
    std::vector<std::thread> m_threads;

}; // class ThreadPool

namespace DefaultThreadPool
{

inline ThreadPool & get_thread_pool()
{
    static ThreadPool default_pool(8);
    return default_pool;
}

template <typename Func, typename... Args>
inline auto submit_job(Func && func, Args && ... args)
{
    return get_thread_pool().submit(std::forward<Func>(func), std::forward<Args>(args)...);
}

} // namespace DefaultThreadPool

/// Minimal driver: submits a single job to the default pool that prints
/// its argument multiplied by five.
int main(int /*argc*/, char * /*argv*/[])
{
    // The returned TaskFuture blocks in its destructor, so main does not
    // return before the job has run.
    auto pending = DefaultThreadPool::submit_job(
        [](int value) { std::cout << value * 5 << std::endl; },
        5);
    return 0;
}

When compiled with GCC 6.2.0 with debug info and -O0, and run through Helgrind using the command valgrind --tool=helgrind ./mwe I get more than 8000 lines of errors, such as:

==11453== Possible data race during read of size 8 at 0x3632E0 by thread #1
==11453== Locks held: none
==11453==    at 0x13A484: ThreadPool*&& std::forward<ThreadPool*>(std::remove_reference<ThreadPool*>::type&) (move.h:77)
==11453==    by 0x13A57C: void std::vector<std::thread, std::allocator<std::thread> >::emplace_back<void (ThreadPool::*)(), ThreadPool*>(void (ThreadPool::*&&)(), ThreadPool*&&) (vector.tcc:101)
==11453==    by 0x137135: ThreadPool::ThreadPool(unsigned int) (mwe.cpp:174)
==11453==    by 0x137828: DefaultThreadPool::get_thread_pool() (mwe.cpp:243)
==11453==    by 0x1311E9: auto DefaultThreadPool::submit_job<main::{lambda(int)#1}, int>(main::{lambda(int)#1}&&, int&&) (mwe.cpp:250)
==11453==    by 0x131297: main (mwe.cpp:257)
==11453== 
==11453== This conflicts with a previous write of size 8 by thread #2
==11453== Locks held: none
==11453==    at 0x13A48F: ThreadPool*&& std::forward<ThreadPool*>(std::remove_reference<ThreadPool*>::type&) (move.h:77)
==11453==    by 0x146249: void std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (functional:1399)
==11453==    by 0x145FD7: std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::operator()() (functional:1389)
==11453==    by 0x145F33: std::thread::_State_impl<std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)> >::_M_run() (thread:196)
==11453==    by 0x4EFB50E: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==11453==    by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==11453==    by 0x53E66C9: start_thread (pthread_create.c:333)
==11453==    by 0x5705CAE: clone (clone.S:105)
==11453==  Address 0x3632e0 is 0 bytes inside data symbol "__gcov0._ZSt7forwardIP10ThreadPoolEOT_RNSt16remove_referenceIS2_E4typeE"

and

==11453== Possible data race during read of size 8 at 0x35FCF0 by thread #3
==11453== Locks held: none
==11453==    at 0x146130: std::tuple_element<0ul, std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*> >::type& std::get<0ul, std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>(std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>&) (tuple:1254)
==11453==    by 0x14626A: void std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (functional:1399)
==11453==    by 0x145FD7: std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::operator()() (functional:1389)
==11453==    by 0x145F33: std::thread::_State_impl<std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)> >::_M_run() (thread:196)
==11453==    by 0x4EFB50E: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==11453==    by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==11453==    by 0x53E66C9: start_thread (pthread_create.c:333)
==11453==    by 0x5705CAE: clone (clone.S:105)
==11453== 
==11453== This conflicts with a previous write of size 8 by thread #2
==11453== Locks held: none
==11453==    at 0x14613B: std::tuple_element<0ul, std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*> >::type& std::get<0ul, std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>(std::tuple<std::_Mem_fn<void (ThreadPool::*)()>, ThreadPool*>&) (tuple:1254)
==11453==    by 0x14626A: void std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (functional:1399)
==11453==    by 0x145FD7: std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)>::operator()() (functional:1389)
==11453==    by 0x145F33: std::thread::_State_impl<std::_Bind_simple<std::_Mem_fn<void (ThreadPool::*)()> (ThreadPool*)> >::_M_run() (thread:196)
==11453==    by 0x4EFB50E: ??? (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.22)
==11453==    by 0x4C34C86: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==11453==    by 0x53E66C9: start_thread (pthread_create.c:333)
==11453==    by 0x5705CAE: clone (clone.S:105)
==11453==  Address 0x35fcf0 is 0 bytes inside data symbol "__gcov0._ZSt3getILm0EJSt7_Mem_fnIM10ThreadPoolFvvEEPS1_EERNSt13tuple_elementIXT_ESt5tupleIJDpT0_EEE4typeERSA_"
==11453== 

These errors are quite opaque to me. Can someone clarify what's happening here, and why there are so many data races detected?

Aucun commentaire:

Enregistrer un commentaire