Friday, September 25, 2020

Lockless queue using std::atomic

I wish to create a lockless queue using std::atomic.
Here's my (probably not so good) first attempt:

#include <atomic>

template <typename T>
class atomic_queue
{
public:
    using value_type = T;
private:
    struct node
    {
        value_type m_value;
        node* m_next;
        node* m_prev;

        node(const value_type& value) :
            m_value(value),
            m_next(nullptr),
            m_prev(nullptr) {}
    };
private:
    std::atomic<node*> m_head = nullptr;
    std::atomic<node*> m_tail = nullptr;
public:
    void push(const value_type& value)
    {
        auto new_node = new node(value);

        node* tmp = nullptr;
        if (m_tail.compare_exchange_strong(tmp, new_node))
        {
            m_head.store(new_node, std::memory_order_relaxed);
            return;
        }

        node* old_tail;
        do {
            old_tail = m_tail;
            new_node->m_prev = old_tail;
        } while (!m_tail.compare_exchange_strong(old_tail, new_node));
        new_node->m_prev->m_next = new_node;
    }

    void pop()
    {
        if (m_head.load(std::memory_order_relaxed) == nullptr)
        {
            return;
        }

        node* tmp = nullptr;
        node* head = m_head;
        if (m_tail.compare_exchange_strong(head, tmp))
        {
            m_head.store(tmp, std::memory_order_relaxed);
            return;
        }

        node* old_head;
        do {
            old_head = m_head;
        } while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
        if (old_head)
        {
            delete old_head;
        }
    }

    bool empty()
    {
        return m_head.load(std::memory_order_relaxed) == nullptr;
    }

    value_type& front()
    {
        node* head = m_head.load(std::memory_order_acquire);
        return head->m_value;
    }
};

Something to note here is that I store m_prev on node so that, after a successful push, I can update the old tail's m_next without going through m_tail, in case m_tail was already changed by another thread. So even if another thread got to push a new value first, the current thread would still link what it saw as the tail's m_next to the new node.

Now, there are a few things that are not really thread-safe as far as I can tell, and I can't think of a good way to solve them:

Let's assume thread1 pops the one and only item from the queue, so we go inside the following if statement:

        node* tmp = nullptr;
        node* head = m_head;
        if (m_tail.compare_exchange_strong(head, tmp))
        {
            // Now thread2 kicks in
            m_head.store(tmp, std::memory_order_relaxed);
            return;
        }

And let's assume thread2 kicks in at the marked spot to push a new value to the queue; then the following statement will be executed:

        node* tmp = nullptr;
        if (m_tail.compare_exchange_strong(tmp, new_node))
        {
            m_head.store(new_node, std::memory_order_relaxed);
            return;
        }

And let us assume thread2 finishes its push before thread1 continues; only then does thread1 resume and execute:

        m_head.store(tmp, std::memory_order_relaxed);
        return;

and will basically undo thread2's push by setting m_head to nullptr. As far as I can tell, memory orders can't help me in this scenario, so I'm not sure what my options are.
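One way this coupled update of m_head and m_tail is usually avoided is the Michael-Scott design: keep a permanent dummy node so that both pointers are never null, and the empty/non-empty transition never needs to touch two atomics together. Below is only a sketch of that idea, not a drop-in fix for the class above, and it deliberately ignores safe memory reclamation (the delete in try_pop is not safe while other threads may still hold the pointer; a real version needs hazard pointers or epoch-based reclamation):

```cpp
#include <atomic>
#include <optional>
#include <utility>

// Sketch of a Michael-Scott style queue. A permanent dummy node means m_head
// and m_tail are never null, so the empty <-> non-empty transition never
// requires updating two atomics together (the source of the lost-push race
// described above). NOTE: the `delete` in try_pop is NOT safe under real
// concurrency; proper reclamation needs hazard pointers or epochs.
template <typename T>
class ms_queue
{
    struct node
    {
        T m_value{};
        std::atomic<node*> m_next{nullptr};
    };

    std::atomic<node*> m_head; // always points at the dummy node
    std::atomic<node*> m_tail;

public:
    ms_queue()
    {
        node* dummy = new node{};
        m_head.store(dummy);
        m_tail.store(dummy);
    }

    ~ms_queue()
    {
        node* n = m_head.load();
        while (n != nullptr)
        {
            node* next = n->m_next.load();
            delete n;
            n = next;
        }
    }

    void push(const T& value)
    {
        node* new_node = new node{};
        new_node->m_value = value;
        for (;;)
        {
            node* tail = m_tail.load(std::memory_order_acquire);
            node* next = tail->m_next.load(std::memory_order_acquire);
            if (next != nullptr)
            {
                // The tail pointer is lagging; help swing it forward, retry.
                m_tail.compare_exchange_weak(tail, next);
                continue;
            }
            node* expected = nullptr;
            if (tail->m_next.compare_exchange_weak(expected, new_node))
            {
                // Linked in; try to swing the tail (another thread may have
                // already helped, in which case this CAS harmlessly fails).
                m_tail.compare_exchange_weak(tail, new_node);
                return;
            }
        }
    }

    std::optional<T> try_pop()
    {
        for (;;)
        {
            node* head = m_head.load(std::memory_order_acquire);
            node* next = head->m_next.load(std::memory_order_acquire);
            if (next == nullptr)
            {
                return std::nullopt; // only the dummy node: queue is empty
            }
            if (m_head.compare_exchange_weak(head, next))
            {
                T value = std::move(next->m_value);
                delete head; // old dummy; unsafe without a reclamation scheme
                return value;
            }
        }
    }
};
```

Because a push only ever CASes the old tail's m_next and a pop only ever CASes m_head past the dummy, neither operation has a window where a concurrent thread can observe (and overwrite) a half-finished two-pointer update.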

Another problematic scenario: let's assume we have two reader threads, thread3 and thread4, doing the same job:

    while (true)
    {
        if (!q.empty())
        {
            int v = q.front();
            q.pop();
            std::stringstream stream;
            stream << "thread_3/4: " << v << '\n';
            std::cout << stream.str();
        }
    }

And let us assume the queue holds a single element: both threads could see that the queue is not empty, get a reference to the front data, and then each pop, printing the same value twice.
It seems to me that locking would help in this scenario, but I do not wish to use locking, and I also do not want the reading threads to care about synchronization problems: the interface itself should be responsible for that. But since front and pop are independent operations, I don't see a good way to handle this.
There's also the problem that front might dereference a nullptr, and even here I'm not sure how to handle it. I could make the interface return a raw pointer, or a std::optional, but neither solution seems correct in my eyes, so I would love to hear opinions on what should be done here.
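For what it's worth, the usual answer to the front/pop race is exactly the interface change above: stop exposing empty/front/pop as separate steps and offer a single combined try_pop that returns std::optional<T>. The sketch below uses a plain mutex internally only to stay short; the point is the interface shape, which carries over unchanged to a lock-free implementation:

```cpp
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

// Sketch of a combined-operation interface: empty()/front()/pop() merged
// into one try_pop(), so callers never race between the emptiness check
// and the removal. The mutex is only to keep the sketch short; a lock-free
// queue can expose the exact same signature.
template <typename T>
class popping_queue
{
    std::queue<T> m_queue;
    std::mutex m_mutex;

public:
    void push(const T& value)
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_queue.push(value);
    }

    // Returns std::nullopt when the queue is empty, which also removes the
    // possibility of front() dereferencing a null node.
    std::optional<T> try_pop()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_queue.empty())
        {
            return std::nullopt;
        }
        T value = std::move(m_queue.front());
        m_queue.pop();
        return value;
    }
};
```

A reader loop then becomes `while (auto v = q.try_pop()) { /* use *v */ }`, with no window between checking for emptiness and removing the element, and no nullptr to mishandle.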

Also, I'm not sure whether I could get away with cheaper primitives than CAS. I know I could use a unique-slot approach, where each thread gets an index into a fixed array by calling fetch_add on a std::atomic<int> slot counter, so that each thread pushes to a unique index; but I don't like this approach since it forces a fixed-size queue. On the other hand, using new and delete is probably not the fastest thing either. I could use a pool allocator of sorts, but then I would have to make sure it's synchronized as well, and that's a whole new level of pain.
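As a point of comparison for the unique-slot idea: its best-known form is Dmitry Vyukov's bounded MPMC queue, which claims positions with a single CAS on a counter (a pure fetch_add variant has to cope with the counter running past a full ring) and uses a per-slot sequence number to tell when a slot is ready. A sketch follows, with the fixed power-of-two capacity limitation mentioned above; details such as cache-line padding are omitted:

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Sketch of a bounded MPMC ring (Vyukov-style). Each cell carries a
// sequence number: seq == pos means "free to write at pos", and
// seq == pos + 1 means "ready to read at pos". One CAS per operation,
// no unbounded allocation, but the capacity is fixed.
template <typename T>
class bounded_mpmc_queue
{
    struct cell
    {
        std::atomic<std::size_t> m_sequence{0};
        T m_value{};
    };

    std::vector<cell> m_cells;
    std::size_t m_mask;
    std::atomic<std::size_t> m_enqueue_pos{0};
    std::atomic<std::size_t> m_dequeue_pos{0};

public:
    // capacity must be a power of two so that `pos & m_mask` wraps cleanly.
    explicit bounded_mpmc_queue(std::size_t capacity)
        : m_cells(capacity), m_mask(capacity - 1)
    {
        for (std::size_t i = 0; i < capacity; ++i)
            m_cells[i].m_sequence.store(i, std::memory_order_relaxed);
    }

    bool try_push(const T& value)
    {
        std::size_t pos = m_enqueue_pos.load(std::memory_order_relaxed);
        for (;;)
        {
            cell& c = m_cells[pos & m_mask];
            std::size_t seq = c.m_sequence.load(std::memory_order_acquire);
            auto diff = static_cast<std::intptr_t>(seq)
                      - static_cast<std::intptr_t>(pos);
            if (diff == 0) // slot is free: claim it with one CAS
            {
                if (m_enqueue_pos.compare_exchange_weak(
                        pos, pos + 1, std::memory_order_relaxed))
                {
                    c.m_value = value;
                    c.m_sequence.store(pos + 1, std::memory_order_release);
                    return true;
                }
            }
            else if (diff < 0)
            {
                return false; // queue is full
            }
            else // another thread advanced past us; reload and retry
            {
                pos = m_enqueue_pos.load(std::memory_order_relaxed);
            }
        }
    }

    bool try_pop(T& out)
    {
        std::size_t pos = m_dequeue_pos.load(std::memory_order_relaxed);
        for (;;)
        {
            cell& c = m_cells[pos & m_mask];
            std::size_t seq = c.m_sequence.load(std::memory_order_acquire);
            auto diff = static_cast<std::intptr_t>(seq)
                      - static_cast<std::intptr_t>(pos + 1);
            if (diff == 0) // slot holds a value: claim it with one CAS
            {
                if (m_dequeue_pos.compare_exchange_weak(
                        pos, pos + 1, std::memory_order_relaxed))
                {
                    out = std::move(c.m_value);
                    // mark the slot free for the next lap of producers
                    c.m_sequence.store(pos + m_mask + 1,
                                       std::memory_order_release);
                    return true;
                }
            }
            else if (diff < 0)
            {
                return false; // queue is empty
            }
            else
            {
                pos = m_dequeue_pos.load(std::memory_order_relaxed);
            }
        }
    }
};
```

This sidesteps both the allocator question (cells live in one preallocated array) and the front()/pop() race (try_pop is a single combined operation), at the cost of the fixed capacity.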

I'm not even sure these are all the problems; they're just the ones I could spot in my implementation, and I'm sure I didn't think of everything (or maybe I did?). Anyway, I would love to hear your thoughts on the described problems and possible ways to overcome them.
