I'm trying to create an inter thread message based communications using C++11 concurrency techniques. Anthony William's book 'Concurrency in Action' describes a thread safe locking queue which this implementation is based on. The difference between the thread safe locking queue that is described in the book and the one I want to implement is firstly that I am using universal references to forward queue elements to the blocking queue, and secondly (and this is where things are going wrong) I need to be able to store a queue of std::shared_ptr pointers as the template types consist of a simple message class hierarchy with an abstract base class and sub-classes with the actual specialized messages. I need to use shared pointers to avoid data slicing.
To this end I changed Anthony William's implementation of the underlying message queue from:
std::queue<T> data_queue
to a
std::queue<std::shared_ptr<T>> data_queue
but then when I attempt to push message pointers on the queue via the universal reference perfect forwarding signature, I get all sorts of errors.
The way I would like to be able to add message on this queue is as follows:
UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(BaseType);
With the above code, the compiler complains indicating something along the following lines error C2664: 'void UtlThreadSafeQueue::push(T &&)' : cannot convert argument 1 from 'std::shared_ptr' to 'BaseMessageType &&' with T=BaseMessageType
I think I need some way to specialize pointer types but I am not sure.
My implementation is as follows:
/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/
#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_
// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>
// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
// Windows uses min/max macros
#undef min
#undef max
#endif
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
template<typename T>
class UtlThreadSafeQueue {
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
std::size_t capacity;
std::atomic<bool> shutdownFlag;
public:
explicit UtlThreadSafeQueue(const size_t& rCapacity =
std::numeric_limits<std::size_t>::max())
: mut()
, data_queue()
, data_cond()
, capacity(rCapacity)
, shutdownFlag(false)
{}
UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
std::lock_guard<std::mutex> lock(rhs.mut);
data_queue = rhs.data_queue;
}
virtual ~UtlThreadSafeQueue() = default;
// move aware push
inline void push(T&& value) {
std::unique_lock<std::mutex> lock(mut);
// only add the value on the stack if there is room
data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
data_queue.emplace(std::forward<T>(value));
data_cond.notify_one();
}
// wait for non empty lambda condition before returning value
inline void wait_and_pop(T& rValue) {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty();});
// ideally should return an invalid value
if (!shutdownFlag) {
rValue = data_queue.front();
data_queue.pop();
}
}
// wait for non empty lambda condition before returning shared pointer to value
inline std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mut);
data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
if (shutdownFlag) {
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
return nullptr;
}
// return value in specified reference and flag indicating whether value
// successfully returned or not
inline bool try_pop(T& rValue) {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return false;
}
rValue = data_queue.front();
data_queue.pop();
return true;
}
// return shared pointer to value - which if set to nullptr,
// indicates container was empty at the time of the call.
inline std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mut);
if (data_queue.empty()) {
return std::shared_ptr<T>();
}
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
// thread safe method to check if the queue is empty
// note that if it is empty
inline bool empty() const {
std::lock_guard<std::mutex> lock(mut);
return data_queue.empty();
}
// shutdown support - wake up potentially sleeping queues
inline void shutdown() {
shutdownFlag = true;
data_cond.notify_all();
}
};
#endif // _utlThreadSafeQueue_h_
Aucun commentaire:
Enregistrer un commentaire