Tuesday, April 28, 2015

Readers / writer implementation using std::atomic (mutex-free)

Below is an attempt at a multiple-reader / single-writer wrapper around shared data that uses std::atomic and busy-waiting instead of a mutex and condition variables to synchronize readers and writers. I am puzzled as to why the asserts in it are being hit. I'm sure there's a bug somewhere in the logic, but I can't tell where it is.

The idea behind the implementation is that reading threads spin until the writer has finished writing. On entering the read function they increment m_numReaders, and if they have to wait for the writer they also increment m_numWaiting.

m_numWaiting should then always be less than or equal to m_numReaders, provided m_numWaiting is always incremented after m_numReaders and decremented before it.

There shouldn't be a case where m_numWaiting is greater than m_numReaders (or I'm not seeing it), since a reader always increments the reader counter first, only sometimes increments the waiting counter, and always decrements the waiting counter first.

Yet this seems to be what's happening, because the asserts are being hit. Can someone point out the logic error, if you see it?

Thanks!

#include <atomic>   // std::atomic, std::atomic_flag
#include <cassert>  // assert
#include <cstdint>  // int64_t
#include <iostream>
#include <thread>

template<typename T>
class ReadWrite
{

public:

    ReadWrite() : m_numReaders(0), m_numWaiting(0), m_writing(false)
    {
        m_writeFlag.clear();
    }

    template<typename functor>
    void read(functor& readFunc)
    {
        m_numReaders++;
        std::atomic<bool>waiting(false);
        while (m_writing)
        {
            if(!waiting)
            {
                m_numWaiting++; // m_numWaiting should always be increased after m_numReaders
                waiting = true;
            }
        }

        assert(m_numWaiting <= m_numReaders);

        readFunc(&m_data);

        assert(m_numWaiting <= m_numReaders); // <-- These asserts get hit ?

        if(waiting)
        {
            m_numWaiting--; // m_numWaiting should always be decreased before m_numReaders
        }

        m_numReaders--;

        assert(m_numWaiting <= m_numReaders); // <-- These asserts get hit ?
    }

    //
    // Only a single writer can operate on this at any given time.
    //
    template<typename functor>
    void write(functor& writeFunc)
    {
        while (m_writeFlag.test_and_set());

        // Ensure no readers present
        while (m_numReaders);

        // At this point m_numReaders may have been increased !
        m_writing = true;

        // If a reader entered before the writing flag was set, wait for
        // it to finish
        while (m_numReaders > m_numWaiting);

        writeFunc(&m_data);

        m_writeFlag.clear();
        m_writing = false;
    }
private:
    T m_data;
    std::atomic<int64_t> m_numReaders;
    std::atomic<int64_t> m_numWaiting;
    std::atomic<bool> m_writing;
    std::atomic_flag m_writeFlag;
};

int main(int argc, const char * argv[])
{
    const size_t numReaders = 2;
    const size_t numWriters = 1;
    const size_t numReadWrites = 10000000;

    std::thread readThreads[numReaders];
    std::thread writeThreads[numWriters];

    ReadWrite<int> dummyData;

    auto writeFunc = [&](int* pData)    { return; }; // intentionally empty
    auto readFunc = [&](int* pData)     { return; }; // intentionally empty

    auto readThreadProc = [&]()
    {
        size_t numReads = numReadWrites;
        while (numReads--)
        {
            dummyData.read(readFunc);
        }
    };

    auto writeThreadProc = [&]()
    {
        size_t numWrites = numReadWrites;
        while (numWrites--)
        {
            dummyData.write(writeFunc);
        }
    };

    for (std::thread& thread : writeThreads)    { thread = std::thread(writeThreadProc);}
    for (std::thread& thread : readThreads)     { thread = std::thread(readThreadProc);}
    for (std::thread& thread : writeThreads)    { thread.join();}
    for (std::thread& thread : readThreads)     { thread.join();}
}
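
One thing that may be worth ruling out: each assert in read() performs two separate atomic loads, so the two values it compares need not come from the same instant, even if the invariant holds at every instant. The sketch below is only an illustration under stated assumptions, not a fix for the class above. replaySplitLoads() replays one such interleaving on a single thread, and PackedCounters assumes both counters fit in 32 bits so that a single load can observe the pair together. The names PackedCounters, Snapshot, invariantHolds and replaySplitLoads are hypothetical and do not appear in the original code.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Single-threaded replay of one interleaving: reader A has already
// decremented the reader counter and is evaluating its final assert, while
// reader B (still counted as waiting) leaves between A's two loads.
// Returns whether A's comparison of the two separately loaded values holds.
inline bool replaySplitLoads()
{
    std::atomic<std::int64_t> numReaders{1}; // only B is still inside read()
    std::atomic<std::int64_t> numWaiting{1}; // B is counted as waiting

    const std::int64_t seenWaiting = numWaiting.load(); // A's first load

    numWaiting--; // B leaves: waiting counter first ...
    numReaders--; // ... then reader counter, as the protocol requires

    const std::int64_t seenReaders = numReaders.load(); // A's second load

    return seenWaiting <= seenReaders; // compares a stale 1 with a fresh 0
}

// Assumption: readers live in the high 32 bits, waiting in the low 32 bits.
constexpr std::uint64_t kOneReader  = std::uint64_t{1} << 32;
constexpr std::uint64_t kOneWaiting = std::uint64_t{1};

// Hypothetical helper: both counters share one 64-bit atomic word.
class PackedCounters
{
public:
    struct Snapshot
    {
        std::uint32_t readers;
        std::uint32_t waiting;
    };

    void addReader()     { m_state.fetch_add(kOneReader); }
    void removeReader()  { m_state.fetch_sub(kOneReader); }
    void addWaiting()    { m_state.fetch_add(kOneWaiting); }
    // Must only be called after a matching addWaiting(); otherwise the
    // subtraction would borrow from the reader half of the word.
    void removeWaiting() { m_state.fetch_sub(kOneWaiting); }

    // A single atomic load returns both counters from the same instant.
    Snapshot snapshot() const
    {
        const std::uint64_t state = m_state.load();
        return Snapshot{ static_cast<std::uint32_t>(state >> 32),
                         static_cast<std::uint32_t>(state & 0xFFFFFFFFu) };
    }

private:
    std::atomic<std::uint64_t> m_state{0};
};

// Checks waiting <= readers on one consistent snapshot.
inline bool invariantHolds(const PackedCounters& counters)
{
    const PackedCounters::Snapshot s = counters.snapshot();
    return s.waiting <= s.readers;
}
```

In the replay the invariant holds before and after B leaves, yet the comparison fails because it mixes an old value with a new one. Checking the invariant against a single snapshot avoids that particular effect. It says nothing about whether the reader/writer handshake itself is correct.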
