Tuesday, 19 February 2019

What is the correct way to read a C++11 stream in a server context, given that reads block?

Given that all standard C++11 stream reads are blocking, what is the correct way to read a C++ stream in a service context? I have tried placing the blocking reads inside a thread, but I just hit the same block again when I try to join the thread.

  • Consider the following failed attempt at a solution.

I have a class Reader instantiated with an input stream that may or may not be endless (that is, it may never reach EOF).

The Reader may be started (non-blocking), at which point the Reader starts a thread that reads the stream, line by line, doing something for each line, until, internally, an error occurs or EOF is reached, or externally, the Reader's stop method is called.

The Reader's stop method must join the thread, but this join will block, given that the thread is blocked on a read! Similarly for the destructor.

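#include <chrono>   // std::chrono::seconds
#include <iostream>
#include <mutex>
#include <string>   // std::string, std::getline, std::to_string
#include <thread>   // std::thread, std::this_thread::sleep_for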

enum State {
    PRE, STARTED, STOPPED
};

class Reader {

    std::istream& source;
    State state { PRE };
    std::mutex mutex { };
    std::thread* myThread { nullptr };
    bool eof {false};
    bool error {false};

public:

    Reader(std::istream& source) :
            source(source) {
    }

    ~Reader() {
        if (myThread!=nullptr) {
            myThread->join();
        }
        delete myThread;
    }

    void start() {
        using namespace std;
        mutex.lock();
        if (state != PRE) {
            mutex.unlock(); // do not leave the mutex locked when throwing
            throw -1;
        }
        state = STARTED;

        myThread = new std::thread { [&]() {

            while(state==STARTED) {
                string line {};
                getline(source, line);

                if (source.eof()) {
                    mutex.lock();
                    state=STOPPED; // internal stop
                    eof=true;
                    mutex.unlock();
                    break;
                }

                if (!source.good()) {
                    mutex.lock();
                    state=STOPPED; // internal stop
                    error=true;
                    mutex.unlock();
                    break;
                }

                if (state==STARTED) { // may have been stopped externally
                    // do something important
                    cout << "'"<<line<<"'"<<endl;
                }
            }

        }

        };
        mutex.unlock();
    }

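    void stop() {
        mutex.lock();
        if (state != STARTED) {
            mutex.unlock(); // do not leave the mutex locked when throwing
            throw -2;
        }
        state = STOPPED;
        mutex.unlock(); // release before joining, so the thread can still lock on EOF or error
        myThread->join(); // still blocks for as long as the thread is stuck in getline
        delete myThread;
        myThread=nullptr;
    }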

    bool isStarted() const {
        return state==STARTED;
    }
    bool isStopped() const {
        return state==STOPPED;
    }

    bool isEof() const {
        return eof;
    }

    bool isError() const {
        return error;
    }
};

int main() {
    using namespace std;

    Reader r { cin };
    r.start();
    cout << "[SLEEPING]..." << endl;
    this_thread::sleep_for(chrono::seconds(6));
    cout << "...[AWAKE]" << endl;
    cout << "[EOF:"<<to_string(r.isEof())<<"]" << endl;
    cout << "[ERROR:"<<to_string(r.isError())<<"]" << endl;
    cout << "[STILL STARTED:"<<to_string(r.isStarted())<<"]" << endl;
    if (r.isStarted()) {
        cout << "[STOPPING]..." << endl;
        r.stop();
        cout << "...[STOPPED]" << endl;
    }
    cout << "[EXIT]" << endl;
    return 0;
}

The main function demonstrates the problem. A Reader is instantiated with std::cin and started. Then the main thread sleeps for a while.

When the main thread awakes, it checks if the Reader is still started (which it will be unless an error or EOF was encountered). If the Reader is still started (in which case the thread is blocked on a read), we stop the Reader.

  • But this stop blocks when it attempts to join the thread.

If you run the code (link with pthread) and press Ctrl-D during the sleeping period, everything terminates correctly (after the sleep, of course).

But if you wait out the sleep period, the program blocks on stop, and never exits.

  • How does one solve this issue in C++11?
  • If there is no solution in C++11 (how can there not be?), is there a POSIX solution?
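To make the POSIX half of the question concrete, here is a rough sketch of the kind of thing I imagine, assuming the source is a raw file descriptor (for example STDIN_FILENO) rather than a std::istream, since the standard streams do not expose their descriptor. The thread waits in poll() with a timeout and re-checks a stop flag between waits, so the join in stop should return within roughly one timeout. PollingReader, the onLine callback and the 100 ms interval are all hypothetical choices of mine.

```cpp
#include <poll.h>     // POSIX: poll, pollfd, POLLIN
#include <unistd.h>   // POSIX: read, ssize_t

#include <atomic>
#include <cerrno>
#include <functional>
#include <string>
#include <thread>
#include <utility>

// Hypothetical sketch: reads lines from a POSIX file descriptor on its own
// thread, waking up every 100 ms to check whether a stop was requested.
class PollingReader {
    int fd;
    std::function<void(const std::string&)> onLine;
    std::atomic<bool> stopRequested { false };
    std::atomic<bool> eof { false };
    std::atomic<bool> error { false };
    std::thread worker;

    void run() {
        std::string pending; // bytes received that do not yet form a full line
        char buf[4096];
        while (!stopRequested) {
            pollfd p;
            p.fd = fd;
            p.events = POLLIN;
            p.revents = 0;
            const int ready = ::poll(&p, 1, 100); // wait at most 100 ms
            if (ready == 0) continue;             // timeout: re-check the flag
            if (ready < 0) {
                if (errno == EINTR) continue;     // interrupted by a signal
                error = true;
                return;
            }
            const ssize_t n = ::read(fd, buf, sizeof buf);
            if (n == 0) { eof = true; return; }   // a final unterminated line is dropped
            if (n < 0) {
                if (errno == EINTR) continue;
                error = true;
                return;
            }
            pending.append(buf, static_cast<std::string::size_type>(n));
            std::string::size_type pos;
            while ((pos = pending.find('\n')) != std::string::npos) {
                onLine(pending.substr(0, pos));   // deliver one complete line
                pending.erase(0, pos + 1);
            }
        }
    }

public:
    PollingReader(int fd, std::function<void(const std::string&)> onLine)
        : fd(fd), onLine(std::move(onLine)) {}

    ~PollingReader() { stop(); }

    // Sketch only: start() is meant to be called at most once.
    void start() { worker = std::thread(&PollingReader::run, this); }

    // Unlike the Reader above, this join is bounded by the poll timeout.
    void stop() {
        stopRequested = true;
        if (worker.joinable()) worker.join();
    }

    bool isEof() const { return eof; }
    bool isError() const { return error; }
};
```

Something like PollingReader r { STDIN_FILENO, callback } would stand in for Reader r { cin } in main. The obvious cost is that it bypasses std::istream entirely, so I would still like to know whether there is a way to keep the stream interface.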
