samedi 29 août 2015

Best way to delete job context objects in a pipelined processor

I appreciate it if someone suggests best way to finally delete context objects used represent a job processed through a pipeline of steps.

Here in the following code an object of class text_file_processing_request is created and sent to the io_service. My pipeline here is made up of one step, there could be more steps in real code.

Now, I would like to get opinions on the best way to delete these objects of type text_file_processing_request once they are done with.

Thank you!

#include <iostream>
#include "boost/asio.hpp"
#include "boost/thread.hpp"

using namespace std;
namespace asio = boost::asio;

typedef std::unique_ptr<asio::io_service::work> work_ptr;

typedef boost::function<void(void) > parse_and_aggregate_fun_t;

class file_processing_request{
 public:
    virtual void process(int num) = 0;
};

class text_file_processing_request : public file_processing_request {
public:
    virtual void process(int num) {
        cout << "text_file_processing_request::process " << num << endl;
    }
};

class processor {
public:
    processor(int threads) : thread_count(threads) {
        service = new asio::io_service();
        work = new work_ptr(new asio::io_service::work(*(service)));
        for (int i = 0; i < this->thread_count; ++i)
            workers.create_thread(boost::bind(&asio::io_service::run, service));
    }

    void post_task(parse_and_aggregate_fun_t job){
        this->service->post(job);
    }

    void stop(){
        this->work->reset();
    }

    void wait()
    {
        this->workers.join_all();
    }
private:
    int thread_count;
    work_ptr * work;
    asio::io_service* service;
    boost::thread_group workers;
};

class job_discoverer {
public:
    job_discoverer(processor *p): worker(p){}
    void start_producing(){

        do {
            file_processing_request * cPtr = new text_file_processing_request();
            this->worker->post_task(boost::bind(&file_processing_request::process, cPtr, 42));
        } while (0);

        this->worker->stop(); // no more data to process
    }
private:
    processor *worker;
};

int main(int argc, char** argv) {

    processor *pr = new processor(4);
    job_discoverer disocverer(pr);
    disocverer.start_producing();

    pr->wait();

    delete pr;

    return 0;
}

Aucun commentaire:

Enregistrer un commentaire