hope you had all had nice holidays.
This questions is related to my earlier question: std::condition_variable - Wait for several threads to notify observer
I'm trying to implement a threadpool based on my own mutable thread implementation below:
class MutableThread
{
private:
std::thread m_Thread;
std::function<void()> m_Function;
bool m_bRun;
std::mutex m_LockMutex;
std::mutex m_WaitMutex;
std::condition_variable m_CV;
IAsyncTemplateObserver<MutableThread>* m_Observer = nullptr;
private:
void Execute()
{
while (m_bRun)
{
{
std::unique_lock<std::mutex> wait(m_WaitMutex);
m_CV.wait(wait);
}
std::lock_guard<std::mutex> lock(m_LockMutex);
if (m_bRun && m_Function)
{
m_Function();
m_Function = std::function<void()>();
if (m_Observer != nullptr)
{
m_Observer->Signal(this);
}
}
}
}
public:
HDEBUGNAME(TEXT("MutableThread"));
MutableThread(const MutableThread& thread) = delete;
MutableThread(IAsyncTemplateObserver<MutableThread>* _Observer)
{
m_Observer = _Observer;
m_bRun = true;
m_Thread = std::thread(&MutableThread::Execute, this);
}
MutableThread()
{
m_Observer = nullptr;
m_bRun = true;
m_Thread = std::thread(&MutableThread::Execute, this);
}
~MutableThread()
{
m_bRun = false;
m_CV.notify_one();
try
{
if (m_Thread.joinable())
m_Thread.join();
}
catch (std::system_error& ex)
{
HWARNINGD(TEXT("%s"), ex.what());
}
}
inline bool Start(const std::function<void()>& f)
{
std::lock_guard<std::mutex> lock(m_LockMutex);
if (m_Function != nullptr)
return false;
m_Function = f;
m_CV.notify_one();
return true;
}
The IAsyncTemplateObserver simply derives from my IAsyncObserver class posted in the earlier question and adds a virtual function:
template <typename T>
class IAsyncTemplateObserver : public IAsyncObserver
{
public:
virtual void Signal(T* _Obj) = 0;
};
What I want to do is, signal the ThreadPool that the function has finished execution and a new task is assigned to the mutable thread:
class MutableThread;
struct Task
{
std::function<void()> m_Function;
uint32_t m_uPriority;
Task(const std::function<void()>& _Function, uint32_t _uPriority)
{
m_Function = _Function;
m_uPriority = _uPriority;
}
};
inline bool operator<(const Task& lhs, const Task& rhs)
{
return lhs.m_uPriority < rhs.m_uPriority;
}
class ThreadPool : public IAsyncTemplateObserver<MutableThread>
{
private:
std::list<MutableThread* > m_FreeThreads;
std::list<MutableThread* > m_UsedThreads;
std::set<Task> m_Tasks;
std::mutex m_LockMutex;
public:
ThreadPool()
{
//Grow(std::thread::hardware_concurrency() - 1);
}
ThreadPool(size_t n)
{
Grow(n);
}
~ThreadPool()
{
//std::lock_guard<std::mutex> lock(m_Mutex);
for (MutableThread* pUsed : m_UsedThreads)
{
HSAFE_DELETE(pUsed);
}
for (MutableThread* pFree : m_FreeThreads)
{
HSAFE_DELETE(pFree);
}
}
inline void Grow(size_t n)
{
std::lock_guard<std::mutex> lock(m_LockMutex);
for (size_t i = 0; i < n; i++)
{
m_FreeThreads.push_back(new MutableThread(this));
}
}
inline void AddTask(const Task& _Task)
{
{
std::lock_guard<std::mutex> lock(m_LockMutex);
m_Tasks.insert(_Task);
}
AssignThreads();
}
virtual void Signal(MutableThread* _pThread)
{
{
std::lock_guard<std::mutex> lock(m_LockMutex);
m_UsedThreads.remove(_pThread);
m_FreeThreads.push_back(_pThread);
}
AssignThreads();
NotifyOne();
}
inline void WaitForAllThreads()
{
bool bWait = true;
do
{
{
//check if we have to wait
std::lock_guard<std::mutex> lock(m_LockMutex);
bWait = !m_UsedThreads.empty() || !m_Tasks.empty();
}
if (bWait)
{
std::unique_lock<std::mutex> wait(m_ObserverMutex);
m_ObserverCV.wait(wait);
}
} while (bWait);
}
private:
inline void AssignThreads()
{
std::lock_guard<std::mutex> lock(m_LockMutex);
if (m_FreeThreads.empty() || m_Tasks.empty())
return;
//Get free thread
MutableThread* pThread = m_FreeThreads.back();
m_FreeThreads.pop_back();
//park thread in used list
m_UsedThreads.push_back(pThread);
//get task with highest priority
std::set<Task>::iterator it = m_Tasks.end();
--it; //last entry has highest priority
//start the task
pThread->Start(it->m_Function);
//remove the task from the list
m_Tasks.erase(it);
}
The AddTask function is called several times by the same thread, but when a mutable thread signals the threadpool (via m_Observer->Signal(this) ) the application freezes at the lock_guard of the AssignThreads() function. Now the strange thing is unlike a normal deadlock, all stackviews in Visual Studio are empty as soon is I try to step over the line with the lockguard.
Can anyone explain this behaviour? Is there any major design flaw or just a simple mix up?
Thanks for your help!
Greetings, Fabian
Aucun commentaire:
Enregistrer un commentaire