mercredi 7 novembre 2018

lock free stack: correct use of memory order

The below class describes a lock free stack of uint32_t sequential values, taken from here. For instance, LockFreeIndexStack stack(5); declares a stack containing the numbers {0, 1, 2, 3, 5}.

The push method is implemented with one atomic load and a CAS loop; the pop with two atomic load and a CAS loop. What is the correct memory order semantic I should used in the atomic operations (I wrote my guess commented out). Thanks

struct LockFreeIndexStack
{
    typedef uint64_t bundle_t;
    typedef uint32_t index_t;

private:
    static const index_t s_null = ~index_t(0);

    typedef std::atomic<bundle_t> atomic_bundle_t;

    union Bundle {
        Bundle(index_t index, index_t count)
        {
            m_value.m_index = index;
            m_value.m_count = count;
        }

        Bundle(bundle_t bundle)
        {
            m_bundle = bundle;
        }

        struct {
            index_t m_index;
            index_t m_count;
        } m_value;

        bundle_t m_bundle;
    };

public:
    LockFreeIndexStack(index_t n)
        : m_top(Bundle(0, 0).m_bundle)
        , m_next(n, s_null)
    {
        for (index_t i = 1; i < n; ++i)
            m_next[i - 1] = i;
    }

    index_t pop()
    {
        Bundle curtop(m_top.load());  // memory_order_acquire?
        while(true) {
            index_t candidate = curtop.m_value.m_index;

            if (candidate != s_null) {  // stack is not empty?
                index_t next = m_next[candidate];
                Bundle newtop(next, curtop.m_value.m_count + 1);
                // In the very remote eventuality that, between reading 'm_top' and
                // the next line other threads cause all the below circumstances occur simultaneously:
                // - other threads execute exactly a multiple of 2^32 pop or push operations,
                //   so that 'm_count' assumes again the original value;
                // - the value read as 'candidate' 2^32 transactions ago is again top of the stack;
                // - the value 'm_next[candidate]' is no longer what it was 2^32 transactions ago
                // then the stack will get corrupted
                if (m_top.compare_exchange_weak(curtop.m_bundle, newtop.m_bundle)) {
                    return candidate;
                }
            }
            else {
                // stack was empty, no point in spinning
                std::this_thread::yield();
                curtop = m_top.load();  // memory_order_acquire?
            }
        }
    }

    void push(index_t index)
    {
        Bundle curtop(m_top.load());    // memory_order_relaxed?
        while (true) {
            index_t current = curtop.m_value.m_index;
            m_next[index] = current;
            Bundle newtop = Bundle(index, curtop.m_value.m_count + 1);
            if (m_top.compare_exchange_weak(curtop.m_bundle, newtop.m_bundle)) {
                return;
            }
        };
    }

private:
    atomic_bundle_t m_top;
    std::vector<index_t> m_next;
};

Aucun commentaire:

Enregistrer un commentaire