Monday, 31 August 2020

C++ thread worker failure under high load

I have been working on an idea for a system where many workers are triggered on a regular basis by a central timer class. The part I'm concerned about here is a TriggeredWorker which, in a loop, uses the mutex and condition variable approach to wait until it is told to do work. It has a method, trigger, that is called by a different thread to request that work be done. It is meant as an abstract class: it has to be subclassed so that the actual work method can be implemented.

I have a test that shows that this mechanism works. However, as I increase the load by reducing the trigger interval, the test starts to fail. With a delay of 20 microseconds between triggers, the test is 100% reliable. As I reduce the delay to 1 microsecond, I start to get failures: the count of work performed drops from the expected 1000 to values like 986, 933, 999, etc.

My questions are: (1) what is going wrong, and how can I capture it so that I can report it or do something about it? And (2) is there a better approach that I could use? I have to admit that my experience with C++ is limited to the last 3 months, although I have worked with other languages for several years.

Many thanks for reading...

Here are the key bits of code:

Triggered worker header file:

#ifndef TIMER_TRIGGERED_WORKER_H
#define TIMER_TRIGGERED_WORKER_H

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <plog/Log.h>

class TriggeredWorker {
private:
    std::mutex mutex_;
    std::condition_variable condVar_;
    std::atomic<bool> running_{false};
    std::atomic<bool> ready_{false};

    void workLoop();
protected:
    virtual void work() {};
public:
    void start();
    void stop();
    void trigger();
};

#endif //TIMER_TRIGGERED_WORKER_H

Triggered worker implementation:

#include "TriggeredWorker.h"

void TriggeredWorker::workLoop() {
    PLOGD << "workLoop started...";

    while(true) {
        std::unique_lock<std::mutex> lock(mutex_);
        condVar_.wait(lock, [this]{
            bool ready = this->ready_;
            bool running = this->running_;
            return ready || !running; });
        this->ready_ = false;

        if (!this->running_) {
            break;
        }

        PLOGD << "Calling work()...";
        work();

        lock.unlock();
        condVar_.notify_one();
    }

    PLOGD << "Worker thread completed.";
}

void TriggeredWorker::start() {
    PLOGD << "Worker start...";
    this->running_ = true;
    auto thread = std::thread(&TriggeredWorker::workLoop, this);
    thread.detach();
}

void TriggeredWorker::stop() {
    PLOGD << "Worker stop.";
    this->running_ = false;
}

void TriggeredWorker::trigger() {
    PLOGD << "Trigger.";
    std::unique_lock<std::mutex> lock(mutex_);
    ready_ = true;
    lock.unlock();
    condVar_.notify_one();
}

and the test:

#include "catch.hpp"
#include "TriggeredWorker.h"
#include <atomic>
#include <chrono>
#include <thread>

TEST_CASE("Simple worker performs work when triggered") {
    static std::atomic<int> twt_count{0};

    class SimpleTriggeredWorker : public TriggeredWorker {
    protected:
        void work() override {
            PLOGD << "Incrementing counter.";
            twt_count.fetch_add(1);
        }
    };

    SimpleTriggeredWorker worker;

    worker.start();

    for (int i = 0; i < 1000; i++) {
        worker.trigger();
        std::this_thread::sleep_for(std::chrono::microseconds(20));
    }

    std::this_thread::sleep_for(std::chrono::seconds(1));

    CHECK(twt_count == 1000);

    std::this_thread::sleep_for(std::chrono::seconds(1));
    worker.stop();
}
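
One likely cause of the missing counts, given the code above: ready_ is a single flag, so if trigger() runs twice before the worker thread wakes up and resets it, both triggers collapse into one call to work(). The following is a minimal sketch of one possible alternative, not a definitive fix. It counts pending triggers instead of storing a flag. The class name CountingWorker, the pending_ counter, and the std::function callback are all assumptions made for illustration, and the sketch joins its thread in stop() rather than detaching it.

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical variant of TriggeredWorker: it counts pending triggers rather
// than keeping one ready flag, so a burst of triggers is not merged.
class CountingWorker {
public:
    explicit CountingWorker(std::function<void()> work) : work_(std::move(work)) {}
    ~CountingWorker() { stop(); }

    void start() {
        if (thread_.joinable()) return;  // already started
        running_ = true;                 // written before the thread exists
        thread_ = std::thread(&CountingWorker::workLoop, this);
    }

    // Asks the loop to exit once all pending triggers have been processed,
    // then waits for the worker thread to finish.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            running_ = false;
        }
        condVar_.notify_one();
        if (thread_.joinable()) thread_.join();
    }

    void trigger() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++pending_;  // every trigger is recorded, even during work_()
        }
        condVar_.notify_one();
    }

private:
    void workLoop() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (true) {
            condVar_.wait(lock, [this] { return pending_ > 0 || !running_; });
            if (pending_ == 0) break;  // only reachable once running_ is false
            --pending_;
            lock.unlock();             // do not hold the mutex while working
            work_();
            lock.lock();
        }
    }

    std::mutex mutex_;
    std::condition_variable condVar_;
    bool running_ = false;  // guarded by mutex_
    long pending_ = 0;      // guarded by mutex_
    std::function<void()> work_;
    std::thread thread_;
};
```

With this sketch, a test can call trigger() 1000 times with no delay, call stop(), and then check the counter, since stop() only returns after the backlog has been drained. Whether draining on stop is the desired behaviour depends on the application. A timer-driven system might prefer to drop or cap late triggers instead.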
