dimanche 28 avril 2019

Running consumer threads sequentially

I have a queue which stores words read from a text file. I'm using producer-consumer thread model to achieve this.

thread one - producer thread, read words from text file and push it to queue.
thread two & three - consumer threads, pop word from queue.

Now i wants to run the consumer threads sequentially one after other.

expected output:

$ ./a.out sample.txt
word one thread one
word two thread two
word three thread one
word four thread two
.
.
.

As of now, I have tried with std::condition_variable with Predicate, but i'm getting different output for each time.
I'm new to multi-threading and knows that this is not the right way to implement multi-threading, but i wants to achieve this for learning purpose. Please find my code below

#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <string>
#include <fstream>

// for reading file
std::fstream file;
std::string word;

// used as predicate in condition variables 
int thread_number{0};

template <typename T>
class Queue
{
  public:
  T pop_one()
  {
   std::unique_lock<std::mutex> mlock(mutex_);
   cond_.wait(mlock, [] {return thread_number == 0;});
   while (queue_.empty())
   {
    cond_.wait(mlock, [] {return thread_number == 0;});
   } 
   auto item = queue_.front();
   queue_.pop();
   thread_number = 1;
   return item;
  }

  T pop_two()
  {
   std::unique_lock<std::mutex> mlock(mutex_);
   cond_.wait(mlock, [] {return thread_number == 1;});
   while (queue_.empty())
   {
    cond_.wait(mlock, [] {return thread_number == 1;});
   }
   auto item = queue_.front();
   queue_.pop();
   thread_number = 0;
   return item;
  }

  void push(const T& item)
  {
   std::unique_lock<std::mutex> mlock(mutex_);
   queue_.push(item);
   mlock.unlock();
   cond_.notify_one();
  }

  bool is_empty()
  {
   return queue_.empty();
  }

  private:
   std::queue<T> queue_;
   std::mutex mutex_;
   std::condition_variable cond_;
};

// creating queue object in global scope  
Queue<std::string> q;

// read and push words to queue 
void write_()
{
 while(file >> word)
 {
  q.push(word);
 }
}

// pop word from queue - consumer thread one
void read_one()
{
 while(!q.is_empty())
 {
  std::cout <<q.pop_one() << " thread 1" << std::endl;
 }
}

// pop word from queue - consumer thread two
void read_two()
{
 while(!q.is_empty())
 {
  std::cout <<q.pop_two() << " thread 2" << std::endl;
 }
}

int main(int argc, char const *argv[])
{
 file.open(argv[1]);
 std::thread wt(write_);
 std::thread rt1(read_one);
 std::thread rt2(read_two);
 wt.join();
 rt1.join();
 rt2.join();
 return 0;
}

Aucun commentaire:

Enregistrer un commentaire