I wish to create a lockless queue using std::atomic
.
Here's my probably not so good first attempt at trying to do so:
template <typename T>
class atomic_queue
{
public:
using value_type = T;
private:
struct node
{
value_type m_value;
node* m_next;
node* m_prev;
node(const value_type& value) :
m_value(value),
m_next(nullptr),
m_prev(nullptr) {}
};
private:
std::atomic<node*> m_head = nullptr;
std::atomic<node*> m_tail = nullptr;
public:
void push(const value_type& value)
{
auto new_node = new node(value);
node* tmp = nullptr;
if (m_tail.compare_exchange_strong(tmp, new_node))
{
m_head.store(new_node, std::memory_order_relaxed);
return;
}
node* old_tail;
do {
old_tail = m_tail;
new_node->m_prev = old_tail;
} while (!m_tail.compare_exchange_strong(old_tail, new_node));
new_node->m_prev->m_next = new_node;
}
void pop()
{
if (m_head.load(std::memory_order_relaxed) == nullptr)
{
return;
}
node* tmp = nullptr;
node* head = m_head;
if (m_tail.compare_exchange_strong(head, tmp))
{
m_head.store(tmp, std::memory_order_relaxed);
return;
}
node* old_head;
do {
old_head = m_head;
} while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
if (old_head)
{
delete old_head;
}
}
bool empty()
{
return m_head.load(std::memory_order_relaxed) == nullptr;
}
value_type& front()
{
node* head = m_head.load(std::memory_order_acquire);
return head->m_value;
}
};
Something to note here is that I store m_prev
on node
so that I could update the m_next
of m_tail
after successful push
without actually doing so via m_tail
incase it was changed already by another thread. So even if another thread got to push
a new value already, the current thread would still link what it saw as the m_tail
's m_next
to the new node.
Now there're a few things that are not really thread-safe as far as I can tell and which I can't really think of a good way to solve these problems:
Let's assume thread1
pop
s from the queue the one and only item then we go inside the following if statement:
node* tmp = nullptr;
node* head = m_head;
if (m_tail.compare_exchange_strong(head, tmp))
{
// Now thread2 kicks in
m_head.store(tmp, std::memory_order_relaxed);
return;
}
And let's assume thread2
kicks in at the marked spot to push
a new value to the queue the the following statement will be executed:
node* tmp = nullptr;
if (m_tail.compare_exchange_strong(tmp, new_node))
{
m_head.store(new_node, std::memory_order_relaxed);
return;
}
and let us assume it finished it's push
ing without thread1
continuing and only then thread1
continues, then thread1
will execute:
m_head.store(tmp, std::memory_order_relaxed);
return;
and will basically undo thread2
's push
by setting m_head
to nullptr
. As far as I can understand memory orders can't help me in this scenario so I'm not sure what my options are?
Another problematic scenario is that let's assume we have 2 reader threads thread3
and thread4
doing the same job:
while (true)
{
if (!q.empty())
{
int v = q.front();
q.pop();
std::stringstream stream;
stream << "thread_3/4: " << v << '\n';
std::cout << stream.str();
}
}
And let us assume the queue is of size 1, so both of them could see that the queue is not empty and get a reference to the front data and then pop the element and print the same result.
It seems to me that locking would help in this scenario, but I do no wish to use locking and also I do not wish the reading threads to care about synchronization problems because the interface itself should be the one responsible, but since front
and pop
are independent I don't see a good way to handle this.
Also there's the problem that front
might access nullptr
, so even here I'm not sure how to handle this. I can make the interface return a raw pointer, or std::optional
but both solutions seems not correct in my eyes so would love to hear opinions on what should be done here.
Also, I'm not sure if I could go away with cheaper methods other than CAS, I know I could go with unique slot approach where each thread get an index into a fixed array by using fetch_add
on atomic of type std::atomic<int> slot
and so each thread pushes to the queue to a unique index, but I don't like this approach since it makes the limitation of a fixed size queue. On the other hand using new
and delete
is probably not the fastest thing either, I could use a pool allocator of sort, but then I would have to make sure it's synchronized as-well and that's a new level of pain.
I'm not even sure these are all the problems, these are the problems that I could spot with my implementation, I'm sure I didn't think of everything (or maybe I did?), anyway would love to hear yours thoughts on the described problems and maybe ways to overcome them.
Aucun commentaire:
Enregistrer un commentaire