Tuesday, March 29, 2022

Stopping multiple threads at once

What am I missing in the program below, in which threads wait on a condition_variable_any to determine when to stop? The threads stop unpredictably: some stop before the call to notify_all, and some never stop at all.

The condition variable and its mutex are defined as follows:

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

The threads check if it is time to stop as below:

std::unique_lock<std::mutex> lock(interrupt_mutex);
const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
const auto timeout_expired = cv_status == std::cv_status::timeout;
if (!timeout_expired)
{
    quit = true;
}

The main thread signals the threads to stop as below:

std::unique_lock<std::mutex> lock(interrupt_mutex);
interrupt_cv.notify_all();

A possible output looks like:

Thread  1> Received interrupt signal at iteration 2
Thread  1> Terminate
Thread  2> Received interrupt signal at iteration 2
Thread  2> Terminate
Thread  4> Received interrupt signal at iteration 2
Thread  4> Terminate
**** Requesting all threads to stop ****
Waiting for all threads to complete...

Below is the complete code that reproduces the problem:

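#include <algorithm>           // std::ranges::for_each
#include <chrono>              // std::chrono::milliseconds, std::chrono::seconds
#include <condition_variable>
#include <cstdio>              // printf
#include <mutex>
#include <thread>
#include <vector>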

static std::mutex interrupt_mutex;
static std::condition_variable_any interrupt_cv;

int main()
{
    std::vector<std::thread> thread_handles;
    for (int thread_idx = 0; thread_idx < 4; ++thread_idx)
    {
        thread_handles.emplace_back(std::thread([thread_idx](const int thread_id)
        {
            int num_iterations = 0;
            auto quit = false;
            while (!quit)
            {
                // Fake processing time (outside the lock) for testing purposes
                std::this_thread::sleep_for(std::chrono::milliseconds(200));
                ++num_iterations;

                // Check whether we need to stop, waiting up to 1000 ms
                {
                    std::unique_lock<std::mutex> lock(interrupt_mutex);
                    const auto cv_status = interrupt_cv.wait_for(lock, std::chrono::milliseconds(1000));
                    if (const auto timeout_expired = cv_status == std::cv_status::timeout; !timeout_expired)
                    {
                        printf("Thread %2d> Received interrupt signal at iteration %d\n", thread_id, num_iterations);
                        quit = true;
                    }
                }
            }

            printf("Thread %2d> Terminate\n", thread_id);
        }, thread_idx + 1));
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // Signal all threads to stop
    {
        printf("**** Requesting all threads to stop ****\n");
        std::unique_lock<std::mutex> lock(interrupt_mutex);
        interrupt_cv.notify_all();
    }

    // Wait until all threads stop
    printf("Waiting for all threads to complete...\n");
    std::ranges::for_each(thread_handles, [](std::thread& thread_handle)
    {
        thread_handle.join();
    });

    printf("Program ends\n");
    return 0;
}
