dimanche 1 novembre 2020

std::packaged_task/std::future memory leak

I'm trying to write a C++11 function run_with_timeout that takes a callable, runs it in a different thread and throws a custom TimeoutError when that thread does not terminate within a specified time limit. The callable should also be allowed to throw an AbortedError, so that it is possible to get the thread to stop executing after the TimeoutError has been thrown, by setting a std::atomic<bool> flag or similar.

Here is my function and an example of how I want to use it in main:

#include <atomic>
#include <chrono>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>

#include <boost/optional.hpp>

/// Thrown when a callable does not finish within its time limit.
struct TimeoutError : public std::exception
{
  /// Describes the error; without this override only the generic
  /// std::exception text would be reported.
  char const *what() const noexcept override
  {
    return "operation timed out";
  }
};

/// Thrown by a callable to signal that it stopped early because it was asked
/// to abort.
struct AbortedError : public std::exception
{
  /// Describes the error; without this override only the generic
  /// std::exception text would be reported.
  char const *what() const noexcept override
  {
    return "operation aborted";
  }
};

/// The type produced by calling a `Callable` with arguments of types
/// `CallArgs...`, computed from an unevaluated call expression.
template<typename Callable, typename ...CallArgs>
using ReturnType =
  decltype(std::declval<Callable>()(std::declval<CallArgs>()...));

/// A boost::optional around the result type of calling `Callable` with
/// `CallArgs...`, so that "no result" can be represented.
template<typename Callable, typename ...CallArgs>
using ReturnTypeWrapper =
  boost::optional<ReturnType<Callable, CallArgs...>>;

namespace detail {

/// Chooses how a forwarded value is handed to the worker thread.
/// Primary template (value passed as an rvalue): the value is moved, so the
/// worker ends up owning its own copy.
template<typename T>
struct WorkerArg
{
  static T &&pass(T &value) { return static_cast<T &&>(value); }
};

/// Lvalue case: the value stays with the caller and only a
/// std::reference_wrapper travels to the worker. The caller must keep the
/// object alive for as long as the worker may use it.
template<typename T>
struct WorkerArg<T &>
{
  static std::reference_wrapper<T> pass(T &value) { return std::ref(value); }
};

} // namespace detail

/// Runs `f(args...)` on a new thread and waits up to `timeout` for it.
///
/// The worker must never refer to this function's parameters, because on
/// timeout the thread is detached and keeps running after this function (and
/// possibly its caller) has unwound. Therefore rvalue callables/arguments are
/// moved into the worker (they must be move-constructible), while lvalues are
/// passed by reference and remain the caller's responsibility.
///
/// @param what     Label for the operation; currently unused.
/// @param timeout  Maximum time to wait for the callable to finish.
/// @param f        Callable to run.
/// @param args     Arguments forwarded to `f`.
/// @return A ready future holding the result (or exception) of `f`.
/// @throws TimeoutError if `f` has not finished when `timeout` elapses.
template<typename FUNC, typename ...ARGS, typename REP, typename PERIOD>
std::future<ReturnType<FUNC, ARGS...>> future_with_timeout(
  std::string const &what,
  std::chrono::duration<REP, PERIOD> const &timeout,
  FUNC &&f,
  ARGS &&...args)
{
  (void)what;

  // The task stores the callable itself (or a reference_wrapper for lvalues)
  // instead of a lambda that captures local references.
  std::packaged_task<ReturnType<FUNC, ARGS...>(ARGS...)> task(
    detail::WorkerArg<FUNC>::pass(f));

  auto future(task.get_future());

  // std::thread copies/moves what it is given into the new thread, so value
  // arguments live there and no longer depend on this stack frame.
  std::thread thread(std::move(task), detail::WorkerArg<ARGS>::pass(args)...);

  if (future.wait_for(timeout) == std::future_status::timeout) {
    // A still-running thread cannot be joined without blocking; let it go.
    thread.detach();

    throw TimeoutError();
  }

  thread.join();

  return future;
}

