mercredi 28 juillet 2021

Is this threadpool code attempting double execution of task?

I have copied the below threadpool implementation from https://pastebin.com/MM5kSvH6. All looks good but i can't understand the logic at Line number 32 and Line number 71. Aren't both these lines trying to execute the function? I thought in threadpool, the thread is supposed to pull task from task queue and then execute it? In that sense line 71 looks OK but i am confused by line 32. Instead of adding the task to the queue why is it trying to execute the same?

#include <condition_variable>
#include <functional>
#include <iostream>
#include <future>
#include <vector>
#include <thread>
#include <queue>
 
class ThreadPool
{
public:
    using Task = std::function<void()>;
 
    explicit ThreadPool(std::size_t numThreads)
    {
        start(numThreads);
    }
 
    ~ThreadPool()
    {
        stop();
    }
 
    template<class T>
    auto enqueue(T task)->std::future<decltype(task())>
    {
        auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>(std::move(task));
 
        {
            std::unique_lock<std::mutex> lock{mEventMutex};
            mTasks.emplace([=] {
                (*wrapper)();
            });
        }
 
        mEventVar.notify_one();
        return wrapper->get_future();
    }
 
private:
    std::vector<std::thread> mThreads;
 
    std::condition_variable mEventVar;
 
    std::mutex mEventMutex;
    bool mStopping = false;
 
    std::queue<Task> mTasks;
 
    void start(std::size_t numThreads)
    {
        for (auto i = 0u; i < numThreads; ++i)
        {
            mThreads.emplace_back([=] {
                while (true)
                {
                    Task task;
 
                    {
                        std::unique_lock<std::mutex> lock{mEventMutex};
 
                        mEventVar.wait(lock, [=] { return mStopping || !mTasks.empty(); });
 
                        if (mStopping && mTasks.empty())
                            break;
 
                        task = std::move(mTasks.front());
                        mTasks.pop();
                    }
 
                    task();
                }
            });
        }
    }
 
    void stop() noexcept
    {
        {
            std::unique_lock<std::mutex> lock{mEventMutex};
            mStopping = true;
        }
 
        mEventVar.notify_all();
 
        for (auto &thread : mThreads)
            thread.join();
    }
};
 
int main()
{
    {
        ThreadPool pool{36};
 
        for (auto i = 0; i < 36; ++i)
        {
            pool.enqueue([] {
                auto f = 1000000000;
                while (f > 1)
                    f /= 1.00000001;
            });
        }
    }
 
    return 0;
}

Aucun commentaire:

Enregistrer un commentaire