mercredi 28 octobre 2015

Issues with std::condition_variable wait?

I have a straight forward producer and consumer below running with gcc 4.7.3 on Ubuntu. It is crashing and I don't understand why from the stack. Also the crash is intermittent as it doesn't always happen when being run in the debugger but does from command line. It is saying it is in the condition variable wait but not sure why. See code, stack and program output below.

Compilation line: g++ -I/usr/include -I/usr/local/include -std=c++11 -pthread -ggdb -g3 -Wall ./main.o -L/usr/lib -L/usr/local/lib -o ProducerConsumer

#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <cassert>
#include <atomic>
#include <thread>
#include <chrono>
#include <sstream>

template<typename T>
struct DataQueue { 

  DataQueue() : m_queue(), m_mutex(), m_condition() {}
  ~DataQueue() = default;

  void push(const T& aData) {
    std::lock_guard<std::mutex> lk(m_mutex);
    m_queue.push(aData);
    std::ostringstream os; 
    os << " Pushed: " << aData << " TO the Queue " << std::endl;
    std::cout << os.str() << std::endl;
    m_condition.notify_one();
  }

  bool try_pop(T& aData) { 
    std::lock_guard<std::mutex> lk(m_mutex);
    if(m_queue.empty())
      return false;
    aData = m_queue.front();
    m_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop() { 
    std::lock_guard<std::mutex> lk(m_mutex);
    if(m_queue.empty())
      return std::shared_ptr<T>();
    auto sptr(std::make_shared<T>(m_queue.front()));
    m_queue.pop();
    return sptr;
  }

  void pop_wait(T& aData) { 
    std::unique_lock<std::mutex> lk; 
    m_condition.wait(lk, [this] { return !m_queue.empty(); }); 
    assert(!m_queue.empty());
    aData = m_queue.front();
    m_queue.pop();
  }

  std::shared_ptr<T> pop_wait() { 
    std::unique_lock<std::mutex> lk; 
    m_condition.wait(lk,[this] { return !m_queue.empty(); }); 
    auto sptr(std::make_shared<T>(m_queue.front()));
    std::ostringstream os; 
    os << " Popped: " << *sptr << " FROM the Queue " << std::endl;
    std::cout << os.str() << std::endl;
    assert(!m_queue.empty());
    m_queue.pop();
    return sptr;
  }

  private:
  std::queue<T> m_queue;
  std::mutex m_mutex;
  std::condition_variable m_condition;
};

template <typename T>
struct Producer {

  Producer(DataQueue<T> &aQueue) : m_queue(aQueue), m_shutdown(true), m_counter(0), m_thread() {}

  void start() { 
    m_thread = std::thread(&Producer::run, this);
  }

  void run() {
    m_shutdown.store(false,std::memory_order_release);
    while(!m_shutdown.load(std::memory_order_acquire)) 
      m_queue.push(++m_counter);
  }

  void stop() { 
    m_shutdown.store(true,std::memory_order_release);
    if(m_thread.joinable())
      m_thread.join();
  }

  private:
  DataQueue<T> &m_queue;
  std::atomic<bool> m_shutdown;
  uint64_t m_counter;
  std::thread m_thread;
};

template <typename T>
struct Consumer {

  Consumer(DataQueue<T> &aQueue) : m_queue(aQueue), m_shutdown(true), m_data(), m_thread() {}

  void start() { 
    m_thread = std::thread(&Consumer::run, this);
  }

  void run() {
    m_shutdown.store(false,std::memory_order_release);
    while(!m_shutdown.load(std::memory_order_acquire)) 
      m_data.push_back(*(m_queue.pop_wait()));
  }

  void stop() { 
    m_shutdown.store(true,std::memory_order_release);
    if(m_thread.joinable())
      m_thread.join();
    std::cout << " Consumer read : " << m_data.size() << " Items " << std::endl;
  }

  private:
  DataQueue<T> &m_queue;
  std::atomic<bool> m_shutdown;
  std::vector<T> m_data;
  std::thread m_thread;
};

int main() { 

  typedef uint64_t  dataType;
  DataQueue<dataType> dq; 
  Consumer<dataType> c(dq);
  Producer<dataType> p(dq);
  p.start();
  c.start();

  std::this_thread::sleep_for(std::chrono::seconds(2));

  p.stop();
  c.stop();
}

Stack at the point of the crash

Thread 3 (Thread 0x7ffff67d3700 (LWP 9803)):
#0  0x00007ffff76ae053 in __pthread_mutex_unlock_usercnt ()
   from /lib/x86_64-linux-gnu/libpthread.so.0
#1  0x00007ffff76aec17 in pthread_cond_wait@@GLIBC_2.3.2 ()
   from /lib/x86_64-linux-gnu/libpthread.so.0
#2  0x00007ffff7b846fc in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3  0x0000000000403dd5 in std::condition_variable::wait<DataQueue<unsigned long>::pop_wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, DataQueue<unsigned long>::pop_wait()::{lambda()#1}) (this=0x7fffffffdf68, __lock=..., __p=...)
    at /usr/include/c++/4.7/condition_variable:93
#4  0x0000000000403129 in DataQueue<unsigned long>::pop_wait (
    this=0x7fffffffdef0) at main.cpp:55
