I am having some issues putting together a boost based async UDP networking daemon application. At its core, my application is timer driven using a boost timer callback function which is shown below. The behavior of this callback transmits a datagram stored in a smart pointer member mpStatusMessage
to multiple predefined endpoints stored in a vector of std::unique_ptr
endpoints mRemoteEndPointInfo
which are running remotely as UDP listeners. These remote listeners are also send asynchronous heartbeat UDP datagrams back to my application application which I have setup to run as a daemon.
The server part of my application needs to setup multiple UDP listening sockets to handle the incoming traffic that can arrive at any time. I am having trouble with setting up the fields below and I would like to know if there is a similar example to perform what I am trying to do. I would like to avoid creating separate threads if that at all possible as I think boost asio should be able to handle this easily enough using its IO_Service object, anyway the heart beats of my application are only at 1HZ.
std::shared_ptr<boost::asio::io_service> mpIOService;
std::unique_ptr<boost::asio::ip::udp::socket> mpSenderSocket;
std::vector<std::unique_ptr<boost::asio::ip::udp::endpoint>> mRemoteEndPointInfo;
std::vector<std::unique_ptr<boost::asio::ip::udp::socket>> mServerSocketInfo;
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint;
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer;
std::unique_ptr<uint8_t[]> mpReceiveBuffer;
I think I only need a single mpSenderSocket
to communicate with the mRemoteEndPointInfo
but I am not sure about this.
The constructor code initializes the various message datagram smart pointers - these are basically structs stored as std::unique_ptr<T>
s
CADaemon::CADaemon(
const CAFileInfo& rCAServiceRows,
const CFGParameterInfo& rCFGParameterInfo,
std::shared_ptr<boost::asio::io_service> pIOService)
: ca::CAService::Service{}
, mpStatusMessage{}
, mpHeartbeatMessage{}
, mpOpSupportMessage{}
, mpOTPMessage{}
, mpIOService(pIOService)
, mRemoteEndPointInfo{}
, mServerSocketInfo{}
, mpServerEndpoint{}
, mpStatusTimer{}
, mpReceiveBuffer(std::make_unique<uint8_t[]>(512))
. . .
{
. . .
// range represents lines of networking config info
// iterate over the range allocating socket listeners
for (const auto& rNext : range) {
auto serviceIter = rNext.find(CAParameter::ServiceName);
if (serviceIter != rNext.cend()) {
auto rxIter = . . .
std::smatch match;
if (std::regex_match(. . .) {
// update the remove endpoints (where we send messages)
auto port = static_cast<uint16_t>(std::stoi(match[5]));
mRemoteEndPointInfo.emplace_back(std::make_unique<
udp::endpoint>(address::from_string(match[1]), port));
}
auto txIter = . . .
if (std::regex_match(. . .) {
// allocate sockets to listen for incoming messages on these ports
auto port = static_cast<uint16_t>(std::stoi(match[5]));
mServerSocketInfo.emplace_back(std::make_unique<udp::socket>(
*mpIOService, udp::endpoint(udp::v4(), port)));
}
}
}
// open a single sender socket
if (!mRemoteEndPointInfo.empty()) {
mpSenderSocket = std::make_unique<udp::socket>(*mpIOService);
mpSenderSocket->open(boost::asio::ip::udp::v4());
}
}
The start code is as follows:
void
CADaemon::start()
{
mpServerEndpoint = std::make_unique<boost::asio::ip::udp::endpoint>();
mpStatusTimer = std::make_unique<steady_timer>(*mpIOService, milliseconds(1000));
mpStatusTimer->async_wait(boost::bind(&CADaemon::heartBeatTimer, this, milliseconds(1000)));
// kick off the boost::asio proactor
start_receive();
// launch gRPC RPC service daemon as a stand alone detached thread
std::thread{ &CADaemon::grpcService, this, "0.0.0.0:50051" }.detach();
}
And
/** initiate the receive operation */
void
CADaemon::start_receive()
{
// initiate multiple asynchronous read operations
for (auto& next : mServerSocketInfo) {
next->async_receive_from(
boost::asio::buffer(mpReceiveBuffer.get(), 512),
*mpServerEndpoint,
boost::bind(&CADaemon::handle_receive, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
The core of the application is the following heartbeat callback
void
CADaemon::heartBeatTimer(
const milliseconds& rHeartBeatMs)
{
static auto& gEvtLog = gpLogger->getLoggerRef(
Logger::LogDest::EventLog);
// changed from absolute expires_at (previous + rHeartBeatMs) to
// expires_from_now to account for background change in the system
// time from the CMC
mpStatusTimer->expires_from_now(rHeartBeatMs);
mpStatusTimer->async_wait(boost::bind(
&CADaemon::heartBeatTimer,
this, rHeartBeatMs));
// send heartbeats to all remote listeners
for (const auto& nextEndPoint : mRemoteEndPointInfo) {
// if this is
mpSenderSocket->async_send_to(
buffer(mpStatusMessage.get(),
sizeof(MemberSystemStatusMessage)),
*nextEndPoint,
boost::bind(&CADaemon::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
}
The send handler callback is
void
CADaemon::handle_send(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
static auto& gEvtLog = gpLogger->getLoggerRef(
Logger::LogDest::EventLog);
if (!error || (error == boost::asio::error::message_size)) {
// Critical Section - exclusive write
boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard);
if (bytes_transferred == sizeof(MemberSystemStatusMessage)) {
LOG_EVT_INFO(gEvtLog) << *mpStatusMessage;
mpStatusMessage->incrementSequenceCounter();
mpStatusMessage->setFaultReport(boost::endian::native_to_big(mFaultReport));
} else if (bytes_transferred == sizeof(CAServiceHeartbeatMessage)) {
LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage;
} else {
LOG_EVT_ERROR(gEvtLog)
<< "unexpected datagram sent: size ["
<< bytes_transferred << "] bytes";
}
} else {
LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code["
<< error.value() << "]";
}
}
and the receive callback is as follows
void
CADaemon::handle_receive(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
static auto& gEvtLog = gpLogger->getLoggerRef(
Logger::LogDest::EventLog);
if (!error || (error == boost::asio::error::message_size)) {
try {
try {
. . .
} catch (const std::out_of_range& rEx) {
LOG_EVT_INFO(gEvtLog) << rEx.what();
}
} catch (const std::exception& rEx) {
LOG_EVT_ERROR(gEvtLog) << rEx.what();
}
} else {
LOG_EVT_ERROR(gEvtLog) << "asio error code[" << error.value() << "]";
}
// keep the handlers busy
start_receive();
}
Aucun commentaire:
Enregistrer un commentaire