Monday, June 1, 2015

Unable to add elements to a thread-safe locking queue of shared pointers

I'm trying to build message-based inter-thread communication using C++11 concurrency techniques. My implementation is based on the thread-safe locking queue described in Anthony Williams's book 'C++ Concurrency in Action'. It differs from the book's queue in two ways. First, I am using universal references to forward elements to the blocking queue. Second (and this is where things go wrong), I need to store a queue of std::shared_ptr pointers, because the element types form a simple message class hierarchy: an abstract base class plus subclasses that hold the actual specialized messages. I need shared pointers to avoid object slicing.
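To make the slicing concern concrete, here is a minimal sketch. The names `Base`, `Derived`, `sliced_name` and `pointer_name` are hypothetical and only serve the illustration. It assumes a non-abstract base so that the by-value queue compiles at all; with an abstract base, a queue of values could not hold any elements in the first place.

```cpp
#include <memory>
#include <queue>
#include <string>

// Hypothetical two-level hierarchy, used only for this illustration.
struct Base {
    virtual ~Base() = default;
    virtual std::string name() const { return "Base"; }
};

struct Derived : Base {
    std::string name() const override { return "Derived"; }
};

// Storing by value copies only the Base sub-object: the Derived part is
// sliced away, so the virtual call resolves to Base::name().
std::string sliced_name() {
    std::queue<Base> q;
    q.push(Derived{});
    return q.front().name();
}

// Storing a shared_ptr<Base> keeps the full Derived object alive, so the
// virtual call still dispatches to Derived::name().
std::string pointer_name() {
    std::queue<std::shared_ptr<Base>> q;
    q.push(std::make_shared<Derived>());
    return q.front()->name();
}
```

Calling `sliced_name()` yields "Base", while `pointer_name()` yields "Derived", which is why the queue holds pointers rather than message objects.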

To this end I changed Anthony Williams's implementation of the underlying message queue from:

std::queue<T> data_queue

to a

std::queue<std::shared_ptr<T>> data_queue

but when I attempt to push message pointers onto the queue through the perfect-forwarding (universal reference) signature, I get all sorts of compiler errors.

This is how I would like to add messages to the queue:

UtlThreadSafeQueue<BaseMessageType>& mDataLoadSessionQ;
auto message = std::make_shared<DerivedType>(1,2,3);
mDataLoadSessionQ.push(message);

With the above code, the compiler reports an error along the following lines: error C2664: 'void UtlThreadSafeQueue::push(T &&)' : cannot convert argument 1 from 'std::shared_ptr' to 'BaseMessageType &&' with T=BaseMessageType

I think I need some way to specialize for pointer types, but I am not sure.
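One way to read error C2664: in a class template, a member declared as `push(T&&)` is not a forwarding reference, because `T` is fixed when the class is instantiated rather than deduced at the call. With `T = BaseMessageType` the parameter is a plain rvalue reference to `BaseMessageType`, which accepts neither a `std::shared_ptr` nor an lvalue. The sketch below illustrates this with hypothetical stand-ins (`FixedPush`, `ForwardingPush`, `can_push`, `BaseMessage`, `DerivedMessage`); it is not the queue from the question, only a reduced model of its `push` signature.

```cpp
#include <memory>
#include <queue>
#include <type_traits>
#include <utility>

// Hypothetical message types standing in for the hierarchy in the question.
struct BaseMessage { virtual ~BaseMessage() = default; };
struct DerivedMessage : BaseMessage {};

// Reduced model of the original signature: T is a class template parameter,
// so T&& is an ordinary rvalue reference, not a forwarding reference.
template <typename T>
struct FixedPush {
    void push(T&&) {}
};

// Alternative: U is deduced per call, so U&& is a forwarding reference.
// The enable_if restricts U to types convertible to std::shared_ptr<T>.
template <typename T>
struct ForwardingPush {
    std::queue<std::shared_ptr<T>> items;

    template <typename U, typename = typename std::enable_if<
        std::is_convertible<U, std::shared_ptr<T>>::value>::type>
    void push(U&& value) {
        items.emplace(std::forward<U>(value));
    }
};

// Compile-time probe: true when q.push(arg) is a valid expression.
template <typename Q, typename Arg, typename = void>
struct can_push : std::false_type {};

template <typename Q, typename Arg>
struct can_push<Q, Arg,
    decltype(void(std::declval<Q&>().push(std::declval<Arg>())))>
    : std::true_type {};
```

With these definitions, `can_push<FixedPush<BaseMessage>, std::shared_ptr<DerivedMessage>&>::value` is false, matching the error in the question, while the same probe against `ForwardingPush<BaseMessage>` is true for both lvalue and rvalue shared pointers.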

My implementation is as follows:

/*
** code adapted from Anthony Williams's book C++ Concurrency in Action
** Pages 74-75.
**
*/

#ifndef _utlThreadSafeQueue_h_
#define _utlThreadSafeQueue_h_

// SYSTEM INCLUDES
#include <atomic>
#include <queue>
#include <limits>
#include <memory>
#include <mutex>
#include <condition_variable>

// APPLICATION INCLUDES
// MACROS
#if defined (_WIN32) && (defined (max) || defined (min))
    // Windows uses min/max macros
    #undef min
    #undef max
#endif

// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS

template<typename T>
class UtlThreadSafeQueue {
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T>> data_queue;
    std::condition_variable data_cond;
    std::size_t capacity;
    std::atomic<bool> shutdownFlag;
public:
    explicit UtlThreadSafeQueue(const size_t& rCapacity =
        std::numeric_limits<std::size_t>::max())
        : mut()
        , data_queue()
        , data_cond()
        , capacity(rCapacity)
        , shutdownFlag(false)
    {}

    UtlThreadSafeQueue(UtlThreadSafeQueue const& rhs) {
        std::lock_guard<std::mutex> lock(rhs.mut);
        data_queue = rhs.data_queue;
    }

    virtual ~UtlThreadSafeQueue() = default;

    // move aware push
    inline void push(T&& value) {
        std::unique_lock<std::mutex> lock(mut);
        // only add the value to the queue if there is room
        data_cond.wait(lock,[this]{return (data_queue.size() < capacity) || shutdownFlag;});
        data_queue.emplace(std::forward<T>(value));
        data_cond.notify_one();
    }

    // wait for non empty lambda condition before returning value
    inline void wait_and_pop(T& rValue) {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty();});
        // ideally should return an invalid value
        if (!shutdownFlag) {
            rValue = data_queue.front();
            data_queue.pop();
        }
    }

    // wait for non empty lambda condition before returning shared pointer to value
    inline std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lock(mut);
        data_cond.wait(lock,[this]{return !data_queue.empty() || shutdownFlag;});
        if (shutdownFlag) {
            std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
            data_queue.pop();
            return res;
        }
        return nullptr;
    }

    // return value in specified reference and flag indicating whether value
    // successfully returned or not
    inline bool try_pop(T& rValue) {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return false;
        }
        rValue = data_queue.front();
        data_queue.pop();
        return true;
    }

    // return shared pointer to value - which if set to nullptr,
    // indicates container was empty at the time of the call.
    inline std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lock(mut);
        if (data_queue.empty()) {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }
    // thread safe method to check if the queue is empty
    // note that the result may already be stale by the time the caller acts on it
    inline bool empty() const {
        std::lock_guard<std::mutex> lock(mut);
        return data_queue.empty();
    }

    // shutdown support - wake up potentially sleeping queues
    inline void shutdown() {
        shutdownFlag = true;
        data_cond.notify_all();
    }
};

#endif // _utlThreadSafeQueue_h_
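For comparison, here is a minimal sketch of how a queue that stores `std::shared_ptr<T>` could accept pointers to derived messages. It is not the book's implementation; `SharedPtrQueue`, `BaseMessage` and `DerivedMessage` are hypothetical names. It assumes `push` takes the shared pointer by value (so lvalues are copied and rvalues moved), that the pop functions hand back the stored pointer directly instead of calling `std::make_shared` on it, and that the shutdown flag is set under the mutex so a waiting thread cannot miss the wake-up.

```cpp
#include <condition_variable>
#include <cstddef>
#include <limits>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical message hierarchy with an abstract base, as in the question.
struct BaseMessage {
    virtual ~BaseMessage() = default;
    virtual int id() const = 0;
};
struct DerivedMessage : BaseMessage {
    explicit DerivedMessage(int v) : value(v) {}
    int id() const override { return value; }
    int value;
};

template <typename T>
class SharedPtrQueue {
public:
    explicit SharedPtrQueue(std::size_t capacity =
        (std::numeric_limits<std::size_t>::max)())
        : capacity_(capacity), shutdown_(false) {}

    // Accepts shared_ptr<U> for any U whose pointer converts to T*.
    // Returns false if the queue was shut down before the element was added.
    template <typename U>
    bool push(std::shared_ptr<U> value) {
        std::unique_lock<std::mutex> lock(mut_);
        cond_.wait(lock, [this] { return data_.size() < capacity_ || shutdown_; });
        if (shutdown_) return false;
        data_.emplace(std::move(value));  // converts to shared_ptr<T>
        cond_.notify_all();               // producers and consumers share cond_
        return true;
    }

    // Blocks until an element is available or shutdown() has been called.
    // Returns nullptr only when the queue is empty after shutdown.
    std::shared_ptr<T> wait_and_pop() {
        std::unique_lock<std::mutex> lock(mut_);
        cond_.wait(lock, [this] { return !data_.empty() || shutdown_; });
        return pop_locked();
    }

    // Non-blocking variant: returns nullptr when the queue is empty.
    std::shared_ptr<T> try_pop() {
        std::lock_guard<std::mutex> lock(mut_);
        return pop_locked();
    }

    // Wakes every waiting thread; the flag is written while holding the mutex.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mut_);
            shutdown_ = true;
        }
        cond_.notify_all();
    }

private:
    // Caller must hold mut_.
    std::shared_ptr<T> pop_locked() {
        if (data_.empty()) return nullptr;
        std::shared_ptr<T> res = std::move(data_.front());
        data_.pop();
        cond_.notify_all();  // a slot was freed for blocked producers
        return res;
    }

    std::mutex mut_;
    std::condition_variable cond_;
    std::queue<std::shared_ptr<T>> data_;
    std::size_t capacity_;
    bool shutdown_;
};
```

Under these assumptions, `SharedPtrQueue<BaseMessage> q; q.push(std::make_shared<DerivedMessage>(7));` compiles, and `q.wait_and_pop()->id()` dispatches to `DerivedMessage::id()`. Taking the pointer by value trades one possible reference-count increment for a much simpler signature than a constrained forwarding reference.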
