I created thread which is responsible for doing some long operation on passed data. All data are put to the queue in main thread, then second thread gets this data, manipulate them and returns to map which contains ID of executed data and result. Class looks like that:
#include <memory>
#include <future>
#include <vector>
#include <map>
#include <set>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <sstream>
typedef unsigned int Key;
class DatabaseWorker
{
public:
DatabaseWorker()
{
startThread();
}
~DatabaseWorker()
{
stopThread();
}
void startThread()
{
std::cout << "starting thread" << std::endl;
m_futureObj = m_exitSignal.get_future();
m_thread = std::thread(&DatabaseWorker::work, this);
}
void stopThread()
{
std::cout << "stopping thread" << std::endl;
m_exitSignal.set_value();
if (m_thread.joinable())
m_thread.join();
}
void work()
{
std::cout << "thread started" << std::endl;
bool bEmpty;
std::pair<Key, std::string> data;
while (m_futureObj.wait_for(std::chrono::milliseconds(1)) == std::future_status::timeout)
{
//Check queue size
{
std::lock_guard<std::mutex> lock(m_mutex);
bEmpty = m_requests.empty();
}
if (!bEmpty)
{
std::cout << "something in queue" << std::endl;
//Get data from queue
{
std::lock_guard<std::mutex> lock(m_mutex);
data = m_requests.front();
m_requests.pop();
}
Key id = data.first;
std::string str = data.second;
//HERE do some operation with string
str += "_123";
//Prepare result
{
std::lock_guard<std::mutex> lock(m_mutex);
m_results.insert(std::make_pair(id, str));
//Inform conditional about data
m_conditionals.find(id)->second.second = true;
m_conditionals.find(id)->second.first->notify_one();
}
}
}
}
Key addToQueue(const std::string &data)
{
std::cout << "adding to queue" << std::endl;
std::lock_guard<std::mutex> lock(m_mutex); //lock
m_requests.push(std::make_pair(m_nID, data)); //adddata to queue
m_conditionals.insert(std::make_pair(m_nID, std::make_pair(std::make_shared<std::condition_variable>(), bool(false))));
return m_nID++;
}
std::string getResult(Key id)
{
std::cout << "getting result" << std::endl;
std::unique_lock<std::mutex> lock(m_mutex);
//Check if data ready
m_conditionals.find(id)->second.first->wait(lock, [&](){
return m_conditionals.find(id)->second.second; //return bool data
});
//If data ready
auto result = m_results.find(id)->second; //Get data from thread
//Delete data
m_results.erase(m_results.find(id)); //HERE HEAP ISSUE OCCURS SOMETIMES!!!
return result;
}
private:
std::queue<std::pair<Key, std::string>> m_requests;
std::map<Key, std::string> m_results;
std::map < Key, std::pair< std::shared_ptr<std::condition_variable>, bool>> m_conditionals;
std::promise<void> m_exitSignal;
std::future<void> m_futureObj;
std::mutex m_mutex;
std::thread m_thread;
Key m_nID = 0;
};
Then in main function I'm adding some data and waiting for response from thread:
int main()
{
DatabaseWorker dbWorker;
std::ostringstream stringStream;
std::string data;
std::vector<Key> keys;
int i;
for (i = 0; i < 100; i++)
{
stringStream.str("");
stringStream << "data_" << i;
data = stringStream.str();
keys.push_back(dbWorker.addToQueue(data));
}
for each (Key key in keys)
{
std::cout << "key: " << key << " data: " << dbWorker.getResult(key) << std::endl;
}
}
The problem is that sometimes during calling getResult() method I get memory error in place m_results.erase(m_results.find(id)):
Debug Assertion Failed!
Expression _pFirstBlock == pHead
Where is the lack of my understanding/implementation?
Aucun commentaire:
Enregistrer un commentaire