/// Runs `f(args...)` on another thread and returns its result, giving up
/// after `timeout`.
///
/// The callable is wrapped so that an AbortedError thrown by it is turned
/// into an empty optional inside the worker thread.
///
/// Lifetime note: the wrapper refers to `f` by reference. After a
/// TimeoutError the worker is detached and may still be running, so the
/// callable must stay alive until the worker has stopped.
///
/// @param what     Label for the operation, passed on to future_with_timeout.
/// @param timeout  Maximum time to wait for the callable to finish.
/// @param f        Callable to run.
/// @param args     Arguments forwarded to `f`.
/// @return The value returned by `f`.
/// @throws TimeoutError if `f` does not finish within `timeout`.
/// @throws AbortedError if `f` aborted before the timeout, so there is no
///         value to return (previously this dereferenced an empty optional).
template<typename FUNC, typename ...ARGS, typename REP, typename PERIOD>
ReturnType<FUNC, ARGS...> run_with_timeout(
  std::string const &what,
  std::chrono::duration<REP, PERIOD> const &timeout,
  FUNC &&f,
  ARGS &&...args)
{
  auto future(future_with_timeout(
    what,
    timeout,
    [&](ARGS &&...call_args) {
      try {
        return ReturnTypeWrapper<FUNC, ARGS...>(
          f(std::forward<ARGS>(call_args)...));
      } catch (AbortedError const &) {
        return ReturnTypeWrapper<FUNC, ARGS...>();
      }
    },
    std::forward<ARGS>(args)...));

  auto result(future.get());

  // An empty optional means the callable threw AbortedError; report that
  // instead of dereferencing an optional that holds nothing.
  if (!result)
    throw AbortedError();

  return *std::move(result);
}

int main()
{
  auto add_timeout = [](int a, int b, std::atomic<bool> &aborted)
  {
    for (;;) {
      if (aborted.load())
        throw AbortedError(); 

      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    return a + b;
  };

  std::atomic<bool> aborted(false);

  try {
    std::cout << run_with_timeout("add_timeout", std::chrono::milliseconds(100), add_timeout, 1, 2, aborted) << std::endl;
  } catch (TimeoutError const &e) {
    aborted.store(true);
    std::cout << "timeout" << std::endl;
  }
}

add_timeout is a dummy example of a function that is "stuck" and should run into a timeout and subsequently be aborted. This seems to work fine: the program outputs "timeout", and in the detached thread running add_timeout an AbortedError is thrown and subsequently handled by the try/catch block inside the lambda inside run_with_timeout.

However, when I run this program under valgrind, possibly lost as well as still reachable memory blocks are reported and I don't understand why. Here is the output of valgrind --tool=memcheck --leak-check=full (program was compiled with gcc 10.2.0 without optimization):

==120999== HEAP SUMMARY:
==120999==     in use at exit: 432 bytes in 4 blocks
==120999==   total heap usage: 7 allocs, 3 frees, 74,296 bytes allocated
==120999== 
==120999== 288 bytes in 1 blocks are possibly lost in loss record 4 of 4
==120999==    at 0x483CB65: calloc (vg_replace_malloc.c:760)
==120999==    by 0x40142EB: _dl_allocate_tls (in /usr/lib/ld-2.32.so)
==120999==    by 0x489613C: pthread_create@@GLIBC_2.2.5 (in /usr/lib/libpthread-2.32.so)
==120999==    by 0x497DEF9: __gthread_create (gthr-default.h:663)
==120999==    by 0x497DEF9: std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) (thread.cc:135)
==120999==    by 0x11024D: std::thread::thread<std::packaged_task<boost::optional<int> ()>, , void>(std::packaged_task<boost::optional<int> ()>&&) (in a.out)
==120999==    by 0x10B9A6: _Z19future_with_timeoutIZ16run_with_timeoutIRZ4mainEUliiRSt6atomicIbEE_JiiS3_ElSt5ratioILl1ELl1000EEEDTclcl7declvalIT_EEspcl7declvalIT0_EEEERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKNSt6chrono8durationIT1_T2_EEOS8_DpOS9_EUlOiST_S3_E_JiiS3_ElS7_ESt6futureISA_ESI_SP_SQ_SS_ (in a.out)
==120999==    by 0x10B82D: decltype (((declval<main::{lambda(int, int, std::atomic<bool>&)#1}&>)())((declval<int>)(), (declval<int>)(), (declval<std::atomic<bool>&>)())) run_with_timeout<main::{lambda(int, int, std::atomic<bool>&)#1}&, int, int, std::atomic<bool>&, long, std::ratio<1l, 1000l> >(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::chrono::duration<long, std::ratio<1l, 1000l> > const&, std::ratio<1l, 1000l>&&, (main::{lambda(int, int, std::atomic<bool>&)#1}&)...) (in a.out)
==120999==    by 0x10B5BD: main (in a.out)
==120999== 
==120999== LEAK SUMMARY:
==120999==    definitely lost: 0 bytes in 0 blocks
==120999==    indirectly lost: 0 bytes in 0 blocks
==120999==      possibly lost: 288 bytes in 1 blocks
==120999==    still reachable: 144 bytes in 3 blocks
==120999==         suppressed: 0 bytes in 0 blocks
==120999== Reachable blocks (those to which a pointer was found) are not shown.
==120999== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==120999== 
==120999== For lists of detected and suppressed errors, rerun with: -s
==120999== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

What can I do to fix this? Or is there even something fundamentally wrong with my code?

Aucun commentaire:

Enregistrer un commentaire