Friday, 25 September 2020

Lockless queue using std::atomic

I want to create a lockless queue using std::atomic.
Here is my first, probably flawed, attempt:

#include <atomic>

template <typename T>
class atomic_queue
{
public:
    using value_type = T;

private:
    struct node
    {
        value_type m_value;
        node* m_next;
        node* m_prev;

        node(const value_type& value) :
            m_value(value),
            m_next(nullptr),
            m_prev(nullptr) {}
    };

    std::atomic<node*> m_head{nullptr};
    std::atomic<node*> m_tail{nullptr};

public:
    void push(const value_type& value)
    {
        auto new_node = new node(value);

        // Empty queue: install the node as tail, then publish it as head.
        node* tmp = nullptr;
        if (m_tail.compare_exchange_strong(tmp, new_node))
        {
            m_head.store(new_node, std::memory_order_relaxed);
            return;
        }

        // Non-empty queue: swing the tail, then link the previous tail forward.
        node* old_tail;
        do {
            old_tail = m_tail;
            new_node->m_prev = old_tail;
        } while (!m_tail.compare_exchange_strong(old_tail, new_node));
        new_node->m_prev->m_next = new_node;
    }

    void pop()
    {
        if (m_head.load(std::memory_order_relaxed) == nullptr)
        {
            return;
        }

        // Single element (head == tail): reset the tail, then reset the head.
        node* tmp = nullptr;
        node* head = m_head;
        if (m_tail.compare_exchange_strong(head, tmp))
        {
            m_head.store(tmp, std::memory_order_relaxed);
            return;
        }

        node* old_head;
        do {
            old_head = m_head;
        } while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
        if (old_head)
        {
            delete old_head;
        }
    }

    bool empty()
    {
        return m_head.load(std::memory_order_relaxed) == nullptr;
    }

    value_type& front()
    {
        node* head = m_head.load(std::memory_order_acquire);
        return head->m_value;
    }
};

Note that I store m_prev on each node so that, after a successful push, I can update the m_next of the previous tail without going through m_tail, in case another thread has already changed it. So even if another thread has already pushed a new value, the current thread still links the node it saw as the tail to its new node.

There are a few things that are not thread-safe as far as I can tell, and I can't think of a good way to solve them:

Let's assume thread1 pops the one and only item from the queue, so it enters the following if statement:

        node* tmp = nullptr;
        node* head = m_head;
        if (m_tail.compare_exchange_strong(head, tmp))
        {
            // Now thread2 kicks in
            m_head.store(tmp, std::memory_order_relaxed);
            return;
        }

And let's assume thread2 kicks in at the marked spot to push a new value to the queue. Then the following statement will be executed:

        node* tmp = nullptr;
        if (m_tail.compare_exchange_strong(tmp, new_node))
        {
            m_head.store(new_node, std::memory_order_relaxed);
            return;
        }

Let us assume thread2 finishes its push before thread1 continues. Only then does thread1 resume, and it will execute:

            m_head.store(tmp, std::memory_order_relaxed);
            return;

and will effectively undo thread2's push by setting m_head to nullptr. As far as I understand, memory orders can't help me in this scenario, so I'm not sure what my options are.
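The interleaving described above can be replayed deterministically on a single thread, which makes the lost update easy to see. This is only a sketch: replay_lost_push and the local node struct are hypothetical names made up for the illustration, and the two local atomics stand in for m_head and m_tail.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the queue's node type.
struct node
{
    int value;
};

// Replays the steps of thread1 (pop) and thread2 (push) in the problematic order.
// Returns true if thread2's node ends up as tail while head is nullptr.
bool replay_lost_push()
{
    node first{1};
    node second{2};
    std::atomic<node*> head{&first};
    std::atomic<node*> tail{&first};

    // thread1, pop(): head == tail, so the CAS resets tail to nullptr.
    node* t1_tmp = nullptr;
    node* t1_head = head.load();
    bool t1_cas = tail.compare_exchange_strong(t1_head, t1_tmp);
    assert(t1_cas);
    // thread1 is preempted here, before it stores to head.

    // thread2, push(): tail is nullptr, so it takes the empty-queue path.
    node* t2_tmp = nullptr;
    bool t2_cas = tail.compare_exchange_strong(t2_tmp, &second);
    assert(t2_cas);
    head.store(&second, std::memory_order_relaxed);

    // thread1 resumes and overwrites the head that thread2 just published.
    head.store(t1_tmp, std::memory_order_relaxed);

    return head.load() == nullptr && tail.load() == &second;
}
```

The function returns true: the tail points at thread2's node while the head is nullptr, so empty() would report an empty queue even though a push has completed. The replay uses the default (sequentially consistent) ordering for both CAS operations, which suggests the problem comes from updating two separate atomics in two steps rather than from the memory orders.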

Another problematic scenario: let's assume we have two reader threads, thread3 and thread4, doing the same job:

    while (true)
    {
        if (!q.empty())
        {
            int v = q.front();
            q.pop();
            std::stringstream stream;
            stream << "thread_3/4: " << v << '\n';
            std::cout << stream.str();
        }
    }

And let us assume the queue has size 1. Both threads could see that the queue is not empty, both could get a reference to the front element, and then both would pop and print the same value.
It seems to me that locking would help in this scenario, but I do not wish to use locking. I also do not want the reading threads to have to care about synchronization, because the interface itself should be responsible for that. But since front and pop are independent, I don't see a good way to handle this.
There's also the problem that front might dereference nullptr, and I'm not sure how to handle that either. I could make the interface return a raw pointer or a std::optional, but neither solution seems right to me, so I would love to hear opinions on what should be done here.
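One possible direction is to merge front and pop into a single operation that returns std::optional, so that claiming an element and reading it cannot be separated, and an empty queue is reported as std::nullopt instead of a null dereference. The sketch below is not built on the linked list above. It swaps in a bounded ring buffer with per-cell sequence counters, in the style of Dmitry Vyukov's bounded MPMC queue, because that avoids the node reclamation problem. The names bounded_queue, try_push and try_pop are hypothetical; the capacity is assumed to be a power of two and at least 2, and T is assumed to be default-constructible and copy-assignable.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>

template <typename T>
class bounded_queue
{
    struct cell
    {
        std::atomic<std::size_t> seq;  // tells whether the cell is free or filled
        T value;
    };

    std::unique_ptr<cell[]> m_cells;
    std::size_t m_mask;
    std::atomic<std::size_t> m_enqueue_pos{0};
    std::atomic<std::size_t> m_dequeue_pos{0};

public:
    // Assumption: capacity is a power of two and >= 2.
    explicit bounded_queue(std::size_t capacity)
        : m_cells(new cell[capacity]), m_mask(capacity - 1)
    {
        for (std::size_t i = 0; i < capacity; ++i)
            m_cells[i].seq.store(i, std::memory_order_relaxed);
    }

    // Returns false if the queue is full.
    bool try_push(const T& value)
    {
        std::size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
        for (;;)
        {
            cell& c = m_cells[pos & m_mask];
            std::size_t seq = c.seq.load(std::memory_order_acquire);
            std::intptr_t diff = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos);
            if (diff == 0)
            {
                // Cell is free; on CAS failure pos is refreshed and the loop retries.
                if (m_enqueue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                {
                    c.value = value;
                    c.seq.store(pos + 1, std::memory_order_release);  // publish to consumers
                    return true;
                }
            }
            else if (diff < 0)
                return false;
            else
                pos = m_enqueue_pos.load(std::memory_order_relaxed);
        }
    }

    // Claims and returns the front element in one step; std::nullopt if empty.
    std::optional<T> try_pop()
    {
        std::size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
        for (;;)
        {
            cell& c = m_cells[pos & m_mask];
            std::size_t seq = c.seq.load(std::memory_order_acquire);
            std::intptr_t diff = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(pos + 1);
            if (diff == 0)
            {
                // Only the thread whose CAS succeeds owns this cell.
                if (m_dequeue_pos.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                {
                    std::optional<T> result(std::move(c.value));
                    c.seq.store(pos + m_mask + 1, std::memory_order_release);  // free the cell
                    return result;
                }
            }
            else if (diff < 0)
                return std::nullopt;
            else
                pos = m_dequeue_pos.load(std::memory_order_relaxed);
        }
    }
};
```

With such an interface the reader loop would shrink to `if (auto v = q.try_pop()) { ... }`, and two readers could not receive the same element, because only one of them wins the CAS on the dequeue position. Two caveats: the queue has a fixed capacity, and the design is not lock-free in the strict sense, since a producer stalled between its CAS and its store to seq delays consumers at that cell.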

Also, I'm not sure whether I could get away with something cheaper than CAS. I know I could use a unique-slot approach, where each thread gets an index into a fixed array by calling fetch_add on a std::atomic<int> slot, so that each thread pushes to a unique index. But I don't like this approach, since it limits the queue to a fixed size. On the other hand, using new and delete is probably not the fastest thing either. I could use a pool allocator of some sort, but then I would have to make sure it is synchronized as well, and that's a new level of pain.
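For reference, the slot-claiming step of that approach might look like the sketch below. slot_array and try_claim are hypothetical names, and the counter is a std::size_t rather than an int. The sketch covers only how producers obtain unique indices with fetch_add. It leaves out what a real queue would still need: a per-slot flag telling readers when a slot has been fully written, and wrap-around of the index.

```cpp
#include <array>
#include <atomic>
#include <cstddef>
#include <optional>

template <typename T, std::size_t N>
class slot_array
{
    std::array<T, N> m_slots{};
    std::atomic<std::size_t> m_slot{0};

public:
    // Writes value into a slot that no other caller can receive.
    // Returns the slot index, or std::nullopt once all N slots are taken.
    std::optional<std::size_t> try_claim(const T& value)
    {
        // fetch_add hands every caller a distinct index without a retry loop.
        std::size_t index = m_slot.fetch_add(1, std::memory_order_relaxed);
        if (index >= N)
        {
            return std::nullopt;
        }
        m_slots[index] = value;
        return index;
    }
};
```

The appeal is that fetch_add always succeeds, whereas a CAS loop may have to retry under contention. The cost is the fixed capacity mentioned above.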

I'm not even sure these are all the problems. These are the ones I could spot in my implementation, and I'm sure I didn't think of everything (or maybe I did?). Anyway, I would love to hear your thoughts on the problems described and on ways to overcome them.
