dimanche 17 avril 2022

C++11 multithreaded cancellable slice-based work

I am trying to create a base class to manage a slice-based workload.
My approach was to create a base abstract class that handles the initialization/termination of the work and inherit from that class in specific classes that only specify the actual work and timings.
I also added the functionality in the base class to reinitialize the workload if a set number of errors occur.

This works as expected in a simple example (given below) and with most workloads that I have but when I try to use this with a specific workload (reading a serial port that's written to by an arduino) it completely messes up the stream read from arduino.
I suspect there is some problem with my approach but I couldn't figure it out...

Here is my code:

sliceWork.h

#pragma once
#include <future>
using namespace ::std;

class sliceWork
{
    int sliceIntervalMilliSeconds;
    int failureCounter;
    int maxFailsBeforeRestart;
    char* label = NULL;
    
    promise<void> workPromise;
    thread* workerThread = NULL;

    virtual void init() = 0;
    virtual bool oneSliceWork() = 0;
    void work(future<void> future);

public:
    sliceWork(int sliceInterval, int maxFails, const char* label);
    ~sliceWork();
    void initWork();
    void signalTerminate();
};

sliceWork.cpp

#include <string.h>
#include "sliceWork.h"

sliceWork::sliceWork(int interval, int maxFails, const char* workLabel)
{
    sliceIntervalMilliSeconds = interval;
    maxFailsBeforeRestart = maxFails;
    label = new char[strlen(workLabel) + 1];
    strcpy(label, workLabel);
}

sliceWork::~sliceWork()
{
    if (workerThread != NULL && workerThread->joinable())
        workerThread->join();
    printf("destructor %s\n", label);
    delete label;
    delete workerThread;
}

void sliceWork::initWork()
{
    failureCounter = 0;
    init();
    printf("Init work %s finished!\n", label);
    future<void> futureWorker = workPromise.get_future();
    workerThread = new thread(&sliceWork::work, this, move(futureWorker));
}

void sliceWork::work(future<void> future)
{
    using namespace ::std::chrono;
    steady_clock::time_point t0 = steady_clock::now();

    while (future.wait_for(chrono::milliseconds(1)) == future_status::timeout)
    {
        if (duration_cast<chrono::milliseconds>(steady_clock::now() - t0).count() 
            > sliceIntervalMilliSeconds)
        {
            if (!oneSliceWork())
            {
                if (++failureCounter > maxFailsBeforeRestart 
                    && maxFailsBeforeRestart > 0)
                {
                    init();
                    failureCounter = 0;
                }
            }
            t0 = steady_clock::now();
        }
    }
    printf("work terminated for %s!\n", label);
}

void sliceWork::signalTerminate()
{
    printf("request terminate for work %s...\n", label);
    workPromise.set_value();
}

And here is an example of using it that works as expected:

main.cpp

#include <string.h>
#include "sliceWork.h"

class A : public sliceWork
{
    void init() {
        printf("Init A...\n");
    }

    bool oneSliceWork() {
        printf("Working A...\n");
        return true;
    }
public:
    A(int slice, int max, const char* label) 
        : sliceWork(slice, max, label) 
    {
    }
};

class B : public sliceWork
{
    void init() {
        printf("Init B...\n");
    }

    bool oneSliceWork() {
        printf("Working B...\n");
        return true;
    }
public:
    B(int slice, int max, const char* label) 
        : sliceWork(slice, max, label) 
    {
    }
};

class C : public sliceWork
{
    void init() {
        printf("Init C...\n");
    }

    bool oneSliceWork() {
        printf("Working C...\n");
        return false;
    }
public:
    C(int slice, int max, const char* label) 
        : sliceWork(slice, max, label) 
    {
    }
};

int main()
{
     {
         A a(1000, 1000, "A");
         a.initWork();
         B b(2000, 1000, "B" );
         b.initWork();
         C c(700, 2, "C" );
         c.initWork();
         printf("Initializations finished!\n");
         ::std::this_thread::sleep_for(::std::chrono::seconds(7));
         a.signalTerminate();
         ::std::this_thread::sleep_for(::std::chrono::seconds(5));
         b.signalTerminate();
         ::std::this_thread::sleep_for(::std::chrono::seconds(4));
         c.signalTerminate();
     }
     getchar();
     return 0;
}

So, I want to ask if this approach is prone to error because the way I implemented the functionality.

Application is written in C++11 and targets an Raspberry PI 3b+ running the Raspberry's flavor of Debian 11 (Raspbian), if that is relevant.

Aucun commentaire:

Enregistrer un commentaire