vendredi 1 juillet 2016

Is std::atomic_thread_fence probably misunderstood

I'm making a sort of exercise on C++ atomics.

I have written the following code in which I start 2 reader threads which read some structured data based on ready counter which is incremented by the main thread by 2 each time it updates the data. Each reader thread once flag is non-zero reads the data and decrements the counter by 1. The code

#include <atomic>
#include <thread>
#include <iostream>
#include <functional>
#include <chrono>

struct Foo
{
    int a;
    int b;
    int c;
};

int main()
{
    unsigned long writeCnt { 0 };
    std::atomic<int> dataReady {0};

    //Shared data
    Foo data {0, 0, 0};

    //Readers accumulate data here
    Foo dataBuf[2][1000];

    //Reader threads  
    std::thread threads[2];

    //A pointer to a buffer assigned to a reader
    Foo (*curBuf)[1000] = dataBuf;

    //Thread's ID
    int threadNum  { 0 };

    //Starting readers
    for (auto & item : threads)
    {
        item = std::thread(std::bind([&dataReady,&data](Foo * buf, const int threadNum)
        {
            unsigned long readCnt { 0 };
            while (readCnt < 1000)
            {
                auto ready = dataReady.load(std::memory_order_relaxed);
                std::cout << "Thread #" << threadNum << " ready " << ready << " read cnt " << readCnt <<std::endl;
                if(ready > 0)
                {
                    std::atomic_thread_fence(std::memory_order_acquire);
                    buf[readCnt].a = data.a;
                    buf[readCnt].b = data.b;
                    buf[readCnt].c = data.c;
                    std::atomic_thread_fence(std::memory_order_release);
                    dataReady.store(ready - 1, std::memory_order_relaxed);
                    ++readCnt;
                }
                else
                {
                    continue;
                }
            }
        },*(curBuf++),threadNum++));
    }

    //Writer loop
    while (writeCnt < 1000)
    {
        auto ready = dataReady.load(std::memory_order_relaxed);
        std::cout << "Main thread ready " << ready <<  " write cnt " << writeCnt << std::endl;
        if(ready == 0)
        {
            std::atomic_thread_fence(std::memory_order_acquire);
            data.a = writeCnt;
            data.b = writeCnt;
            data.c = writeCnt;
            std::atomic_thread_fence(std::memory_order_release);
            dataReady.store(2, std::memory_order_relaxed);
            ++writeCnt;
        }
        else
        {
            continue;
        }
    }

    //Joining readers
    for(auto & item : threads)
    {
        if(item.joinable())
        {
            item.join();
        }
    }

    //Print accumulated data
    for(const auto & buffer : dataBuf)
    {
        for(const auto & item : buffer)
        {
            std::cout << item.a << " " << item.b << " " << item.c << std::endl;
        }
    }


}

For now it doesn't matter that possibly just one thread double-reads the updated data. I'm worried about writeCnt stops at about 730 or something while readers finish their reads. Is this misbehaviour caused by memory reorderings and why since thread fences are added? What am I missing here?

Aucun commentaire:

Enregistrer un commentaire