Wednesday, June 3, 2020

Thread racing when working with a vector of data in the thread function

As an early-stage C++/threading coder, I am having a hard time with thread racing in one of my test functions and would truly appreciate some feedback.

My parent() function takes as input a rather large vector of images (cv::Mat from OpenCV), and the task is to apply an operator to each image separately (e.g. a morphological erosion or dilation). I wrote a loop that creates threads running a worker() function and passes each thread a subset of my input vector.
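The per-thread split described above can be written as a list of half-open index ranges. This is a minimal sketch, and `split_ranges` is a hypothetical helper, not part of OpenCV or the standard library. Unlike the question's code, which hands the whole remainder to the last thread (and, when there are fewer images than cores, makes `grainsize` zero so that the last thread gets everything), it spreads the remainder so that chunk sizes differ by at most one.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: split [0, total) into at most `parts` contiguous,
// half-open ranges [first, second). The first (total % parts) ranges get one
// extra element, so all chunk sizes differ by at most one.
std::vector<std::pair<std::size_t, std::size_t>>
split_ranges(std::size_t total, std::size_t parts) {
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    if (total == 0) return ranges;
    if (parts == 0) parts = 1;          // hardware_concurrency() may report 0
    if (parts > total) parts = total;   // avoid empty ranges
    const std::size_t base = total / parts;
    const std::size_t extra = total % parts;
    std::size_t begin = 0;
    for (std::size_t i = 0; i < parts; ++i) {
        const std::size_t len = base + (i < extra ? 1 : 0);
        ranges.emplace_back(begin, begin + len);
        begin += len;
    }
    return ranges;
}
```

For 10 images and 3 threads this yields the ranges [0, 4), [4, 7) and [7, 10).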

The result from each thread is meant to be stored in that input subset vector. My problem is that I cannot retrieve it from within parent().
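The subset vectors are copies, so the processed chunks have to be handed back to parent() explicitly. One standard-library way to do that, without several threads appending to one shared output vector, is to run each chunk with `std::async` and collect the returned vectors through `std::future::get()` in chunk order. This is a sketch under assumptions, not the poster's code: `process_chunks_async` is a hypothetical name, and `op` is a caller-supplied per-element function standing in for `cv::erode`.

```cpp
#include <algorithm>
#include <cstddef>
#include <future>
#include <utility>
#include <vector>

// Hypothetical helper: apply `op` to every element, one std::async task per
// contiguous chunk. Each task owns a copy of its chunk and returns it; the
// caller reassembles the results in chunk order, so the output order matches
// the input order regardless of which task finishes first.
template <typename T, typename Op>
std::vector<T> process_chunks_async(const std::vector<T>& input,
                                    std::size_t chunks, Op op) {
    if (chunks == 0) chunks = 1;
    const std::size_t n = input.size();
    const std::size_t grain = (n + chunks - 1) / chunks;  // ceiling division

    std::vector<std::future<std::vector<T>>> futures;
    for (std::size_t begin = 0; begin < n; begin += grain) {
        const std::size_t end = std::min(begin + grain, n);
        // The chunk is moved into the task, so nothing dangles when this
        // loop iteration ends.
        std::vector<T> chunk(input.begin() + begin, input.begin() + end);
        futures.push_back(std::async(std::launch::async,
            [op, chunk = std::move(chunk)]() mutable {
                for (auto& x : chunk) op(x);
                return std::move(chunk);
            }));
    }

    std::vector<T> output;
    output.reserve(n);
    for (auto& f : futures) {
        std::vector<T> part = f.get();  // blocks until that task has finished
        output.insert(output.end(), part.begin(), part.end());
    }
    return output;
}
```

With OpenCV this might be called as `imageSet = process_chunks_async(imageSet, std::thread::hardware_concurrency(), [&element](cv::Mat& img) { cv::erode(img, img, element); });`, assuming `element` is the structuring element built as in worker(). Keep in mind that copying a `cv::Mat` copies only its header, not the pixel data.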

As an alternative, I passed the entire vector to worker() together with start and end indices for each thread, but then I ran into serious thread-racing issues, and the threaded version took more time than the serial approach.
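Passing the whole vector with start and end indices does not need a mutex at all, as long as each thread only touches the elements in its own [start, end) range: distinct elements of a `std::vector` are distinct objects, so concurrent writes to disjoint ranges are not a data race. Holding one mutex around the entire erosion loop, as the question's worker() does, lets only one thread work at a time, which would plausibly explain why the threaded version is slower than the serial one. A minimal sketch with `std::thread`, assuming a generic per-element operation in place of `cv::erode`; `process_in_place` is a hypothetical name.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: apply `op` to every element of `data` in place, using
// up to `num_threads` std::threads. Each thread owns one contiguous,
// half-open range [begin, end); ranges never overlap, so no mutex is needed.
template <typename T, typename Op>
void process_in_place(std::vector<T>& data, std::size_t num_threads, Op op) {
    if (num_threads == 0) num_threads = 1;  // hardware_concurrency() may return 0
    const std::size_t n = data.size();
    const std::size_t grain = n / num_threads;
    const std::size_t extra = n % num_threads;

    std::vector<std::thread> threads;
    std::size_t begin = 0;
    for (std::size_t t = 0; t < num_threads && begin < n; ++t) {
        const std::size_t end = begin + grain + (t < extra ? 1 : 0);
        // The vector is captured by reference, but the range bounds are
        // captured by value so they cannot change after the thread starts.
        threads.emplace_back([&data, op, begin, end]() mutable {
            for (std::size_t i = begin; i < end; ++i) op(data[i]);
        });
        begin = end;
    }
    for (auto& th : threads) th.join();  // wait until every range is processed
}
```

From parent() this could look like `process_in_place(imageSet, std::thread::hardware_concurrency(), [&element](cv::Mat& img) { cv::erode(img, img, element); });`, assuming `element` is the same structuring element as in worker() and that concurrent `cv::erode` calls on different images are safe in your OpenCV build.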

Please see my code below.

```cpp
#include <chrono>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <opencv2/imgproc.hpp>

// Global mutex used by worker(); its declaration was not shown in the original post.
std::mutex this_mutex;

// Erodes every image in ctn in place and returns a copy of the vector.
std::vector<cv::Mat> worker(std::vector<cv::Mat>& ctn) {

  int erosion_type = cv::MORPH_RECT;
  int erosion_size = 5;

  // 11x11 rectangular structuring element anchored at its centre
  cv::Mat element = cv::getStructuringElement( erosion_type,
    cv::Size( 2*erosion_size + 1, 2*erosion_size+1 ),
    cv::Point( erosion_size, erosion_size ) );

  this_mutex.lock();
  for(std::size_t it=0; it<ctn.size(); ++it) {
    cv::erode(ctn[it], ctn[it], element);
  }
  this_mutex.unlock();
  return ctn;
}


void parent(std::vector<cv::Mat>& imageSet) {

  auto start = std::chrono::steady_clock::now();

  const auto processor_count = std::thread::hardware_concurrency();

  std::vector<std::thread> threads;

  // number of images given to each thread
  const int grainsize = imageSet.size() / processor_count;

  unsigned work_iter = 0;
  std::vector<cv::Mat> target; // holds the output vector

  // create one thread per chunk of grainsize images
  for(unsigned it=0; it<processor_count-1; ++it) {
    std::vector<cv::Mat> subvec(imageSet.begin() + work_iter, imageSet.begin() + work_iter + grainsize);
    threads.emplace_back([&,it]() {
      std::vector<cv::Mat> tmp = worker(subvec);
      target.insert(target.end(), tmp.begin(), tmp.end());
    });
    work_iter += grainsize;
  }

  // create the last thread for the remainder of the vector elements
  std::vector<cv::Mat> subvec(imageSet.begin() + work_iter, imageSet.end());
  int it = processor_count-1;
  threads.emplace_back([&,it]() {
    std::vector<cv::Mat> tmp = worker(subvec);
    target.insert(target.end(), tmp.begin(), tmp.end());
  });

  // join the threads
  for(std::size_t i=0; i<threads.size(); ++i) {
    threads[i].join();
  }

  auto end = std::chrono::steady_clock::now();
  std::chrono::duration<double> elapsed_seconds = end-start;
  std::cout << "elapsed time: " << elapsed_seconds.count() << "s\n";

  // try to reconstruct the output
  imageSet.clear();
  for(std::size_t i=0; i<target.size(); ++i) {
    imageSet.push_back(target[i]);
  }
}
```
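Read against the code above, three things stand out. Inside the loop, `subvec` is a local variable of the loop body, but the lambda captures it by reference (`[&,it]`), so it is destroyed at the end of each iteration while the thread may still be using it, which is undefined behavior. All threads call `target.insert` on the same vector with no synchronization, which is a genuine data race; even with a lock, the order of `target` would depend on which thread finishes first. Finally, the mutex in worker() is held for the whole erosion loop, so at most one thread erodes at a time. The sketch below keeps the shape of parent() but moves each chunk into its thread and gives every thread its own result slot. `parent_fixed` and `op` are hypothetical names; the function is a template, so it can be exercised with plain ints as well as with `cv::Mat`.

```cpp
#include <cstddef>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical rewrite of the parent() pattern for a generic element type.
// Each thread receives its chunk by value (moved into the lambda) and writes
// its result into its own slot of `results`, so no two threads ever modify
// the same std::vector and no mutex is needed.
template <typename T, typename Op>
std::vector<T> parent_fixed(const std::vector<T>& input,
                            std::size_t num_threads, Op op) {
    if (num_threads == 0) num_threads = 1;  // hardware_concurrency() may return 0
    const std::size_t n = input.size();
    const std::size_t grain = n / num_threads;

    std::vector<std::vector<T>> results(num_threads);  // one slot per thread
    std::vector<std::thread> threads;
    std::size_t begin = 0;
    for (std::size_t t = 0; t < num_threads; ++t) {
        // As in the original, the last thread takes the remainder.
        const std::size_t end = (t + 1 == num_threads) ? n : begin + grain;
        std::vector<T> chunk(input.begin() + begin, input.begin() + end);
        threads.emplace_back([&results, t, op, chunk = std::move(chunk)]() mutable {
            for (auto& x : chunk) op(x);
            results[t] = std::move(chunk);  // only thread t writes slot t
        });
        begin = end;
    }
    for (auto& th : threads) th.join();

    // Concatenate in thread order after join(), so the output order matches the input.
    std::vector<T> output;
    output.reserve(n);
    for (auto& part : results)
        output.insert(output.end(), part.begin(), part.end());
    return output;
}
```

With OpenCV, `op` could be `[&element](cv::Mat& img) { cv::erode(img, img, element); }`, again assuming `element` is built as in worker(). Because copying a `cv::Mat` shares its pixel buffer, use `img.clone()` somewhere in the pipeline if the original images must stay untouched.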
