Friday, 26 August 2016

Is this valid logic for a lock free queue based on lists and iterators (c++11)

Can anyone tell me if there is a flaw in the logic of my lock-free queue code below? Specifically, I'm concerned that having one thread modify a std::list while other threads simultaneously use iterators into that list may not be OK. My application built around this approach appears to work fine, but the more I think about it, the more I worry that I've misunderstood the relationship between std::list and its iterators and that there's a problem hiding.
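
For context, this is the single-threaded std::list iterator guarantee I'm relying on, shown as a minimal standalone sketch (not from my application, names made up purely for illustration):

    #include <cassert>
    #include <iterator>
    #include <list>

    int main()
    {
        std::list<int> values = { 1, 2, 3 };
        std::list<int>::iterator itSecond = std::next(values.begin());  // points at the element 2

        values.push_back(4);           // inserting never invalidates existing list iterators
        values.erase(values.begin());  // erasing a *different* element leaves itSecond valid

        assert(*itSecond == 2);        // still dereferenceable, still the same element
        return 0;
    }

My question is essentially whether that guarantee still means anything once the insertions and erasures happen on a different thread from the one holding the iterators.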

Instead of explaining the code here, I tried to write it to be very easy to read. Generally only the producer thread modifies any of the std::lists, and the various consumer threads only work with iterators. Shared pointers are used to automate memory management. The entire example is below; it should compile and run as-is in Visual Studio 2015.
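
Before the full listing, here is the memory-management half of the idea in isolation, again as a toy single-threaded sketch (not the real code, names made up for illustration):

    #include <iostream>
    #include <list>
    #include <memory>

    int main()
    {
        std::list<std::shared_ptr<int>> producerList;   // stands in for the producer's master list
        std::shared_ptr<int> consumerCopy;              // stands in for one consumer thread's reference

        producerList.push_back(std::make_shared<int>(42));
        consumerCopy = producerList.front();            // use_count() is now 2, so not unique

        std::cout << "unique? " << producerList.front().unique() << "\n";  // prints 0
        consumerCopy.reset();                           // the "consumer" finishes with the buffer
        std::cout << "unique? " << producerList.front().unique() << "\n";  // prints 1 -> safe to release

        return 0;
    }

In the full code the producer polls unique() the same way to decide when a buffer can be released; whether that check is safe while consumer threads hold copies on other threads is part of what I'm unsure about.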

// shared_pointers.cpp : Defines the entry point for the console application.
//

/* This process is based on the lock-free queues article found here: http://ift.tt/2bmRW9z
*  The goal is to have one thread generate data, and multiple threads consume the data, without requiring the overhead of locks.
*  This design is based on the premise that list iterators are guaranteed to remain valid, even after the list has been modified.
*  The parent thread is the ONLY thread to modify the list; the child threads all work with iterators.
*
*
* The queue: 
* [BeeeeeeeeeeeeeeeHwwwwwwwwwwwwwwwwwwwwT]
*  ^     ^^^^      ^       ^^^          ^ 
*  |       |       |        |           |     
* Begin    |      Head      |          Tail  
*        Erase            Waiting
*                                          
*   Begin - The start of the queue, this is a dummy element that won't be read
*   Erase - These elements have already been read and are waiting to be erased
*   Head - This is the first unread element in the queue
*   Waiting - These elements are waiting to be read
*   Tail - This is the end of the list, same as list::end()
*
* We will combine lists, iterators and shared pointers to make a lock free single-producer multiple-consumer queue.  
* The main thread (producer thread) will allocate buffers and keep a list of shared pointers to the buffers.  
* Every worker (consumer) thread will get a list of the same shared pointers.  As each consumer thread consumes a buffer, the
* buffer reference will be removed from that thread's list. Periodically the main thread will check the number of references
* to each shared pointer, and when it is down to 1, all consumer threads are finished with the pointer and it will be released,
* freeing the buffer memory.  The goal is to eliminate the overhead of locks and memory management.  
* 
*/



#include "stdafx.h"
#include <list>
#include <memory>
#include <thread>
#include <vector>
#include <windows.h>
#include <algorithm>
#include <iostream>

typedef std::shared_ptr<BYTE> SharedBytePtr;
typedef std::list<SharedBytePtr> SharedBytePtrList;

#define BUFFER_SIZE_BYTES (1024 * 1024 * 24)  /* 24MB */
#define MAX_NUMBER_BUFFERS 20
#define NUMBER_CONSUMER_THREADS 50
#define NUMBER_DATA_READS 10000               /* Total number of buffers to "fill with data" by the producer thread, to be consumed by the worker threads */



class DataConsumer
{
  public:
    void StartInfiniteLoop();
    void ConsumeData(SharedBytePtr &pData);
    bool bExitNow;  // Set true to cause the thread to exit

    // Lock free queue stuff.  Iterators should always be valid, but only 1 thread can edit the list! 
    SharedBytePtrList m_lMyBuffers;
    SharedBytePtrList::iterator m_iHead, m_iTail;

    DataConsumer() 
    { 
      bExitNow = false; 
      // Add an empty item to the list, we will always think this has already been read
      std::shared_ptr<BYTE> pDummy;
      m_lMyBuffers.push_back(pDummy);
      m_iHead = m_lMyBuffers.begin();   // Dummy element, just a placeholder
      m_iTail = m_lMyBuffers.end();     // Always list::end()
    } // ctor()
};

/* Sit in this loop, consuming data buffers as they become available */
void DataConsumer::StartInfiniteLoop()
{
  while (true)
  {
    if (bExitNow)
    {
      return;
    }

    // Do we have a buffer ready to be consumed?  Remember buffer 0 is a dummy, so check for size() > 1
    if (m_lMyBuffers.size() > 1)
    {
      // Access the list elements using ONLY iterators!  Only the main producer thread can access the list directly. 
      // Iterators are guaranteed by definition to be valid, even if the list is modified.
      SharedBytePtrList::iterator iNextBuf = m_iHead;
      ++iNextBuf; // m_iHead is a dummy entry or already read
      // Loop through all available buffers and consume them
      while (iNextBuf != m_iTail)
      {
        m_iHead = iNextBuf;
        ConsumeData(*m_iHead);
        iNextBuf++;
      }
    }
    else
    {
      // Nothing to do, unload the CPU until more data is ready
      Sleep(1);
    }
  } // while
} // ::StartInfiniteLoop


void DataConsumer::ConsumeData(SharedBytePtr &pData)
{
  // Do stuff....
  Sleep(10);  // Pretend that we're doing something with the data.. 
  return;
}


class WorkerThreadClass
{
public:
  std::thread theThread;        // The actual thread object
  DataConsumer* dataConsumer;   // Pointer to the DataConsumer object that "theThread" runs in
};


int main()
{
  // Make a local list to store a reference to every buffer allocated in the main (producer) thread
  std::list<std::shared_ptr<BYTE>> lBuffers;

  // Start up the consumer threads
  std::vector<WorkerThreadClass*> vWorkerThreads;

  for (int t = 0; t < NUMBER_CONSUMER_THREADS; t++)
  {
    DataConsumer *pDataConsumer = new DataConsumer;
    WorkerThreadClass *pWorkerThread = new WorkerThreadClass();

    // Startup the thread 
    pWorkerThread->theThread = std::thread(&DataConsumer::StartInfiniteLoop, pDataConsumer);
    pWorkerThread->dataConsumer = pDataConsumer;

    // Add our new worker thread to our list 
    vWorkerThreads.push_back(pWorkerThread);
  } // for()


  // We are the main (producer) thread now.  Simulate 10,000 data reads. Each read will go into a buffer, and 
  // a reference to that buffer will be stored both on the main (producer) thread buffer list, and on the buffer list
  // of every worker (consumer) thread for processing. 
  int iBuffersToRead = NUMBER_DATA_READS;
  while (iBuffersToRead > 0)
  {
    // Check the buffer reference list on each worker (consumer) thread, and erase every entry that falls in the "erase" area, between the "begin" and "head" elements. These buffers have already
    // been consumed by that thread. Modify these lists ONLY from the main (producer) thread, NOT from the worker threads themselves!
    // [BeeeeeeeeeeeeeeeHwwwwwwwwwwwwwwwwwwwwT]
    std::for_each(vWorkerThreads.begin(), vWorkerThreads.end(), [&](WorkerThreadClass* thisThread)
    {
      thisThread->dataConsumer->m_lMyBuffers.erase(thisThread->dataConsumer->m_lMyBuffers.begin(), thisThread->dataConsumer->m_iHead); // clean up unused entries
    });

    // Have we already allocated our limit of buffers? 
    if (lBuffers.size() < MAX_NUMBER_BUFFERS)
    {
      iBuffersToRead--; 
      std::cout << "Buffer read number: " << NUMBER_DATA_READS - iBuffersToRead << std::endl;
      // Create a new buffer.  Give the shared_ptr an array deleter so delete[] (not delete) runs when the last reference is released
      SharedBytePtr pBuf(new BYTE[BUFFER_SIZE_BYTES], std::default_delete<BYTE[]>());

      // Fill the buffer with data here
      // ReadSomeDataIntoBuffer(pBuf);.... 

      // Add a reference to this buffer to the main (producer) thread list of all buffers
      lBuffers.push_back(pBuf);

      // Now add a reference to this buffer to the list of buffers on every worker (consumer) thread.  This will increase the shared_ptr ref count beyond "1", we will not release this
      // buffer until the ref count is again down to "1", which means that the main (producer) thread owns the only reference to it, and all worker (consumer) threads are finished with it. 
      std::cout << "Adding buffer to threads, total buffers now: " << lBuffers.size() << std::endl;
      std::for_each(vWorkerThreads.begin(), vWorkerThreads.end(), [&](WorkerThreadClass* thisThread)
      {
        thisThread->dataConsumer->m_lMyBuffers.push_back(pBuf);  // Push back new buffer
        thisThread->dataConsumer->m_iTail = thisThread->dataConsumer->m_lMyBuffers.end(); // Update tail
        thisThread->dataConsumer->m_lMyBuffers.erase(thisThread->dataConsumer->m_lMyBuffers.begin(), thisThread->dataConsumer->m_iHead); // clean up old entries while we're here
      });

    } // if()
    else
    {
      // We've hit our limit on buffers and cannot create new ones until old ones are freed.  
      // Check all references on the main (producer) list of buffer references.  Any that are "unique()" have only 1 reference, meaning all worker (consumer) threads 
      // are finished with it.  In this case, release our reference and the memory will be automatically freed.  
      SharedBytePtrList::iterator iNextBuf = lBuffers.begin();
      while (iNextBuf != lBuffers.end())
      {
        if (iNextBuf->unique())
        {
          // We (main thread) hold the only reference, remove the reference and the memory referenced by the shared_ptr will be freed
          *iNextBuf = nullptr;
          // Remove the entry from the list
          lBuffers.remove(*iNextBuf);
          // Our iterator is now invalid after remove(), reset it and loop
          iNextBuf = lBuffers.begin();
          std::cout << "Released buffer, number buffers now: " << lBuffers.size() << "\n";
          continue;
        } // if()
        else
        {
          // std::cout << "Buffer still in-use by some worker (consumer) threads, cannot release it yet!  Buffer list size: " << lBuffers.size() << "\n";
        }
        iNextBuf++;
      } //  while()
    } // else
  } // while()


  // All of our work is finished, time to clean up

  // Tell all worker (consumer) threads to exit
  std::for_each(vWorkerThreads.begin(), vWorkerThreads.end(), [&](WorkerThreadClass* thisThread) { thisThread->dataConsumer->bExitNow = true; });

  // As each thread exits, clean up its objects
  std::vector<WorkerThreadClass*>::iterator iNext = vWorkerThreads.begin();
  while (iNext != vWorkerThreads.end())
  {
    // Wait for the thread to exit
    (*iNext)->theThread.join();
    // Clean up the associated objects
    delete (*iNext)->dataConsumer;
    delete (*iNext);   // Also free the WorkerThreadClass wrapper itself
    iNext++;
  }

  // Clean up the final reference to our buffers so their memory will be freed
  std::for_each(lBuffers.begin(), lBuffers.end(), [](std::shared_ptr<BYTE> &pBuf) { pBuf = nullptr; });  // Take each element by reference so the stored pointers are actually reset

  return 0;
}
