mardi 16 août 2016

Writing a boost::asio async TCP client to read fixed blocks of data

I need some guidance on how to write a async TCP Asio client to retrieve data from a remote server. This is a proprietary file retrieval protocol that I need to implement. The file server in this case is something I do not have any control over. The server waits for a start request data structure (shown below) to be sent to its port 1080. Ideally an async echo client would be most similar to my needs (except instead of strings I send to exchange variable sized data structures and receive similar data structures (with a fixed common header as described later). Unfortunately the boost asio examples, do not include such an example.

Once the start data request data structure has been received and validated by the server, it sends a response data structure that the client needs to check before proceeding to the download handshake loop (something along the lines of tftp protocol except this protocol os over TCP).

If this response data structure is received and valid, then the client moves to the next state - the data transfer loop - where it sends requests for data blocks.

I tried to put some basics together for this based off the async read asio example but that is stream based and I really am looking for something more along the lines of tcp datagrams where I cannot send out the next handshake request until a full data structure has been received. Each of the data structures has a similar header structure - a message identifier and a length field and then the variable data associated with each of the different messages.

I'm sure this is not a particularly unique type of TCP based protocol but I cannot find any similar example unless I do it synchronously (which I would prefer not to do if at all possible).

Unfortunately, I have not made more progress into the guts this client code as I am a bit stumped on how to asynchronously read fixed size TCP blocks of data and then send new request blocks in the usual ASIO callback fashion.

The data structure below is the initial request to kick off the protocol (the one the server is waiting for). All the other data structures are similar to this one (with the same initial 5 fields a 2 uint16_t and a 4 uint32_ts) with each packet containing additional data after these depending on the mPacketId field.

/**
 */
struct ConnectRequest {
    // fields
    uint16_t mMessageId;
    uint16_t mByteCount;
    uint32_t mPacketId;
    uint32_t mReserved;
    uint32_t mCRC;

    //! move only message.
    ConnectRequest() = delete;

    //! delete the copy constructor.
    ConnectRequest(const ConnectRequest&) = delete;

    //! move constructor.
    ConnectRequest(ConnectRequest&&) = default;

    //! move assignment operator.
    ConnectRequest& operator=(ConnectRequest&&) = default;

    //! not assignable via lValue.
    ConnectRequest& operator=(const ConnectRequest&) = delete;

    /**
     * Constructor.
     *
     * @param rPacketId [in] packet ID.
     */
    explicit ConnectRequest(
        const uint32_t& rPacketId)
        : mMessageId(static_cast<uint16_t>(MessageType::ConnectRequest))
        , mByteCount(sizeof(this))
        , mPacketId(rPacketId)
        , mReserved(0)
        , mCRC(0)
    {}

    /**
     * Construct message from raw memory buffer - no need for
     * serialization interface.<p>
     *
     * @param pData  [in] raw buffer pointer to datagram data in big
     *               endian format.
     * @param rDataLength
     *               [in] datagram length - must be
     *               sizeof(ConnectRequest).
     *
     * @exception thrown if a bad argument is passed to the
     *                   constructor.
     */
    explicit ConnectRequest(
        const uint8_t* pData,
        const size_t& rDataLength)
    {
        if (pData && rDataLength == sizeof(*this)) {
            UtlSafeBuffer safeBuffer(UtlSafeBuffer::ByteOrder::BigEndian);
            safeBuffer.write(pData, rDataLength);
            safeBuffer.setPosition(0, UtlSafeBuffer::OffsetMode::START);
            safeBuffer.read(mMessageId);
            safeBuffer.read(mByteCount);
            safeBuffer.read(mPacketId);
            safeBuffer.read(mReserved);
            safeBuffer.read(mCRC);
        } else {
            throw std::invalid_argument(
                "invalid buffer size:" + rDataLength);
        }
    }

    /**
     * Equality comparison - used to keep the log lean and only show
     * changes, these are POD structs, so memcmp should work fine.
     *
     * @param rhs [in] ConnectRequest message.
     *
     * @return true if this messages are the same.
     *
     */
    inline bool operator==(const ConnectRequest& rhs) const {
        return memcmp(this, &rhs, sizeof(ConnectRequest)) == 0;
    }

    //! returns the negated version of operator==(rhs).
    inline bool operator!=(const ConnectRequest& rhs) const {
        return !operator==(rhs);
    }

    /**
     * Stream insert operator.<p>
     *
     * @param os     [in,out] output stream.
     * @param rhs    [in] ConnectRequest to send to the output
     *               stream.
     *
     * @return a reference to the updated stream.
     */
    friend std::ostream& operator<<(
        std::ostream& os, const ConnectRequest& rhs) {
        os  << "ConnectRequest: "
            << "mMessageId[" << rhs.mMessageId
            << "], mByteCount[" << rhs.mByteCount
            << "], mPacketId[" << rhs.mPacketId
            << "], mCRC[" << rhs.mCRC
            << "]";
        return os;
    }
};

The modified code example from boost is as follows: Note that I changed the input_buffer_ field from the original as the original one uses boost::asio::streambuf input_buffer_. I changed this mainly because I do not know how to use that for reading fixed size data structures and I thought that using a fixed 1024 byte array would be more than adequate.

If someone could either point me in the direction of a similar example to the problem I am trying to solve or show me how to get the flow control going I would be very grateful.

Thanks and apologies that I am stuck at this fairly basic stage but I am fairly new to boost asio - especially for tcp.

#pragma once

// SYSTEM INCLUDES
#include <memory>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>

// APPLICATION INCLUDES
#include "fhdb/ConnectRequest.h"

// DEFINES
// MACROS
// EXTERNAL FUNCTIONS
// EXTERNAL VARIABLES
// CONSTANTS
// STRUCTS
// FORWARD DECLARATIONS

class FHDBUtility {
public:
    FHDBUtility(boost::asio::io_service& io_service)
        : stopped_(false)
        , socket_(io_service)
        , input_buffer_(std::make_unique<uint8_t[]>(1024))
        , deadline_(io_service)
        , heartbeat_timer_(io_service)
    {}

    // Called by the user of the FHDBUtility class to initiate the connection process.
    // The endpoint iterator will have been obtained using a tcp::resolver.
    void start(boost::asio::ip::tcp::resolver::iterator endpoint_iter) {
        // Start the connect actor.
        start_connect(endpoint_iter);

        // Start the deadline actor. You will note that we're not setting any
        // particular deadline here. Instead, the connect and input actors will
        // update the deadline prior to each asynchronous operation.
        deadline_.async_wait(boost::bind(&FHDBUtility::check_deadline, this));
    }

    // This function terminates all the actors to shut down the connection. It
    // may be called by the user of the FHDBUtility class, or by the class itself in
    // response to graceful termination or an unrecoverable error.
    void stop()
    {
        stopped_ = true;
        socket_.close();
        deadline_.cancel();
        heartbeat_timer_.cancel();
    }

private:
    void start_connect(boost::asio::ip::tcp::resolver::iterator endpoint_iter)
    {
        if (endpoint_iter != boost::asio::ip::tcp::resolver::iterator())
        {
            std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";

            // Set a deadline for the connect operation.
            deadline_.expires_from_now(boost::posix_time::seconds(60));

            // Start the asynchronous connect operation.
            socket_.async_connect(endpoint_iter->endpoint(),
                boost::bind(&FHDBUtility::handle_connect,
                    this, _1, endpoint_iter));
        } else {
            // There are no more endpoints to try. Shut down the FHDBUtility.
            stop();
        }
    }

    void handle_connect(const boost::system::error_code& ec,
        boost::asio::ip::tcp::resolver::iterator endpoint_iter)
    {
        if (stopped_) {
            return;
        }
        // The async_connect() function automatically opens the socket at the start
        // of the asynchronous operation. If the socket is closed at this time then
        // the timeout handler must have run first.
        if (!socket_.is_open()) {
            std::cout << "Connect timed out\n";
            // Try the next available endpoint.
            start_connect(++endpoint_iter);
        } else if (ec) {
            // Check if the connect operation failed before the deadline expired.
            std::cout << "Connect error: " << ec.message() << "\n";
            // We need to close the socket used in the previous connection attempt
            // before starting a new one.
            socket_.close();
            // Try the next available endpoint.
            start_connect(++endpoint_iter);
        } else { // Otherwise we have successfully established a connection.        
            std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
            // send async connect request
            sendConnectRequest();
        }
    }

    void start_read() {
        // Set a deadline for the read operation.
        deadline_.expires_from_now(boost::posix_time::seconds(30));
        boost::asio::async_read(socket_, input_buffer_, 1024,
            boost::bind(&FHDBUtility::handle_read, this, _1));
    }

    void handle_read(const boost::system::error_code& ec) {
        if (stopped_) {
            return;
        }

        if (!ec) {
            // Extract the newline-delimited message from the buffer.
            start_read();
        } else {
            std::cout << "Error on receive: " << ec.message() << "\n";
            stop();
        }
    }

    void sendConnectRequest() {
        if (stopped_) {
            return;
        }
        // make initial connect request using PacketId set to 1
        const auto connectRequest = std::make_shared<ConnectRequest>(1);
        // send the connect request packet to the CMC to initiate the protocol
        boost::asio::async_write(socket_, 
            boost::asio::buffer(connectRequest.get(), sizeof(ConnectRequest)),
            boost::bind(&FHDBUtility::handle_write, this, _1, connectRequest->mPacketId));
    }

    void handle_write(const boost::system::error_code& ec, const uint32_t packetId) {
        if (stopped_) {
            return;
        }
        if (!ec) {
            // Wait 10 seconds before sending the next heartbeat.
            heartbeat_timer_.expires_from_now(boost::posix_time::seconds(10));
            switch (packetId) {
            }
            //heartbeat_timer_.async_wait(boost::bind(&FHDBUtility::start_write, this));
        } else {
            std::cout << "Error on heartbeat: " << ec.message() << "\n";
            stop();
        }
    }

    void check_deadline() {
        if (stopped_) {
            return;
        }

        // Check whether the deadline has passed. We compare the deadline against
        // the current time since a new asynchronous operation may have moved the
        // deadline before this actor had a chance to run.
        if (deadline_.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
            // The deadline has passed. The socket is closed so that any outstanding
            // asynchronous operations are cancelled.
            socket_.close();
            // There is no longer an active deadline. The expiry is set to positive
            // infinity so that the actor takes no action until a new deadline is set.
            deadline_.expires_at(boost::posix_time::pos_infin);
        }
        // Put the actor back to sleep.
        deadline_.async_wait(boost::bind(&FHDBUtility::check_deadline, this));
    }

private:
    bool stopped_;
    boost::asio::ip::tcp::socket socket_;
    std::unique_ptr<uint8_t[]> input_buffer_;
    boost::asio::deadline_timer deadline_;
    boost::asio::deadline_timer heartbeat_timer_;
};

Aucun commentaire:

Enregistrer un commentaire