lundi 2 octobre 2017

C++ Threadpool for suspended threads

To perform some performance testing I need to start a batch of threads at a specific point within my program. Unfortunately I have to go the thread-based way and can't use tasks (std::async), because I have to pin the given threads to specific cores (using affinity). To realize this kind of behaviour I used a RAII - "one-shot" approach mentioned by Scott Meyers

This is my code so far:

template < class T >
typename std::decay< T >::type decay_copy( T&& v ) {
  return std::forward< T >( v );
}


/**
 * delayed thread - more or less copied from Scott Meyers:
 * http://ift.tt/2fET3CZ
 */
class del_thread {
  private:
    using future_t          = std::shared_future< void >;
    using thread_t          = std::thread;

    enum execution_state {
      WAITING, TRIGGERED, DISMISSED 
    };

    future_t        _future;
    thread_t        _thread;

    execution_state _state  = WAITING;

  public:
    del_thread() = delete;
    del_thread( del_thread const & ) = delete;
    del_thread &operator=( del_thread const & dt ) = delete;

    del_thread( del_thread && other ):
      _future( std::move( other._future ) ),
      _thread( std::move( other._thread ) ),
      _state( std::move( other._state ) ) {
      other._state = DISMISSED;
    }

    del_thread &operator=( del_thread && dt ) {
      _future  = std::move( dt._future );
      _thread   = std::move( dt._thread );
      _state    = std::move( dt._state );
      dt._state = DISMISSED;
      return *this;
    } 

    template< typename op_t >
    del_thread( op_t && operation, future_t const & future ):
      _thread(  [ operation = decay_copy(std::forward< op_t >( operation )),
                  _future = future,
                  &_state = _state
                ]() {
                  _future.wait();
                  if( _state == TRIGGERED || _state == DISMISSED ) {
                    return;
                  }
                  _state = TRIGGERED;
                  operation();
                }
            ) {
    }

    ~del_thread() {
      join();
    }

    void join() {
      if( _state == DISMISSED ) {
        return;
      }
      if( _thread.joinable() ) {
        _thread.join();
      }
    }
};

class batch_thread_pool {
  private:
    std::promise< void > _promise;
    std::shared_future< void > _future;
    std::vector< del_thread > _pool;

  public:
    batch_thread_pool() :
      _future( _promise.get_future().share() ) {}
    template< typename op_t >
    void add_thread( op_t && operation ) {
      _pool.emplace_back( del_thread(std::forward< op_t >( operation ), std::ref( _future ) ) );
    }

    void run_batch() {
      _promise.set_value();
      _pool.clear();
    }
};

The basic idea is to use a void-future to create a suspended thread, perform the thread setup stuff like setting affinity and / or priority and start all threads at the same time. As one can see, the main thread should join the added threads when the pool is cleared. To test the threadpool I wrote a little main, looking like that:

#include <chrono>
#include <iostream>

#include "threads.h"

void runFunc() {
  std::cout << "In runFunc...\n";
  return;
}
void run2Func() {
  std::cout << "In run2Func...\n";
  return;
}

int main() {
  batch_thread_pool tp;
  tp.add_thread( runFunc );
  tp.add_thread( run2Func );
  std::cout << "Working while thread 'suspended'...\n";
  tp.run_batch();
  std::cout << "Working while thread runs asynchronously...\n";
  std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
  std::cout << "Done!\n";
}

Unfortunately the Threads are not started consistently. Sometimes both methods (runFunc and run2Func) are executed, sometimes only one of them. I guess it is, because the main thread ends, before a join happens. Is this correct or does anyone knows, what I am doing wrong?

Sincerely

Aucun commentaire:

Enregistrer un commentaire