dimanche 28 décembre 2014

C++ thread pool with mutable threads: strange deadlock when assigning tasks to threads

I hope you all had nice holidays.


This question is related to my earlier question: std::condition_variable - Wait for several threads to notify observer


I'm trying to implement a threadpool based on my own mutable thread implementation below:



class MutableThread
{
private:
std::thread m_Thread;
std::function<void()> m_Function;
bool m_bRun;
std::mutex m_LockMutex;
std::mutex m_WaitMutex;
std::condition_variable m_CV;
IAsyncTemplateObserver<MutableThread>* m_Observer = nullptr;

private:
void Execute()
{
while (m_bRun)
{
{
std::unique_lock<std::mutex> wait(m_WaitMutex);
m_CV.wait(wait);
}

std::lock_guard<std::mutex> lock(m_LockMutex);
if (m_bRun && m_Function)
{
m_Function();
m_Function = std::function<void()>();

if (m_Observer != nullptr)
{
m_Observer->Signal(this);
}
}
}
}

public:
HDEBUGNAME(TEXT("MutableThread"));

MutableThread(const MutableThread& thread) = delete;

MutableThread(IAsyncTemplateObserver<MutableThread>* _Observer)
{
m_Observer = _Observer;
m_bRun = true;
m_Thread = std::thread(&MutableThread::Execute, this);
}

MutableThread()
{
m_Observer = nullptr;
m_bRun = true;
m_Thread = std::thread(&MutableThread::Execute, this);
}

~MutableThread()
{
m_bRun = false;

m_CV.notify_one();

try
{
if (m_Thread.joinable())
m_Thread.join();
}
catch (std::system_error& ex)
{
HWARNINGD(TEXT("%s"), ex.what());
}
}

inline bool Start(const std::function<void()>& f)
{
std::lock_guard<std::mutex> lock(m_LockMutex);

if (m_Function != nullptr)
return false;

m_Function = f;

m_CV.notify_one();

return true;
}


The IAsyncTemplateObserver simply derives from my IAsyncObserver class posted in the earlier question and adds a virtual function:



// Observer interface that extends IAsyncObserver with a typed completion callback.
// T is the type of the object that reports back to the observer.
template <typename T>
class IAsyncTemplateObserver : public IAsyncObserver
{
public:
// Called by an observed object, passing a pointer to itself as _Obj.
// NOTE(review): implementations are invoked from the observed object's own
// thread (see MutableThread::Execute) -- confirm they are thread-safe.
virtual void Signal(T* _Obj) = 0;
};


What I want to do is signal the ThreadPool that the function has finished executing, so that a new task can be assigned to the mutable thread:



class MutableThread;

/// A unit of work for the pool: a callable plus the priority used to order it.
struct Task
{
    std::function<void()> m_Function;   ///< work to execute
    uint32_t m_uPriority;               ///< ordering key used by operator<

    /// @param _Function callable to run (copied into the task)
    /// @param _uPriority ordering key of the task
    Task(const std::function<void()>& _Function, uint32_t _uPriority)
        : m_Function(_Function)
        , m_uPriority(_uPriority)
    {
    }
};

/// Orders tasks by priority only: `a < b` when a's priority value is smaller.
/// Two tasks with the same priority compare as equivalent.
inline bool operator<(const Task& a, const Task& b)
{
    return b.m_uPriority > a.m_uPriority;
}

/// Pool of MutableThread workers fed from a priority-ordered task queue.
///
/// Tasks are queued with AddTask() and dispatched to idle workers, highest
/// priority value first (FIFO among equal priorities). Each worker reports
/// completion through Signal(), which returns it to the free list and
/// dispatches further queued work.
///
/// Locking rule: m_LockMutex guards the two thread lists and the task queue.
/// It is never held while a worker is being destroyed, because the worker may
/// still be on its way into Signal(), which needs the same mutex.
class ThreadPool : public IAsyncTemplateObserver<MutableThread>
{
private:
    std::list<MutableThread*> m_FreeThreads;   // idle workers
    std::list<MutableThread*> m_UsedThreads;   // workers that were handed a task

    // multiset rather than set: operator< compares priorities only, so a
    // std::set would treat two tasks of equal priority as duplicates and
    // silently discard the second one on insert.
    std::multiset<Task> m_Tasks;

    std::mutex m_LockMutex;

public:
    /// Creates an empty pool; call Grow() to add workers.
    ThreadPool()
    {
        //Grow(std::thread::hardware_concurrency() - 1);
    }

