lundi 26 juin 2017

C++11: Issue with a simple event loop implementation

I am trying to implement a simple event loop implementation with the following characteristics: 1. Single consumer, multiple producers 2. Looper thread starts when BasicEventLoop object is constructed and automatically ends when BasicEventLoop Is destroyed 3. No busy wait when there is no work to do

Code (.h and .cpp) pasted below. But I am seeing some issues in some cases when the object is destroyed on the thread that is owned by the object itself. This can happen with a peculiar use of shared_ptr to hold BasicEventLoop.

Here is the code: basic_event_loop.h:

#ifndef BASE_BASIC_EVENT_LOOP_H_
#define BASE_BASIC_EVENT_LOOP_H_

#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

namespace base {

// This ia basic implementation of an event loop. It does the following:
// * Spawns a new thread (a standard thread)
// * Exposes an API to add tasks to run on the background thread.
// * Continually runs tasks already queued up.
// * Shuts down the thread automatically in destructor.
class BasicEventLoop {
public:
  // Starts the event loop.
  BasicEventLoop();
  // Add a new task to run on the loop.
  void AddTask(std::function<void()> f);
  // Stops the loop and shuts down the thread.
  ~BasicEventLoop();
private:
  void RunLoop();

  std::mutex mQueueMutex;
  std::condition_variable mConditionVariable;
  std::atomic_bool mShouldQuit;
  std::queue<std::function<void()>> mTaskQueue;
  std::unique_ptr<std::thread> mEventThread;
};

}  // namespace base

#endif  // BASE_BASIC_EVENT_LOOP_H_

basic_event_loop.cpp:

#include "base/basic_event_loop.h"

#include <atomic>

using std::function;
using std::lock_guard;
using std::unique_lock;
using std::mutex;
using std::thread;

namespace base {

BasicEventLoop::BasicEventLoop()
  : mShouldQuit(false),
    mEventThread(new thread(&BasicEventLoop::RunLoop, this)) {
}

BasicEventLoop::~BasicEventLoop() {
  // Indicate the thread to stop executing tasks.
  mShouldQuit = true;
  mConditionVariable.notify_one();
  // In some cases the destructor will be caleld on the same thread that is running the loop.
  // so the thread will be waiting for itself to join. That will cause a deadlock.
  // Hence, wait for the thread to finish if it's a different thread, otherwise detach.
  if (mEventThread->get_id() != std::this_thread::get_id()) {
    if (mEventThread.get())
      mEventThread->join();
  } else {
    mEventThread->detach();
  }
}

void BasicEventLoop::AddTask(function<void()> f) {
  if (mShouldQuit)
    return;

  lock_guard<mutex> lock(mQueueMutex);
  mTaskQueue.push(f);
  mConditionVariable.notify_one();
}

void BasicEventLoop::RunLoop() {
  // The wait operation after acquiring the mutex atomically releases the 
  // mutex and suspends the execution of the thread. When the condition 
  // variable is notified, the thread is awakened and the mutex is atomically
  // reacquired.
  while(1) {
    std::function<void()> task;
    { // new scope for unique_lock
      // need unique_lock for condition_variable
      unique_lock<mutex> lock(mQueueMutex);
      // The condition variable wakes up whenever the predicate specified as the 
      // lambda is true.
      mConditionVariable.wait(lock, [&] {return !mTaskQueue.empty() || mShouldQuit; });

      // Means no tasks to process and destructor has already been called.  
      if (mTaskQueue.empty() && mShouldQuit)
        return;

      if (!mTaskQueue.empty()) {
        task = mTaskQueue.front();
        mTaskQueue.pop();
      }
    }
    task();
  }
}

}  // namespace base

Aucun commentaire:

Enregistrer un commentaire