Thursday, July 23, 2020

Protecting read/write access to/from a file from multiple threads

This is a follow-on from a previous question here. I received some wonderful advice that helped me move my code along. For the next piece of the puzzle, I figured it warranted a new post; I hope that's okay.

I have some code that creates requests in a main loop to read from or write to a file, and executes each request in its own thread. With the help I got from the earlier post, I was able to extend my code to add a request queue with multiple entries, plus read/write functions that merely sleep for a short time to emulate file access.

Now I want to actually learn how to read from and write to the files when there can potentially be one or more threads trying to read and/or write the same file at the same time.

To make this easier to test, I limit myself to a single file; otherwise I would need to consider cases such as the file not existing. In the real application there will be several hundred files in play, but my limited understanding suggests that if I can make the locking mechanism work for a single file, it will work when there are many.
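
Going from one file to several hundred usually means one lock per file rather than one lock for everything, so that requests for unrelated files do not wait on each other. A minimal sketch, assuming a hypothetical helper `mutex_for()` that hands out one `std::mutex` per filename from a map that is itself protected by a mutex:

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical per-file lock registry: one std::mutex per filename.
// std::map never relocates its elements, so the returned reference stays
// valid as long as the entry is not erased (entries are never erased here).
std::mutex& mutex_for(const std::string& filename)
{
    assert(!filename.empty());                       // every request should name a file
    static std::mutex registry_mutex;                // guards the map itself
    static std::map<std::string, std::mutex> locks;  // filename -> its own lock

    std::lock_guard<std::mutex> guard(registry_mutex);
    return locks[filename];  // default-constructs the mutex on first use
}
```

Inside `read_file()` or `write_file()`, the file's own lock would then be taken with `std::lock_guard<std::mutex> lock(mutex_for(filename));`. The registry mutex is held only while looking up the entry, not during the file access itself.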

I am still trying to improve my understanding of threading. I first tried adding a std::mutex with a global lock variable in the read_file() and write_file() functions, but I got into a terrible mess.
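
For comparison, a single global `std::mutex` is usually easiest to manage through `std::lock_guard`, which releases the lock automatically on every return path, including an early `return` on error. A minimal sketch, assuming hypothetical helpers `locked_write()` and `locked_read()` that stand in for the file-access part of `write_file()` and `read_file()`:

```cpp
#include <cassert>
#include <fstream>
#include <mutex>
#include <string>

// Hypothetical global lock shared by every reader and writer of the file.
std::mutex file_mutex;

// Overwrite the file with a single line. The lock_guard is declared before the
// stream, so it is destroyed after it: the file is closed before the mutex is released.
bool locked_write(const std::string& filename, const std::string& line)
{
    assert(line.find('\n') == std::string::npos);  // readers only fetch one line
    std::lock_guard<std::mutex> lock(file_mutex);
    std::ofstream ofs(filename);
    if (!ofs)
        return false;  // early return: lock_guard still unlocks the mutex
    ofs << line << '\n';
    ofs.close();       // flush and close while the lock is still held
    return !ofs.fail();
}

// Read the first line of the file into `line`; returns false on any failure.
bool locked_read(const std::string& filename, std::string& line)
{
    std::lock_guard<std::mutex> lock(file_mutex);
    std::ifstream ifs(filename);
    if (!ifs)
        return false;
    return static_cast<bool>(std::getline(ifs, line));
}
```

With one global mutex, only one thread touches the file at a time, whether it is reading or writing. That is simple and safe, but readers also wait for each other.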

Can someone please point me to the correct approach I should investigate to make this work robustly?

#include <chrono>
#include <fstream>
#include <future>
#include <iomanip>
#include <iostream>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

std::vector< std::future<std::string> > requests;

int random_int(int start, int end)
{
    std::random_device rd;
    std::mt19937 generator(rd());
    std::uniform_int_distribution<> distrib(start, end);

    return distrib(generator);
}

const std::string generate_filename()
{
    std::ostringstream filename;

    // use a single file for testing
    //filename << "file_" << std::setfill('0') << std::setw(2) << random_int(1, 20) << ".txt";

    filename << "file.txt";

    return filename.str();
}

std::string write_file(const std::string filename)
{
    std::cout << "write_file: filename is " << filename << std::endl;

    // slow things down so I can follow
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    std::ofstream ofs(filename);
    if (!ofs)
    {
        return std::string("ERROR");
    }

    const char chr = 'A' + random_int(0, 25);
    for (int i = 0; i < 64; ++i)
    {
        ofs << chr;
    }
    ofs << std::endl;
    ofs.close();

    std::cout << "write_file: written to " << filename << std::endl;

    return std::string("WRITTEN");
}

std::string read_file(const std::string filename)
{
    std::cout << "read_file: filename is " << filename << std::endl;

    // slow things down so I can follow
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    std::ifstream ifs(filename);
    if (!ifs.is_open())
    {
        return std::string("ERROR OPENING FILE");
    }

    std::string contents;
    if (std::getline(ifs, contents))
    {
        std::cout << "    read_file: read from " << filename << std::endl;
        return std::string(contents);
    }

    return std::string("ERROR READING CONTENTS");
}

void add_request()
{
    // randomly add a read or a write request
    if (random_int(1, 50) > 25)
        requests.push_back(std::async(std::launch::async, write_file, generate_filename()));
    else
        requests.push_back(std::async(std::launch::async, read_file, generate_filename()));
}

int main(int argc, char* argv[])
{
    int max_requests = 10;

    // avoid falling out of the loop on first pass
    add_request();

    do {
        std::cout << "working: requests in queue = " << requests.size() << std::endl;

        // randomly add a request if we still have not added the max
        if (random_int(1, 5) == 1)
        {
            if (--max_requests > 0)
            {
                add_request();
            }
        }

        // service the future for each item in the request queue
        for (auto iter = requests.begin(); iter != requests.end(); )
        {
            if ((*iter).wait_for(std::chrono::milliseconds(1)) == std::future_status::ready)
            {    
                std::cout << "Request completed, removing it from the queue: result: " << (*iter).get() << std::endl;
                iter = requests.erase(iter);
            }
            else
            {
                ++iter;
            }
        }

    // once the queue is empty we exit; in the real app we don't exit here,
    // but keep processing requests until the app exits normally
    } while (requests.size() > 0);
}
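
When reads are more common than writes, a readers-writer lock lets several `read_file()` calls proceed at once while each `write_file()` gets the file to itself. A minimal sketch, assuming C++17 (`std::shared_mutex`, `std::shared_lock`) and a hypothetical global `file_lock` for the single test file. The one-second sleeps and console output from the program above are left out, and `write_file()` takes the fill character as a parameter instead of calling `random_int()`:

```cpp
#include <cassert>
#include <fstream>
#include <mutex>
#include <shared_mutex>
#include <string>

// Hypothetical lock for the single test file: readers take it in shared mode,
// writers in exclusive mode.
std::shared_mutex file_lock;

// Exclusive lock: waits until no reader or other writer holds file_lock.
std::string write_file(const std::string& filename, char chr)
{
    assert(chr != '\n');  // read_file() only looks at the first line
    std::unique_lock<std::shared_mutex> lock(file_lock);
    std::ofstream ofs(filename);
    if (!ofs)
        return "ERROR";
    ofs << std::string(64, chr) << '\n';
    ofs.close();  // flush before the exclusive lock is released
    return ofs.fail() ? "ERROR" : "WRITTEN";
}

// Shared lock: any number of readers may hold it at once, but never while a
// writer holds it, so a reader never sees a half-written line from write_file().
std::string read_file(const std::string& filename)
{
    std::shared_lock<std::shared_mutex> lock(file_lock);
    std::ifstream ifs(filename);
    if (!ifs)
        return "ERROR OPENING FILE";
    std::string contents;
    if (std::getline(ifs, contents))
        return contents;
    return "ERROR READING CONTENTS";
}
```

In `add_request()`, the write request would then become something like `std::async(std::launch::async, write_file, generate_filename(), static_cast<char>('A' + random_int(0, 25)))`. For many files, the single `file_lock` could be replaced by one `std::shared_mutex` per filename. These locks only coordinate threads inside one process; another program opening the same file is not covered.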