#5  0x000000000040289f in Consumer<unsigned long>::run (this=0x7fffffffdec0)
    at main.cpp:111
#6  0x0000000000407527 in std::_Mem_fn<void (Consumer<unsigned long>::*)()>::operator() (this=0x60e428, __object=0x7fffffffdec0)
    at /usr/include/c++/4.7/functional:554
#7  0x0000000000407363 in std::_Bind_simple<std::_Mem_fn<void (Consumer<unsigned long>::*)()> (Consumer<unsigned long>*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (this=0x60e420) at /usr/include/c++/4.7/functional:1598
#8  0x00000000004071cb in std::_Bind_simple<std::_Mem_fn<void (Consumer<unsigned long>::*)()> (Consumer<unsigned long>*)>::operator()() (this=0x60e420)
    at /usr/include/c++/4.7/functional:1586
#9  0x00000000004070d6 in std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void (Consumer<unsigned long>::*)()> (Consumer<unsigned long>*)> >::_M_run() (
    this=0x60e408) at /usr/include/c++/4.7/thread:115
#10 0x00007ffff7b87c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#11 0x00007ffff76aaf8e in start_thread ()
   from /lib/x86_64-linux-gnu/libpthread.so.0
#12 0x00007ffff73d4a0d in clone () from /lib/x86_64-linux-gnu/libc.so.6

Thread 2 (Thread 0x7ffff6fd4700 (LWP 9802)):
#0  0x00007ffff76b182c in __lll_lock_wait ()
   from /lib/x86_64-linux-gnu/libpthread.so.0
#1  0x00007ffff76af55c in pthread_cond_signal@@GLIBC_2.3.2 ()
   from /lib/x86_64-linux-gnu/libpthread.so.0
#2  0x00007ffff7b84719 in std::condition_variable::notify_one() ()
   from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#3  0x0000000000402d3f in DataQueue<unsigned long>::push (this=0x7fffffffdef0, 
    aData=@0x7fffffffdeb0: 3) at main.cpp:24
#4  0x00000000004026de in Producer<unsigned long>::run (this=0x7fffffffdea0)
    at main.cpp:83
#5  0x000000000040758f in std::_Mem_fn<void (Producer<unsigned long>::*)()>::operator() (this=0x60e2a8, __object=0x7fffffffdea0)
    at /usr/include/c++/4.7/functional:554
#6  0x0000000000407415 in std::_Bind_simple<std::_Mem_fn<void (Producer<unsigned long>::*)()> (Producer<unsigned long>*)>::_M_invoke<0ul>(std::_Index_tuple<0ul>) (this=0x60e2a0) at /usr/include/c++/4.7/functional:1598
#7  0x00000000004071f3 in std::_Bind_simple<std::_Mem_fn<void (Producer<unsigned long>::*)()> (Producer<unsigned long>*)>::operator()() (this=0x60e2a0)
---Type <return> to continue, or q <return> to quit---
    at /usr/include/c++/4.7/functional:1586
#8  0x00000000004070f4 in std::thread::_Impl<std::_Bind_simple<std::_Mem_fn<void (Producer<unsigned long>::*)()> (Producer<unsigned long>*)> >::_M_run() (
    this=0x60e288) at /usr/include/c++/4.7/thread:115
#9  0x00007ffff7b87c80 in ?? () from /usr/lib/x86_64-linux-gnu/libstdc++.so.6
#10 0x00007ffff76aaf8e in start_thread ()
   from /lib/x86_64-linux-gnu/libpthread.so.0
#11 0x00007ffff73d4a0d in clone () from /lib/x86_64-linux-gnu/libc.so.6

Thread 1 (Thread 0x7ffff7fdb740 (LWP 9798)):
#0  0x00007ffff76b243d in nanosleep ()
   from /lib/x86_64-linux-gnu/libpthread.so.0
#1  0x000000000040225a in std::this_thread::sleep_for<long, std::ratio<1l, 1l> > (__rtime=...) at /usr/include/c++/4.7/thread:277
#2  0x0000000000401786 in main () at main.cpp:137
(gdb) 

Program output

Pushed: 1 TO the Queue 

 Popped: 1 FROM the Queue 


 Pushed: 2 TO the Queue 

 Popped: 2 FROM the Queue 

 Pushed: 3 TO the Queue 
 Popped: 3 FROM the Queue 



Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0x7ffff67d3700 (LWP 9803)]
0x00007ffff76ae053 in __pthread_mutex_unlock_usercnt ()
   from /lib/x86_64-linux-gnu/libpthread.so.0

Aucun commentaire:

Enregistrer un commentaire