lundi 20 août 2018

Managing queue in second thread

I created thread which is responsible for doing some long operation on passed data. All data are put to the queue in main thread, then second thread gets this data, manipulate them and returns to map which contains ID of executed data and result. Class looks like that:

#include <memory>
#include <future>
#include <vector>
#include <map>
#include <set>
#include <queue>
#include <condition_variable>
#include <iostream>
#include <sstream>

typedef unsigned int Key;

class DatabaseWorker
{
public:
    DatabaseWorker()
    {
        startThread();
    }

    ~DatabaseWorker()
    {
        stopThread();
    }

    void startThread()
    {
        std::cout << "starting thread" << std::endl;
        m_futureObj = m_exitSignal.get_future();
        m_thread = std::thread(&DatabaseWorker::work, this);
    }

    void stopThread()
    {
        std::cout << "stopping thread" << std::endl;
        m_exitSignal.set_value();
        if (m_thread.joinable())
            m_thread.join();
    }

    void work()
    {
        std::cout << "thread started" << std::endl;
        bool bEmpty;
        std::pair<Key, std::string> data;
        while (m_futureObj.wait_for(std::chrono::milliseconds(1)) == std::future_status::timeout)
        {
            //Check queue size
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                bEmpty = m_requests.empty();
            }
            if (!bEmpty)
            {
                std::cout << "something in queue" << std::endl;
                //Get data from queue
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    data = m_requests.front();
                    m_requests.pop();
                }

                Key id = data.first;
                std::string str = data.second;
                //HERE do some operation with string
                str += "_123";

                //Prepare result
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    m_results.insert(std::make_pair(id, str));

                    //Inform conditional about data
                    m_conditionals.find(id)->second.second = true;
                    m_conditionals.find(id)->second.first->notify_one();
                }
            }
        }
    }

    Key addToQueue(const std::string &data)
    {
        std::cout << "adding to queue" << std::endl;
        std::lock_guard<std::mutex> lock(m_mutex);  //lock
        m_requests.push(std::make_pair(m_nID, data));   //adddata to queue
        m_conditionals.insert(std::make_pair(m_nID, std::make_pair(std::make_shared<std::condition_variable>(), bool(false))));
        return m_nID++;
    }

    std::string getResult(Key id)
    {
        std::cout << "getting result" << std::endl;
        std::unique_lock<std::mutex> lock(m_mutex);
        //Check if data ready
        m_conditionals.find(id)->second.first->wait(lock, [&](){
            return m_conditionals.find(id)->second.second;  //return bool data
        });

        //If data ready
        auto result = m_results.find(id)->second;   //Get data from thread
        //Delete data
        m_results.erase(m_results.find(id));    //HERE HEAP ISSUE OCCURS SOMETIMES!!!
        return result;
    }

private:
    std::queue<std::pair<Key, std::string>> m_requests;
    std::map<Key, std::string> m_results;
    std::map < Key, std::pair< std::shared_ptr<std::condition_variable>, bool>> m_conditionals;

    std::promise<void> m_exitSignal;
    std::future<void> m_futureObj;
    std::mutex m_mutex;
    std::thread m_thread;

    Key m_nID = 0;
};

Then in main function I'm adding some data and waiting for response from thread:

int main()
{
    DatabaseWorker dbWorker;
    std::ostringstream stringStream;
    std::string data;
    std::vector<Key> keys;
    int i;
    for (i = 0; i < 100; i++)
    {
        stringStream.str("");
        stringStream << "data_" << i;
        data = stringStream.str();
        keys.push_back(dbWorker.addToQueue(data));
    }

    for each (Key key in keys)
    {
        std::cout << "key: " << key << " data: " << dbWorker.getResult(key) << std::endl;
    }
}

The problem is that sometimes during calling getResult() method I get memory error in place m_results.erase(m_results.find(id)):

Debug Assertion Failed!
Expression _pFirstBlock == pHead

Where is the lack of my understanding/implementation?

Aucun commentaire:

Enregistrer un commentaire