I implemented a ThreadPool to test my knowledge of C++ concurrency. However, when I run the following code, it does not proceed and my mac becomes extremely slow and eventually does not respond—I check the monitor latter and find the reason is the kernel_task launches several clang processes which runs all the CPU. I've carefully gone through the code several times, but still unable to locat the problem.
Here's the test code for ThreadPool. I omit all #include for simplicity. When I run this code, there is nothing printed on the terminal. Worse still, even if I cancel the process(via contrl+c), kernel_task creates several clang later and my computer crashes.
// test code for ThreadPool
#include <iostream>
#include <functional>
#include <future>
#include "thread_pool.hpp"
int task() {
static std::atomic<int> i = 1;
std::cout << i.fetch_add(1, std::memory_order_relaxed) << "task\n";
return i.load(std::memory_order_relaxed);
}
int main() {
ThreadPool<int()> thread_pool(1);
auto f1 = thread_pool.submit(task, false);
std::cout << "hello" << '\n';
auto f2 = thread_pool.submit(task, false);
std::cout << f1.get() << '\n';
std::cout << f2.get() << '\n';
}
Here's the definition of ThreadPool.
// thread_pool.hpp
#include <atomic>
#include <algorithm>
#include <chrono>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <thread>
#include "queue.hpp"
template<typename Func>
class ThreadPool {
public:
ThreadPool(std::size_t=std::thread::hardware_concurrency()); // should I minus one here for the main thread?
~ThreadPool();
template<typename... Args,
typename ReturnType=typename std::result_of<std::decay_t<Func>(std::decay_t<Args>...)>::type>
std::future<ReturnType> submit(Func f, bool local=true);
private:
void worker_thread();
void run_task();
using LocalThreadType = std::queue<std::packaged_task<Func>>;
static thread_local LocalThreadType local_queue; // local queue, not used for now
std::shared_ptr<LockBasedQueue<std::packaged_task<Func>, std::list<T>>> shared_queue;
std::atomic_bool done;
std::vector<std::thread> threads;
};
template<typename Func>
ThreadPool<Func>::ThreadPool(std::size_t n): done(false) {
threads.emplace_back(&ThreadPool::worker_thread, this);
}
template<typename Func>
ThreadPool<Func>::~ThreadPool() {
done.store(true, std::memory_order_relaxed);
for (auto& t: threads)
t.join();
}
template<typename Func>
template<typename...Args,
typename ReturnType>
std::future<ReturnType> ThreadPool<Func>::submit(Func f, bool local) {
auto result = local? post_task(f, local_queue):
post_task(f, *shared_queue);
return result;
}
template<typename Func>
void ThreadPool<Func>::run_task() {
if (!local_queue.empty()) {
auto task = std::move(local_queue.front());
local_queue.pop();
task();
}
else {
std::packaged_task<Func> task;
auto flag = shared_queue->try_pop(task);
if (flag)
task();
else {
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
}
}
}
template<typename Func>
void ThreadPool<Func>::worker_thread() {
while (!done.load(std::memory_order_relaxed)) {
run_task();
}
}
template<typename Func>
thread_local typename ThreadPool<Func>::LocalThreadType ThreadPool<Func>::local_queue = {};
Here's the definition of post_task and LockBasedQueue, which have passed the test code in the next code block.
// queue.hpp
#include <mutex>
#include <list>
#include <deque>
#include <queue>
#include <memory>
#include <future>
#include <condition_variable>
template<typename T, typename Container>
class LockBasedQueue; // forward declaration
template<typename Func, typename... Args,
typename ReturnType=typename std::result_of<std::decay_t<Func>(std::decay_t<Args>...)>::type,
typename Container=std::list<Func>,
typename ThreadQueue=LockBasedQueue<std::packaged_task<ReturnType(Args...)>, Container>>
std::future<ReturnType> post_task(Func f, ThreadQueue& task_queue) {
std::packaged_task<ReturnType(Args...)> task(f);
std::future res = task.get_future();
task_queue.push(std::move(task)); // packaged_task is not copyable
return res;
}
// the general template is omitted for simplicity, the folo
template<typename T>
class LockBasedQueue<T, std::list<T>> {
public:
// constructors
LockBasedQueue(): head(std::make_unique<Node>()), tail(head.get()) {}
LockBasedQueue(LockBasedQueue&&);
// assignments
LockBasedQueue& operator=(LockBasedQueue&&);
// general purpose operations
void swap(LockBasedQueue&);
bool empty() const;
std::size_t size() const;
// queue operations
void push(const T&);
void push(T&&);
template <typename... Args>
void emplace(Args&&... args);
T pop();
bool try_pop(T&);
// delete front() and back(), these functions may waste notifications. To enable these function, one should replace notify_one() with notify_all() in push() and emplace()
T& front() = delete;
const T& front() const = delete;
T& back() = delete;
const T& back() const = delete;
private:
struct Node {
std::unique_ptr<T> data; // data is a pointer as it may be empty
std::unique_ptr<Node> next;
};
Node* get_tail() {
std::lock_guard l(tail_mutex);
return tail;
}
Node* get_head() {
std::lock_guard l(head_mutex);
return head.get();
}
std::unique_lock<std::mutex> get_head_lock() {
std::unique_lock l(head_mutex);
data_cond.wait(l, [this] { return head.get() != get_tail(); });
return l;
}
T pop_data() {
auto data = std::move(*head->data);
head = std::move(head->next); // we move head to the next so that the tail is always valid
return std::move(data);
}
std::unique_ptr<Node> head;
std::mutex head_mutex;
Node* tail;
std::mutex tail_mutex;
std::condition_variable data_cond;
};
template<typename T>
LockBasedQueue<T, std::list<T>>::LockBasedQueue(
LockBasedQueue<T, std::list<T>>&& other) {
{
std::scoped_lock l(head_mutex, other.head_mutex);
head(std::move(other.data_queue));
}
{
std::lock_guard l(tail_mutex);
tail = head.get();
}
{
std::lock_guard l(other.tail_mutex);
other.tail = nullptr;
}
}
template<typename T>
LockBasedQueue<T, std::list<T>>&
LockBasedQueue<T, std::list<T>>::operator=(
LockBasedQueue<T, std::list<T>>&& rhs) {
{
std::scoped_lock l(head_mutex, rhs.head_mutex);
head(std::move(rhs.data_queue));
}
{
std::lock_guard l(tail_mutex);
tail = head.get();
}
{
std::lock_guard l(rhs.tail_mutex);
rhs.tail = nullptr;
}
}
template<typename T>
void LockBasedQueue<T, std::list<T>>::swap(
LockBasedQueue<T, std::list<T>>& other) {
{
std::scoped_lock l(head_mutex, other.head_mutex);
head(std::move(other.data_queue));
}
{
std::lock_guard l(tail_mutex);
tail = head.get();
}
{
std::lock_guard l(other.tail_mutex);
other.tail = other.head.get();
}
}
template<typename T>
inline bool LockBasedQueue<T, std::list<T>>::empty() const {
return get_head() == get_tail();
}
template<typename T>
std::size_t LockBasedQueue<T, std::list<T>>::size() const {
int n = 0;
std::lock_guard l(tail_mutex); // do not use get_tail() here to avoid race condition
for (auto p = get_head(); p != tail; p = p->next.get())
++n;
return n;
}
template<typename T>
void LockBasedQueue<T, std::list<T>>::push(const T& data) {
push(T(data));
}
template<typename T>
void LockBasedQueue<T, std::list<T>>::push(T&& data) {
{
auto p = std::make_unique<Node>();
std::lock_guard l(tail_mutex);
tail->data = std::make_unique<T>(std::move(data)); // we add data to the current tail, this allows us to move head to the next when popping
tail->next = std::move(p);
tail = tail->next.get();
}
data_cond.notify_one();
}
template<typename T>
template<typename...Args>
void LockBasedQueue<T, std::list<T>>::emplace(Args&&... args) {
{
auto p = std::make_unique<Node>();
std::lock_guard l(tail_mutex);
tail->data = std::make_unique<T>(std::forward<Args>(args)...);
tail->next = std::move(p);
tail = tail->next.get();
}
data_cond.notify_one();
}
template<typename T>
T LockBasedQueue<T, std::list<T>>::pop() {
auto l(get_head_lock());
return pop_data();
}
template<typename T>
bool LockBasedQueue<T, std::list<T>>::try_pop(T& data) {
std::lock_guard l(head_mutex);
if (head.get() == get_tail())
return false;
data = pop_data();
return true;
}
Here's the code I used to test LockBasedQueue and post_task. The following test code works without any problem.
// test code for LockBasedQueue and post_task
#include <iostream>
#include <functional>
#include <future>
#include "queue.hpp"
LockBasedQueue<std::packaged_task<int()>, std::list<std::packaged_task<int()>>> task_queue; // thread safe queue, which handles locks inside
void task_execution_thread() {
bool x = true;
while (x) { // for debugging purpose, we only execute this loop once
auto task = task_queue.pop(); // Returns the front task and removes it from queue. Waits if task_queue is empty
task(); // execute task
x = false;
}
}
int task() {
static std::atomic<int> i = 1;
std::cout << i.fetch_add(1, std::memory_order_relaxed) << "task\n";
return i.load(std::memory_order_relaxed);
}
int main() {
std::thread t1(task_execution_thread);
std::thread t2(task_execution_thread);
auto f1 = post_task(task, task_queue);
auto f2 = post_task(task, task_queue);
std::cout << "f1: " << f1.get() << '\n';
std::cout << "f2: " << f2.get() << '\n';
t1.join();
t2.join();
}
I test the code using g++ -std=c++2a on the MacOS 11.2.3. The problem happens at the run time.
Aucun commentaire:
Enregistrer un commentaire