mercredi 2 décembre 2020

How to handle condition variable missed signal from another thread

So in the compilable code below, I'm sending a Query message to be handled by another thread and I want to wait for a response or timeout if it hits a certain timeout. I don't know why the wait_until is missing the signal and hitting the timeout period when it should not be doing that. It only happens if the handler is returning a response REALLY fast. How do you propose I fix the code below?

#include <mutex>
#include <memory>
#include <condition_variable>
#include <atomic>
#include <thread>
#include <iostream>
#include <queue>
#include <zconf.h>

class Question
{

};

class Answer
{
public:
    bool isAnswered = false;
};

class Query
{
    std::condition_variable     _cv;
    std::mutex                  _mutex;
    std::atomic_bool            _questionAnswered;
    std::atomic_bool            _questionSet;

    std::shared_ptr<Question>   _question;
    std::shared_ptr<Answer>     _answer;

public:
    void setQuestion(std::shared_ptr<Question> & question)
    {
        if(!_questionSet)
        {
            _question = question;
            _questionSet = true;
        }
    };

    void setAnswer(std::shared_ptr<Answer> answer)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if(!_questionAnswered)
        {
            // Set the answer and notify the getAnswerWithTimeout() to unlock if holding
            _answer = answer;
            _questionAnswered = true;
            _cv.notify_all();
        }
    };

    std::shared_ptr<Answer> getAnswerWithTimeout(uint64_t micros)
    {
        std::unique_lock<std::mutex> lock(_mutex);

        if(!_questionAnswered)
        {
            auto now = std::chrono::system_clock::now();

            // When timeout occurs, lock down this class, set the answer as null, and set error to timeout
            if (!_cv.wait_until(lock, now + std::chrono::microseconds(micros), [&]() { return (bool)_questionAnswered; }) )
            {
                _answer = nullptr;
                _questionAnswered = true;
            }
        }

        return _answer;
    };
};

int function_to_run(std::shared_ptr<Query> query)
{
    // Respond to query and set the answer
    auto answer = std::make_shared<Answer>();
    answer->isAnswered = true;

    // Set the response answer
    query->setAnswer(answer);
}



std::queue<std::shared_ptr<Query>> queryHandler;
std::mutex queryHandlerMutex;
std::condition_variable queryHandlerCv;

void handleQueryHandler()
{
    while(true)
    {
        if(!queryHandler.empty())
        {
            // Pop off item from queue
            auto query = queryHandler.front();
            queryHandler.pop();

            // Process query with function
            function_to_run(query);
        }
        else
        {
            // If query handler is empty, wait for signal
            std::unique_lock<std::mutex> lock(queryHandlerMutex);
            queryHandlerCv.wait(lock);
        }
    }
}

void insertIntoQueryHandler(std::shared_ptr<Query> & query)
{
    // Insert into Query Handler
    queryHandler.emplace(query);
    // Notify query handler to start if locked on empty
    queryHandlerCv.notify_one();
}

std::shared_ptr<Answer>
ask(std::shared_ptr<Query> query, uint64_t timeoutMicros=0)
{
    std::shared_ptr<Answer> answer = nullptr;

    // Send Query to be handled by external thread
    insertIntoQueryHandler(query);

    // Hold for the answer to be returned with timeout period
    answer = query->getAnswerWithTimeout(timeoutMicros);

    return answer;
}


int main()
{
    // Start Up Query Handler thread to handle Queries
    std::thread queryHandlerThread(handleQueryHandler);
    queryHandlerThread.detach();

    // Create queries in infinite loop and process
    while(true)
    {
        auto question = std::make_shared<Question>();
        auto query = std::make_shared<Query>();
        query->setQuestion(question);

        auto answer = ask(query, 1000);
        if(!answer)
        {
            std::cout << "Query Timed out after 1000us" << std::endl;
        }
    }
}

Aucun commentaire:

Enregistrer un commentaire