In the compilable code below, I send a Query message to be handled by another thread, and I want to wait for a response or give up once a timeout period has elapsed. I don't know why wait_until misses the signal and hits the timeout when it should not. It only happens when the handler returns a response REALLY fast. How do you propose I fix the code below?
#include <zconf.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
/// Request payload type. It declares no members, so it carries no data.
class Question {};
/// Reply payload type holding a single public flag.
class Answer
{
public:
    /// Defaults to false on construction.
    bool isAnswered{false};
};
/// A single question/answer exchange that one thread can wait on while
/// another thread supplies the answer.
///
/// The question and the answer are each accepted once: later calls to
/// setQuestion()/setAnswer() are ignored. If getAnswerWithTimeout() times
/// out, the query is marked as answered with a null answer, so an answer
/// that arrives afterwards is dropped.
class Query
{
    std::condition_variable _cv;
    std::mutex _mutex;
    // Explicitly initialised: before C++20 a default-constructed
    // std::atomic_bool holds an indeterminate value.
    std::atomic_bool _questionAnswered{false};
    std::atomic_bool _questionSet{false};
    std::shared_ptr<Question> _question;
    std::shared_ptr<Answer> _answer;

public:
    /// Store the question if none has been stored yet; otherwise do nothing.
    /// @param question  Question to attach to this query.
    void setQuestion(const std::shared_ptr<Question> & question)
    {
        // Same mutex as the answer path so the check-then-store is atomic.
        std::lock_guard<std::mutex> lock(_mutex);
        if(!_questionSet)
        {
            _question = question;
            _questionSet = true;
        }
    }

    /// Store the answer and wake every thread blocked in
    /// getAnswerWithTimeout(). Ignored if the query is already answered
    /// (including the case where a waiter has already timed out).
    /// @param answer  Answer to publish to the waiter.
    void setAnswer(std::shared_ptr<Answer> answer)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        if(!_questionAnswered)
        {
            _answer = std::move(answer);
            _questionAnswered = true;
            _cv.notify_all();
        }
    }

    /// Block until an answer is set or the timeout elapses.
    /// @param micros  Maximum time to wait, in microseconds.
    /// @return The stored answer, or nullptr if the wait timed out.
    std::shared_ptr<Answer> getAnswerWithTimeout(uint64_t micros)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // wait_for measures the interval against a steady clock, so wall-clock
        // adjustments cannot shorten or stretch the timeout. The predicate is
        // evaluated before blocking, so an answer that is already present is
        // returned immediately, and spurious wakeups are absorbed.
        const bool answered = _cv.wait_for(lock,
                                           std::chrono::microseconds(micros),
                                           [this] { return _questionAnswered.load(); });
        if(!answered)
        {
            // Timed out: close the query so a late setAnswer() is ignored.
            _answer = nullptr;
            _questionAnswered = true;
        }
        return _answer;
    }
};
/// Answer a query: build an Answer with isAnswered set and hand it to the
/// query via setAnswer().
/// @param query  Query to answer; dereferenced unconditionally, so it must
///               not be null.
/// @return Always 0.
int function_to_run(std::shared_ptr<Query> query)
{
    // Respond to query and set the answer
    auto answer = std::make_shared<Answer>();
    answer->isAnswered = true;
    // Set the response answer
    query->setAnswer(answer);
    // A non-void function must return a value; flowing off the end is
    // undefined behaviour.
    return 0;
}
std::queue<std::shared_ptr<Query>> queryHandler;
std::mutex queryHandlerMutex;
std::condition_variable queryHandlerCv;
void handleQueryHandler()
{
while(true)
{
if(!queryHandler.empty())
{
// Pop off item from queue
auto query = queryHandler.front();
queryHandler.pop();
// Process query with function
function_to_run(query);
}
else
{
// If query handler is empty, wait for signal
std::unique_lock<std::mutex> lock(queryHandlerMutex);
queryHandlerCv.wait(lock);
}
}
}
void insertIntoQueryHandler(std::shared_ptr<Query> & query)
{
// Insert into Query Handler
queryHandler.emplace(query);
// Notify query handler to start if locked on empty
queryHandlerCv.notify_one();
}
/// Submit a query to the handler queue and wait for its answer.
/// @param query          Query to submit.
/// @param timeoutMicros  Timeout in microseconds passed to
///                       Query::getAnswerWithTimeout(); defaults to 0.
/// @return Whatever getAnswerWithTimeout() returns for this query.
std::shared_ptr<Answer>
ask(std::shared_ptr<Query> query, uint64_t timeoutMicros=0)
{
    // Hand the query to the external handler thread first...
    insertIntoQueryHandler(query);
    // ...then block here until it is answered or the timeout period passes.
    return query->getAnswerWithTimeout(timeoutMicros);
}
/// Driver: starts the detached query-handler thread, then issues queries in
/// an endless loop and reports every one that comes back without an answer.
int main()
{
    // Worker thread that services queued queries; detached, never joined.
    std::thread worker(handleQueryHandler);
    worker.detach();

    // Keep generating and submitting queries forever.
    for(;;)
    {
        auto question = std::make_shared<Question>();
        auto query = std::make_shared<Query>();
        query->setQuestion(question);

        const auto reply = ask(query, 1000);
        if(reply)
        {
            continue;
        }
        std::cout << "Query Timed out after 1000us" << std::endl;
    }
}
Aucun commentaire:
Enregistrer un commentaire