mardi 29 mars 2016

setting up multiple boost::asio udp socket listeners and transmitters

I am having some issues putting together a boost based async UDP networking daemon application. At its core, my application is timer driven using a boost timer callback function which is shown below. The behavior of this callback transmits a datagram stored in a smart pointer member mpStatusMessage to multiple predefined endpoints stored in a vector of std::unique_ptr endpoints mRemoteEndPointInfo which are running remotely as UDP listeners. These remote listeners are also send asynchronous heartbeat UDP datagrams back to my application application which I have setup to run as a daemon.

The server part of my application needs to setup multiple UDP listening sockets to handle the incoming traffic that can arrive at any time. I am having trouble with setting up the fields below and I would like to know if there is a similar example to perform what I am trying to do. I would like to avoid creating separate threads if that at all possible as I think boost asio should be able to handle this easily enough using its IO_Service object, anyway the heart beats of my application are only at 1HZ.

std::shared_ptr<boost::asio::io_service> mpIOService;
std::unique_ptr<boost::asio::ip::udp::socket> mpSenderSocket;
std::vector<std::unique_ptr<boost::asio::ip::udp::endpoint>> mRemoteEndPointInfo;
std::vector<std::unique_ptr<boost::asio::ip::udp::socket>> mServerSocketInfo;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint;
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer;
std::unique_ptr<uint8_t[]> mpReceiveBuffer;

I think I only need a single mpSenderSocket to communicate with the mRemoteEndPointInfo but I am not sure about this.

The constructor code initializes the various message datagram smart pointers - these are basically structs stored as std::unique_ptr<T>s

CADaemon::CADaemon(
    const CAFileInfo& rCAServiceRows,
    const CFGParameterInfo& rCFGParameterInfo,
    std::shared_ptr<boost::asio::io_service> pIOService)
    : ca::CAService::Service{}

    , mpStatusMessage{}
    , mpHeartbeatMessage{}
    , mpOpSupportMessage{}
    , mpOTPMessage{}
    , mpIOService(pIOService)
    , mRemoteEndPointInfo{}
    , mServerSocketInfo{}
    , mpServerEndpoint{}
    , mpStatusTimer{}
    , mpReceiveBuffer(std::make_unique<uint8_t[]>(512))
    . . .
{
    . . .
    // range represents lines of networking config info
    // iterate over the range allocating socket listeners
    for (const auto& rNext : range) {
        auto serviceIter = rNext.find(CAParameter::ServiceName);
        if (serviceIter != rNext.cend()) {
            auto rxIter = . . .
            std::smatch match;
            if (std::regex_match(. . .) {
                // update the remove endpoints (where we send messages)
                auto port = static_cast<uint16_t>(std::stoi(match[5]));
                mRemoteEndPointInfo.emplace_back(std::make_unique<
                    udp::endpoint>(address::from_string(match[1]), port));
            }
            auto txIter = . . .
            if (std::regex_match(. . .) {
                // allocate sockets to listen for incoming messages on these ports
                auto port = static_cast<uint16_t>(std::stoi(match[5]));
                mServerSocketInfo.emplace_back(std::make_unique<udp::socket>(
                    *mpIOService, udp::endpoint(udp::v4(), port)));
            }
        }
    }
    // open a single sender socket
    if (!mRemoteEndPointInfo.empty()) {
        mpSenderSocket = std::make_unique<udp::socket>(*mpIOService);
        mpSenderSocket->open(boost::asio::ip::udp::v4());
    }
}

The start code is as follows:

void
CADaemon::start()
{
    mpServerEndpoint = std::make_unique<boost::asio::ip::udp::endpoint>();
    mpStatusTimer = std::make_unique<steady_timer>(*mpIOService, milliseconds(1000));
    mpStatusTimer->async_wait(boost::bind(&CADaemon::heartBeatTimer, this, milliseconds(1000)));
    // kick off the boost::asio proactor
    start_receive();
    // launch gRPC RPC service daemon as a stand alone detached thread
    std::thread{ &CADaemon::grpcService, this, "0.0.0.0:50051" }.detach();
}

And

/** initiate the receive operation */
void
CADaemon::start_receive()
{
    // initiate multiple asynchronous read operations
    for (auto& next : mServerSocketInfo) {
        next->async_receive_from(
            boost::asio::buffer(mpReceiveBuffer.get(), 512),
            *mpServerEndpoint,
            boost::bind(&CADaemon::handle_receive, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

The core of the application is the following heartbeat callback

void
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs)
{
    static auto& gEvtLog = gpLogger->getLoggerRef(
        Logger::LogDest::EventLog);
    // changed from absolute expires_at (previous + rHeartBeatMs) to
    // expires_from_now to account for background change in the system
    // time from the CMC
    mpStatusTimer->expires_from_now(rHeartBeatMs);
    mpStatusTimer->async_wait(boost::bind(
        &CADaemon::heartBeatTimer,
        this, rHeartBeatMs));
    // send heartbeats to all remote listeners
    for (const auto& nextEndPoint : mRemoteEndPointInfo) {
        // if this is
        mpSenderSocket->async_send_to(
            buffer(mpStatusMessage.get(), 
                sizeof(MemberSystemStatusMessage)),
            *nextEndPoint,
            boost::bind(&CADaemon::handle_send, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }
}

The send handler callback is

void
CADaemon::handle_send(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
{
    static auto& gEvtLog = gpLogger->getLoggerRef(
        Logger::LogDest::EventLog);
    if (!error || (error == boost::asio::error::message_size)) {
        // Critical Section - exclusive write
        boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard);
        if (bytes_transferred == sizeof(MemberSystemStatusMessage)) {
            LOG_EVT_INFO(gEvtLog) << *mpStatusMessage;
            mpStatusMessage->incrementSequenceCounter();
            mpStatusMessage->setFaultReport(boost::endian::native_to_big(mFaultReport));
        } else if (bytes_transferred == sizeof(CAServiceHeartbeatMessage)) {
            LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage;
        } else {
            LOG_EVT_ERROR(gEvtLog)
                << "unexpected datagram sent: size ["
                << bytes_transferred << "] bytes";
        }
    } else {
        LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code["
            << error.value() << "]";
    }
}

and the receive callback is as follows

void
CADaemon::handle_receive(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
{
    static auto& gEvtLog = gpLogger->getLoggerRef(
        Logger::LogDest::EventLog);
    if (!error || (error == boost::asio::error::message_size)) {
        try {
            try {
           . . .
            } catch (const std::out_of_range& rEx) {
                LOG_EVT_INFO(gEvtLog) << rEx.what();
            }
        } catch (const std::exception& rEx) {
            LOG_EVT_ERROR(gEvtLog) << rEx.what();
        }
    } else {
        LOG_EVT_ERROR(gEvtLog) << "asio error code[" << error.value() << "]";
    }
    // keep the handlers busy
    start_receive();
}

Aucun commentaire:

Enregistrer un commentaire