lundi 22 mai 2017

C++ blocking queue with Boost

I've come up with the following blocking queue implementation, with std::vector as the container for the elements stored in the queue and using Boost for threading/synchronization. I also referred to a similar post here.

template<typename T>
class BlockingQueue
{
public:
  explicit BlockingQueue(const std::vector<T>& buf): 
    buffer(buf)
  {}
  explicit BlockingQueue(): buffer()
  {}
  void push(const T& elem);
  T pop();
  ~BlockingQueue()
  {}

private:
  boost::mutex mutex;                             // mutex variable
  boost::condition_variable_any notEmptyCond;     // condition variable, to check whether the queue is empty
  std::vector<T> buffer;
};

template<typename T>
void BlockingQueue<T>::push(const T& elem) 
{
  boost::mutex::scoped_lock lock(mutex);
  buffer.push_back(elem);
  notEmptyCond.notify_one();                      // notifies one of the waiting threads which are blocked on the queue  
  // assert(!buffer.empty());
}

template<typename T>
T BlockingQueue<T>::pop()
{
  boost::mutex::scoped_lock lock(mutex);
  notEmptyCond.wait(lock,[&](){ return (buffer.size() > 0); });   // waits for the queue to get filled and for a notification, to resume consuming
  T elem = buffer.front();
  buffer.erase(buffer.begin());
  return elem;
}

I've two threads (producer/consumer), one reading strings from a file and populating them into the BlockingQueue, the other to remove the strings from the BlockingQueue and print them. Both of these are initialized from a class whose definition is given below.

class FileProcessor
{
public:
  explicit FileProcessor():bqueue(),inFile("random.txt")
  {
    rt = boost::thread(boost::bind(&FileVerifier::read, this));
    pt1 = boost::thread(boost::bind(&FileVerifier::process, this));
  }

  volatile ~FileProcessor()
  {
    rt.interrupt(); 
    pt1.interrupt(); 
    rt.join(); 
    pt1.join(); 
  }

  /* Read strings from a file, populate them in the blocking-queue */
  void read()
  {
    std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
    boost::iostreams::filtering_istream in;
    if (file.fail()) {
      std::cout << "couldn't open the input file.. please check its name and read permissions\n";
      return;
    }
    try {
      in.push(file);                      // gets the whole file content into an input stream buffer
      for(std::string inputStr; std::getline(in,inputStr);) 
      {
        bqueue.push(inputStr);
        std::cout << "inserted " << inputStr << "\n";
      }
    }
    catch(std::exception& e) {
      std::cout << "exception occurred while reading file\n" << e.what() << "\n";
    }
  }

  /* Process the elements (dequeue and print) */
  void process()
  {
    while (true)
    {
      std::string rstr = bqueue.pop();
      std::cout << "consumed " << rstr << "\n";
    }
  }

private:
  boost::mutex mutex;
  boost::thread rt;
  boost::thread pt1;
  BlockingQueue<std::string> bqueue;
  std::string inFile;     // input file name from where the strings are read
};

I observe the following output (only a snapshot included):

Run 1:

inserted AZ
inserted yezjAdCeV
inserted icKU
inserted q
inserted b
inserted DRQL
inserted aaOj
inserted CqlNRv
inserted e
inserted XuDemby
inserted rE
inserted YPk
inserted dLd
inserted xb
inserted bSrZdf
inserted sCQiRna
...

Run 4:

consumed jfRnjSxrw
inserted INdmXSCr
consumed oIDlu
inserted FfXdARGu
consumed tAO
inserted mBq
consumed I
inserted aoXNhP
consumed OOAf
inserted Qoi
consumed wCxJXGWJu
inserted WZGYHluTV
consumed oIFOh
inserted kkIoFF
consumed ecAYyjHh
inserted C
consumed KdrBIixw
inserted Ldeyjtxe
...

My problem : The consumer thread is sometimes given control over the queue's resource (able to dequeue and print) and sometimes it is not. I'm not sure why this happens. Any hints on the queue's design-flaws would be greatly appreciated. Thanks!

Observations:

  1. When the threads aren't initialized from the (FileProcessor) class' ctor, they behave as expected i.e. they access the BlockingQueue resource and do their read/write operations. Please refer to the snippets below for the changes made to have this behavior.

  2. The producer-consumer threads don't take alternative turns, as @n.m noted the producer doesn't yield to the consumer explicitly. Following the above observation, their respective outputs were something like the one given below

    inserted DZxcOw
    consumed inserted DZxcOw
    consumed robECjOp
    robECjOp
    inserted BaILFsVaA
    inserted HomURR
    inserted PVjLPb
    consumed BaILFsVaA
    consumed HomURR
    consumed PVjLPb
    inserted SHdBVSEyU
    consumed SHdBVSEyU
    consumed JaEH
    inserted JaEH
    inserted g
    inserted MwEgOVB
    inserted qlohoszv
    consumed g
    consumed MwEgOVB
    consumed qlohoszv
    consumed AsQgq
    inserted AsQgq
    inserted tbm
    inserted iriADeEL
    inserted Zoxs
    consumed tbm
    
    

Initializing from outside a class ctor.

#include <iostream>
#include <threading/file_processor.h>  //has the FileProcessor class declaration

int main()
{
  FileProcessor fp;  //previously, I had only this statement which called the class constructor, from where the threads were initialized.
  boost::thread rt(boost::bind(&FileProcessor::read, &fp));
  boost::thread pt1(boost::bind(&FileProcessor::process, &fp));
  rt.join();
  pt1.join();
  return 0;
}

Modified FileProcessor class (removed the thread-initialization from its ctor)

#include <boost/iostreams/filtering_stream.hpp>
#include <threading/blocking_queue.h>  //has the BlockingQueue class

using namespace boost::iostreams;

   class FileProcessor
   {
    public:
      explicit FileProcessor():bqueue(),inFile("random.txt")
      {}

  ~FileProcessor()
  {}

  void read()
  {
    std::ifstream file(inFile, std::ios_base::in | std::ios_base::binary);
    filtering_istream in;
    if (file.fail()) {
      std::cout << "couldn't open the input file.. please check its name and read permissions\n";
      return;
    }
    try {
      in.push(file);
      for(std::string inputStr; std::getline(in,inputStr);) 
      {
        bqueue.push(inputStr);
        std::cout << "inserted " << inputStr << "\n";
      }
    }
    catch(std::exception& e) {
      std::cout << "exception occurred while reading file\n" << e.what() << "\n";
    }
  }

  void process()
  {
    while (true)
    {
      std::string rstr = bqueue.pop();
      std::cout << "consumed " << rstr << "\n";
    }
  }

private:
  BlockingQueue<std::string> bqueue;
  std::string inFile;     // input file name from where the strings are read
   };

Aucun commentaire:

Enregistrer un commentaire