    /// Creates a pool with n workers.
    ThreadPool(size_t n)
    {
        Grow(n);
    }

    /// Destroys all workers. Tasks that are already running are allowed to
    /// finish; tasks still waiting in the queue are discarded. Call
    /// WaitForAllThreads() first if every queued task must run.
    ~ThreadPool()
    {
        std::list<MutableThread*> threads;
        {
            std::lock_guard<std::mutex> lock(m_LockMutex);

            // Nothing new may be dispatched while the workers are torn down.
            m_Tasks.clear();

            // Take the workers out of the shared lists: a worker finishing
            // right now still calls Signal(), which edits those lists.
            threads.splice(threads.end(), m_UsedThreads);
            threads.splice(threads.end(), m_FreeThreads);
        }

        // Deleted without the lock held: each delete joins the worker thread,
        // and that thread may need m_LockMutex inside Signal() before it exits.
        for (MutableThread* pThread : threads)
        {
            HSAFE_DELETE(pThread);
        }
    }

    /// Adds n idle workers and dispatches any tasks that were already waiting.
    inline void Grow(size_t n)
    {
        {
            std::lock_guard<std::mutex> lock(m_LockMutex);

            for (size_t i = 0; i < n; i++)
            {
                m_FreeThreads.push_back(new MutableThread(this));
            }
        }

        AssignThreads();
    }

    /// Queues a task and dispatches it immediately if a worker is idle.
    inline void AddTask(const Task& _Task)
    {
        {
            std::lock_guard<std::mutex> lock(m_LockMutex);
            m_Tasks.insert(_Task);
        }

        AssignThreads();
    }

    /// Completion callback, invoked by a worker on its own thread after its
    /// task has finished. Returns the worker to the free list, dispatches
    /// queued work and wakes a thread blocked in WaitForAllThreads().
    virtual void Signal(MutableThread* _pThread) override
    {
        {
            std::lock_guard<std::mutex> lock(m_LockMutex);
            m_UsedThreads.remove(_pThread);
            m_FreeThreads.push_back(_pThread);
        }

        AssignThreads();

        // NotifyOne() is inherited (not shown here); presumably it notifies
        // m_ObserverCV -- confirm against IAsyncObserver.
        NotifyOne();
    }

    /// Blocks until no worker is busy and the task queue is empty.
    inline void WaitForAllThreads()
    {
        // The observer mutex is taken before the state is inspected and kept
        // until the wait begins. NOTE(review): this closes the window for a
        // missed notification only if NotifyOne() locks m_ObserverMutex too
        // -- confirm against IAsyncObserver.
        std::unique_lock<std::mutex> wait(m_ObserverMutex);

        for (;;)
        {
            {
                //check if we have to wait
                std::lock_guard<std::mutex> lock(m_LockMutex);
                if (m_UsedThreads.empty() && m_Tasks.empty())
                    return;
            }

            m_ObserverCV.wait(wait);
        }
    }

private:

    /// Hands queued tasks to idle workers until either runs out.
    inline void AssignThreads()
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        while (!m_FreeThreads.empty() && !m_Tasks.empty())
        {
            //Get free thread
            MutableThread* pThread = m_FreeThreads.back();
            m_FreeThreads.pop_back();

            //park thread in used list
            m_UsedThreads.push_back(pThread);

            //last entry has highest priority; lower_bound then selects the
            //oldest task of that priority, because a multiset keeps equal
            //keys in insertion order
            std::multiset<Task>::iterator it = m_Tasks.end();
            --it;
            it = m_Tasks.lower_bound(*it);

            //start the task and remove it from the queue only if the worker
            //accepted it; a worker that refuses is still busy and will come
            //back through Signal(), so the task simply stays queued
            if (pThread->Start(it->m_Function))
            {
                m_Tasks.erase(it);
            }
        }
    }
};


The AddTask function is called several times by the same thread, but when a mutable thread signals the thread pool (via m_Observer->Signal(this)), the application freezes at the lock_guard in the AssignThreads() function. The strange thing is that, unlike a normal deadlock, all stack views in Visual Studio are empty as soon as I try to step over the line with the lock_guard.


Can anyone explain this behaviour? Is there a major design flaw, or is it just a simple mix-up?


Thanks for your help!


Greetings, Fabian


Aucun commentaire:

Enregistrer un commentaire