Friday, 26 March 2021

ASIO based UDP client/server to redirect messages to other clients

As the code below will undoubtedly show, I am new to this. Most of what is written is based on the work of DarkWanderer. My aim is to start a server, connect clients to it, receive messages from the clients, and redirect those messages (as std::strings) to the appropriate client. Each client produces data in a separate thread that adds messages to a local queue; the main thread then sends each message to the server, which redirects it to its destination. The server is started before the clients and issues an initial message that tells the clients to start. Each client uses that message to start its local thread. After a set time, the server sends a stop message to all clients.

My problem is that this is not happening exactly as I would like.

In case A: the server starts and the clients connect (the maximum number of clients is known beforehand). The server sends the start message to all clients and the clients seem to start their threads, but only the client registered as client 1 actually gets to send messages to client 2; client 2's messages are ignored.

In case B: the server starts and the clients connect, but each client starts its thread as soon as the client itself is started (i.e. the server's start instruction for the thread is ignored), so each local client queue is populated from the moment the client is launched. In this case messages are sent between clients and server as expected.

Note that this is all being run on a single machine and I have just added arbitrary port numbers to the server and clients.

I apologise for the long-winded and weak question, but I don't know enough about what is going on to ask it more directly. My question is: why does the expected behaviour occur only in case B, that is, only when the threads are started as soon as the client is started (when the client initialisation and start code block is ignored)?
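
To separate the start logic from the networking, the worker loop can be reduced to the sketch below. It is a simplification, not the project code: produce_ungated and produce_gated are hypothetical names, and limit replaces the 2-second sleep so that the loops terminate. It rests on the unconfirmed assumption that in case A the thread is launched while start is still false, in which case a loop with the same condition as testf() would finish without producing anything.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical reduction of testf(): returns how many messages it would produce.
// The loop condition mirrors testf(); if start is false on entry, the body never runs.
int produce_ungated(const std::atomic<bool>& stop,
                    const std::atomic<bool>& start, int limit) {
    int produced = 0;
    while (!stop && start && produced < limit) {
        ++produced;
    }
    return produced;
}

// Hypothetical variant: wait until start (or stop) is set before producing.
int produce_gated(const std::atomic<bool>& stop,
                  const std::atomic<bool>& start, int limit) {
    while (!stop && !start) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    int produced = 0;
    while (!stop && produced < limit) {
        ++produced;
    }
    return produced;
}
```

With start initially false, produce_ungated returns 0 immediately, whereas produce_gated waits for the flag and then produces limit messages. If the assumption holds, it would be consistent with case B working, since there start is already true when the thread is launched.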

I have provided the code below. It is mainly C++11 but includes some C++14 for the use of smart pointers. The code consists of header files only, apart from the main files.

First the main files:

// MAIN FILE FOR EXAMPLE CLIENT
#include "mfactory.hpp"
#include "global.hpp"

using namespace std::chrono_literals;

std::mutex smtx;
std::condition_variable scv;
bool sready = false;

std::atomic<bool> stop(false);
std::atomic<bool> start(false);
std::atomic<int> tint(1);
std::queue<std::string> localq;

void testf() {
    std::string sendMessage;
    while(!stop && start) {
        sendMessage = "uiph0000"+std::to_string(tint);
        std::this_thread::sleep_for(2s);
        std::cout << "In Thread: " << sendMessage << std::endl;
        {
            std::lock_guard<std::mutex> slk(smtx);
            localq.push(sendMessage);
            sready = true;
        }
        ++tint;
    }
}

// This client is called 'ui'. Rather than add a second block of code, substitute 'ph' for
// 'ui' in this code.
static std::unique_ptr<Client> CreateClient()
{
    return Factory::CreateClient("localhost", 12345, "uisv0000i", 11000);
};

int main(int argc, char *argv[]) {
    std::string sstart;
    bool server_up = false;
    if (argc == 2) {
        std::stringstream ss(argv[1]);
        ss >> sstart;
        if (sstart == "s") { server_up = true; start = true; }
    }
    auto client = CreateClient();
    
    // Wait for connection from server to be established.
    std::string initMessage;
    std::string initSender;
    // ####IGNORED#### (this wait loop is skipped when the "s" argument is given)
    while(!server_up) {
        if (client->HasIncoming()) {
            initMessage = client->PopIncoming();
            initSender = initMessage.substr(0,2);
            //std::cout << "Initmessage Sender is: " << initSender << std::endl;
            if (initSender == "sv") {
                server_up = true;
                std::cout << "Server is: " << server_up << std::endl;
            }
        }
    }

    // Start Thread
    std::thread test(testf);
    
    // Implementation of messages loop is cluster dependent
    // Check for messages
    std::string recvdMessage;
    std::string sendMessage;
    while(!stop && server_up) {
        if (client->HasIncoming()) {
            recvdMessage = client->PopIncoming();
            std::cout << "Received Message is: " << recvdMessage << std::endl;
            if(recvdMessage.compare(8,1, "s") == 0) { start = true; } //####IGNORED####
            if(recvdMessage.compare(8,1, "q") == 0) { stop = true; break; }
            else {
                //Do something with incoming messages
                //In separate thread?
            }
        }
        while(!localq.empty()) {
            {
                std::unique_lock<std::mutex> slk(smtx);
                scv.wait(slk, []{return sready;});
                sendMessage = localq.front();
                localq.pop();
                client->Send(sendMessage);
                //client->PushOutgoing(sendMessage);
                slk.unlock();
                scv.notify_one();
            }   
            // Create Messages in other threads and add these to the queue
        } //else { std::this_thread::sleep_for(1s); }
        //client->SendFromQueue();
        sready = false;
    }
    test.join();
}
// MAIN FILE FOR SERVER
#include "global.hpp"
#define NDEBUG
#include <cassert>

#include "mfactory.hpp"

using namespace std::chrono_literals;
std::atomic<bool> stop(false);

static std::unique_ptr<Server> CreateServer()
{
    return Factory::CreateServer(12345);
};

int main(int argc, char *argv[]) {
    int input_duration;
    if (argc < 2) {
        input_duration = 30;
    }
    else {
        std::stringstream ss(argv[1]);
        ss >> input_duration;
    }

    auto server = CreateServer();

    // Get all the clients (for now it is 2)
    // Associate a name with a clientID
    std::map<std::string, uint32_t> ClientByName;
    size_t maxclients = 2;
    while(ClientByName.size() < maxclients) {
        std::string initserver("svaa0000i");
        if(server->HasIncoming()) {
            auto recvdMessage = server->PopIncoming();
            std::string inMessage = recvdMessage.first;
            uint32_t ClientID = recvdMessage.second;
            std::string ClientName = inMessage.substr(0,2);
            std::cout << "ClientName is: " << ClientName << ", ClientID is: " << ClientID << std::endl;
            initserver.replace(2,2, ClientName);
            ClientByName.emplace(ClientName, ClientID);
            server->SendToClient(initserver, ClientID);
        }
    }

    //Run the server for fixed time and send signal to clients
    auto now = std::chrono::steady_clock::now;
    auto duration = std::chrono::seconds(input_duration);
    auto start = now();
    std::string outMessage;
    std::string startMessage = "svaa0000s";
    server->SendToAll(startMessage);
    std::this_thread::sleep_for(1s);
    std::string sender;
    std::string destination;
    while (!stop && (now() - start) < duration) {
        std::this_thread::sleep_for(1s);
        if(server->HasIncoming()) {
            std::cout << "Incoming Count: " << server->GetIncomingCount() << std::endl;
            while(server->GetIncomingCount() > 0) {
                auto incomingMessage = server->PopIncoming();
                std::string recvdMessage = incomingMessage.first;
                std::cout << "Received Message is: " << recvdMessage << std::endl;
                sender = recvdMessage.substr(0,2);
                destination = recvdMessage.substr(2,2);
                uint32_t destClientID = ClientByName[destination];
                std::cout << "Sender is: " << sender << ", DestClientID is: " << destClientID << std::endl;
                outMessage = recvdMessage.substr(4, std::string::npos);
                outMessage = sender+destination+outMessage;
                server->AddToQueue(outMessage, destClientID);
            }
        }
        if(server->HasOutgoing()) {
            server->SendFromQueue();
        }
    }
    std::string endMessage = "svaa0000q";
    server->SendToAll(endMessage);
    return 0;
}
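
The routing in the loop above relies on an implicit message layout: two characters for the sender, two for the destination, and the remainder as the body. A minimal sketch of that layout as a standalone helper follows. Routed, parse_routed and find_client are hypothetical names, not part of the code above. The sketch also assumes that an unknown destination should be rejected rather than looked up with operator[], which would silently insert an entry with ID 0 into the map.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Hypothetical view of one routed message: "uiph00001" -> ui, ph, 00001
struct Routed {
    bool valid = false;
    std::string sender;
    std::string destination;
    std::string body;
};

// Splits a raw datagram into sender, destination and body.
// Messages shorter than the 4-character header are reported as invalid.
Routed parse_routed(const std::string& raw) {
    Routed r;
    if (raw.size() < 4) { return r; }
    r.valid = true;
    r.sender = raw.substr(0, 2);
    r.destination = raw.substr(2, 2);
    r.body = raw.substr(4);
    return r;
}

// Looks up a client ID by name without modifying the map.
// Returns false (and leaves id_out untouched) when the name is unknown.
bool find_client(const std::map<std::string, std::uint32_t>& by_name,
                 const std::string& name, std::uint32_t& id_out) {
    auto it = by_name.find(name);
    if (it == by_name.end()) { return false; }
    id_out = it->second;
    return true;
}
```

For example, parse_routed("uiph00001") yields sender "ui", destination "ph" and body "00001".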

Then the header files:

// mfactory.hpp
#pragma once
#include "mclient.hpp"
#include "mserver.hpp"
#include "global.hpp"

class Factory
{
public:
    static std::unique_ptr<Client> CreateClient(std::string host, unsigned int server_port, const std::string &initmessage, 
            unsigned int client_port);
    static std::unique_ptr<Server> CreateServer(unsigned int port);
};

std::unique_ptr<Client> Factory::CreateClient(std::string host, unsigned int server_port, const std::string &initmessage, 
        unsigned int client_port)
{
    auto client = new Client(host, server_port, initmessage, client_port);
    return std::unique_ptr<Client>(client);
}

std::unique_ptr<Server> Factory::CreateServer(unsigned int port)
{
    auto server = new Server(port);
    return std::unique_ptr<Server>(server);
}

// mserver.hpp
#pragma once
#include "global.hpp"
#include "mqueue.hpp"
#include "mstat.hpp"

using asio::ip::udp;

typedef std::pair<std::string, uint32_t> ClientMessage;
typedef std::map<uint32_t, udp::endpoint> ClientList;
typedef ClientList::value_type LClient;

typedef std::pair<std::string, uint32_t> ServerMessage;
typedef std::map<uint32_t, udp::endpoint> ServerList;
typedef ServerList::value_type LServer;

class Server {
public:
    Server(unsigned short local_port);
    ~Server();

    bool HasIncoming();
    bool HasOutgoing();
    int GetIncomingCount();
    int GetOutgoingCount();
    ClientMessage PopIncoming();
    ServerMessage PopOutgoing();

    void SendToClient(const std::string& message, uint32_t clientID);
    void SendToAllExcept(const std::string& message, uint32_t clientID);
    void SendToAll(const std::string& message);
    void AddToQueue(const std::string& message, uint32_t clientID);
    void SendFromQueue();

    size_t GetClientCount();
    uint32_t GetClientIdByIndex(size_t index);

    //const Statistics& GetStatistics() const { return statistics; };
    std::vector<std::function<void(uint32_t)>> clientDisconnectedHandlers;
private:
    // Network send/receive stuff
    asio::io_context io_context;
    udp::socket socket;
    udp::endpoint server_endpoint;
    udp::endpoint remote_endpoint;
    std::array<char, NETBUFF> recv_buffer;
    std::thread service_thread;

    // Low-level network functions
    void start_receive();
    void handle_remote_error(const std::error_code error_code, const udp::endpoint remote_endpoint);
    void handle_receive(const std::error_code& error, std::size_t bytes_transferred);
    void handle_send(std::string /*message*/, const std::error_code& /*error*/, std::size_t /*bytes_transferred*/)  {}
    void run_service();

    // Client management
    int32_t get_or_create_client_id(udp::endpoint endpoint);
    void on_client_disconnected(int32_t id);

    void Send(const std::string& message, udp::endpoint target);

    // Incoming messages queue
    LockedQueue<ClientMessage> incomingMessages;
    LockedQueue<ServerMessage> outgoingMessages;

    // Clients of the server
    ClientList clients;
    std::atomic_int32_t nextClientID;

    Server(Server&); // block default copy constructor

    // Statistics
    //Statistics statistics;
};
// Constructor
Server::Server(unsigned short local_port) :
    socket(io_context, udp::endpoint(udp::v4(), local_port)),
    service_thread(&Server::run_service, this),
    nextClientID(0L)
{
    std::cout << "Starting server on port " << local_port << std::endl;
};

void Server::run_service()
{
    start_receive();
    while (!io_context.stopped()){
        try {
            io_context.run();
        }
        catch (const std::exception& e) {
            std::cout << "Server: Network exception: " << e.what() << std::endl;
        }
        catch (...) {
            std::cout << "Server: Network exception: unknown" << std::endl;
        }
    }
    std::cout << "Server network thread stopped" << std::endl;
};

void Server::start_receive()
{
    socket.async_receive_from(asio::buffer(recv_buffer), remote_endpoint,
        [this](std::error_code ec, std::size_t bytes_recvd){ this->handle_receive(ec, bytes_recvd); });
}

void Server::handle_receive(const std::error_code& error, std::size_t bytes_transferred)
{
    if (!error)
    {
        try {
            std::string inbuff(recv_buffer.data(), recv_buffer.data() + bytes_transferred);
            auto message = ClientMessage(inbuff, get_or_create_client_id(remote_endpoint));
            if (!message.first.empty()) {
                incomingMessages.push(message);
            }
            //statistics.RegisterReceivedMessage(bytes_transferred);
        }
        catch (const std::exception& ex) {
            std::cout << "handle_receive: Error parsing incoming message:" << ex.what();
        }
        catch (...) {
            std::cout << "handle_receive: Unknown error while parsing incoming message" << std::endl;
        }
    }
    else
    {
        std::cout << "handle_receive: error: " << error.message() << " while receiving from address " << remote_endpoint;
        handle_remote_error(error, remote_endpoint);
    }

    start_receive();
}

int32_t Server::get_or_create_client_id(udp::endpoint endpoint)
{
    for (const auto& client : clients) {
        if (client.second == endpoint) {
            return client.first;
        }
    }
    auto id = ++nextClientID;
    clients.insert(LClient(id, endpoint));
    return id;
};

void Server::handle_remote_error(const std::error_code error_code, const udp::endpoint remote_endpoint)
{
    bool found = false;
    int32_t id;
    for (const auto& client : clients) {
        if (client.second == remote_endpoint) {
            found = true;
            id = client.first;
            break;
        }
    }
    if (found == false) { return; }

    clients.erase(id);
    on_client_disconnected(id);
}

void Server::on_client_disconnected(int32_t id)
{
    for (auto& handler : clientDisconnectedHandlers) {
        if (handler) { handler(id); }
    }
}

Server::~Server()
{
    io_context.stop();
    service_thread.join();
}

void Server::Send(const std::string& message, udp::endpoint target_endpoint)
{
    socket.send_to(asio::buffer(message), target_endpoint);
}

void Server::AddToQueue(const std::string& message, uint32_t clientID)
{
    auto outmessage = ServerMessage(message, clientID);
    outgoingMessages.push(outmessage);
}

void Server::SendFromQueue()
{
    while(!outgoingMessages.empty()) {
        auto sendmessage = PopOutgoing();
        std::cout << "Send message " << sendmessage.first << " to " << sendmessage.second << std::endl;
        SendToClient(sendmessage.first, sendmessage.second);
    }
}

void Server::SendToClient(const std::string& message, uint32_t clientID)
{
    try {
        Send(message, clients.at(clientID));
    }
    catch (const std::out_of_range&) {
        std::cout << "Unknown client ID: " << clientID << std::endl;
    }
}

void Server::SendToAllExcept(const std::string& message, uint32_t clientID)
{
    for (auto client : clients) {
        if (client.first != clientID) {
            Send(message, client.second);
        }
    }
}

void Server::SendToAll(const std::string& message)
{
    for (auto client : clients) {
        Send(message, client.second);
    }
}

size_t Server::GetClientCount()
{
    return clients.size();
}

uint32_t Server::GetClientIdByIndex(size_t index)
{
    auto it = clients.begin();
    for (size_t i = 0; i < index; i++) {
        ++it;
    }
    return it->first;
}

ClientMessage Server::PopIncoming() {
    return incomingMessages.pop();
}

ServerMessage Server::PopOutgoing() {
    return outgoingMessages.pop();
}

bool Server::HasIncoming()
{
    return !incomingMessages.empty();
}

bool Server::HasOutgoing()
{
    return !outgoingMessages.empty();
}
int Server::GetIncomingCount()
{
    return incomingMessages.size();
}
int Server::GetOutgoingCount()
{
    return outgoingMessages.size();
}
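
One thing the Server class above may be sensitive to: C++ initialises members in declaration order, and service_thread is declared before incomingMessages, outgoingMessages, clients and nextClientID, so the network thread is started before those members are constructed. Whether this contributes to the behaviour described is not established. A minimal sketch of the usual arrangement, using a hypothetical Worker class that is not part of this project, declares the thread last and starts it in the constructor body:

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical class: the thread member is declared last and started in the
// constructor body, so everything it touches is already constructed.
class Worker {
public:
    Worker() : ticks_(0), running_(true) {
        thread_ = std::thread(&Worker::run, this);
    }
    ~Worker() {
        running_ = false;   // ask the loop to finish, then wait for it
        thread_.join();
    }
    int ticks() const { return ticks_.load(); }

private:
    void run() {
        while (running_) {
            ++ticks_;
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<int> ticks_;
    std::atomic<bool> running_;
    std::thread thread_;    // declared after the state it uses
};
```
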

// mclient.hpp
#pragma once
#include "global.hpp"
#include "mqueue.hpp"
#include "mstat.hpp"

using asio::ip::udp;

class Client {
public:
    Client(std::string host, unsigned short server_port, const std::string &initmessage, unsigned short local_port = 0);
    ~Client();

    size_t Send(const std::string& message);
    bool HasIncoming();
    bool HasOutgoing();
    std::string PopIncoming();
    std::string PopOutgoing();
    void PushOutgoing(std::string message);
    void SendFromQueue(); 
private:
    // Network send/receive stuff
    asio::io_context io_context;
    udp::socket socket;
    udp::endpoint server_endpoint;
    udp::endpoint client_endpoint;
    std::array<char, NETBUFF> recv_buffer;
    std::thread service_thread;

    // Queues for messages
    LockedQueue<std::string> incomingMessages;
    LockedQueue<std::string> outgoingMessages;

    void start_receive();
    void handle_receive(const std::error_code& error, std::size_t bytes_transferred);
    void run_service();

    Client(Client&); // block default copy constructor

    // Statistics
    //Statistics statistics;
};

Client::Client(std::string host, unsigned short server_port, const std::string &initmessage, unsigned short local_port) :
    socket(io_context, udp::endpoint(udp::v4(), local_port)),
    service_thread(&Client::run_service, this)
{
    udp::resolver resolver(io_context);
    udp::resolver::query query(udp::v4(), host, std::to_string(server_port));
    server_endpoint = *resolver.resolve(query);
    Client::Send(initmessage);
}

void Client::run_service()
{
    start_receive();
    while (!io_context.stopped()) {
        try {
            io_context.run();
        }
        catch (const std::exception& e) {
            std::cout << "Client: network exception: " << e.what();
        }
        catch (...) {
            std::cout << "Unknown exception in client network thread";
        }
    }
    std::cout << "Client network thread stopped" << std::endl;
}

void Client::start_receive()
{
    socket.async_receive_from(asio::buffer(recv_buffer), client_endpoint,
        [this](std::error_code ec, std::size_t bytes_recvd){ this->handle_receive(ec, bytes_recvd); });
}

void Client::handle_receive(const std::error_code& error, std::size_t bytes_transferred)
{
    if (!error)
    {   std::string inbuff(recv_buffer.data(), recv_buffer.data() + bytes_transferred);
        incomingMessages.push(inbuff);
        //statistics.RegisterReceivedMessage(bytes_transferred);
    }
    else
    {
        std::cout << "Client::handle_receive:" <<  error << std::endl;
    }

    start_receive();
}

size_t Client::Send(const std::string& message)
{
    size_t bytessent = socket.send_to(asio::buffer(message), server_endpoint);
    return bytessent;
    //statistics.RegisterSentMessage(message.size());
}

Client::~Client()
{
    io_context.stop();
    service_thread.join();
}

bool Client::HasIncoming()
{
    return !incomingMessages.empty();
}

bool Client::HasOutgoing()
{
    return !outgoingMessages.empty();
}

std::string Client::PopIncoming()
{
    if (incomingMessages.empty()) { throw std::logic_error("No messages to pop"); }
    std::string output = incomingMessages.pop();
    return output;
}

std::string Client::PopOutgoing()
{
    if (outgoingMessages.empty()) { throw std::logic_error("No messages to pop"); }
    std::string output = outgoingMessages.pop();
    return output;
}

void Client::PushOutgoing(std::string message)
{
    outgoingMessages.push(message);
}

void Client::SendFromQueue() 
{
    while(!outgoingMessages.empty()) {
        std::string sendmessage = PopOutgoing();
        int bytessent  = Send(sendmessage);
    }
}

// mqueue.hpp
#pragma once
#include "global.hpp"

// Simple mutex-guarded queue
template<typename mtype> class LockedQueue
{
private:
    mutable std::mutex mtx;
    std::queue<mtype> q;
public:
    LockedQueue () {};
    // Move initialization
    LockedQueue(LockedQueue&& other);
    // Move assignment
    LockedQueue<mtype>& operator = (LockedQueue&& other);
    // Copy initialization
    LockedQueue(const LockedQueue& other);
    // Copy assignment
    LockedQueue<mtype>& operator = (LockedQueue& other);
    void push(mtype value)
    {
        std::unique_lock<std::mutex> lock(mtx);
        q.push(value);
    };

    // Get top message in the queue
    // Note: not exception-safe (will lose the message)
    mtype pop()
    {
        std::unique_lock<std::mutex> lock(mtx);
        mtype value;
        std::swap(value, q.front());
        q.pop();
        return value;
    };

    bool empty() {
        std::unique_lock<std::mutex> lock(mtx);
        return q.empty();
    }

    int size() {
        std::unique_lock<std::mutex> lock(mtx);
        return q.size();
    }
};

// Move initialization
template<typename mtype> 
LockedQueue<mtype>::LockedQueue(LockedQueue<mtype>&& other)
{
    std::lock_guard<std::mutex> lock(other.mtx);
    q = std::move(other.q);
    std::queue<mtype> empty;
    std::swap(other.q, empty);
}
// Move assignment
template<typename mtype> 
LockedQueue<mtype>& LockedQueue<mtype>::operator = (LockedQueue<mtype>&& other) 
{
    std::lock(mtx, other.mtx);
    std::lock_guard<std::mutex> self_lock(mtx, std::adopt_lock);
    std::lock_guard<std::mutex> other_lock(other.mtx, std::adopt_lock);
    q = std::move(other.q);
    std::queue<mtype> empty;
    std::swap(other.q, empty);
    return *this;
}
// Copy initialization
template<typename mtype> 
LockedQueue<mtype>::LockedQueue(const LockedQueue<mtype>& other)
{
    std::lock_guard<std::mutex> lock(other.mtx);
    q = other.q;
}
// Copy assignment
template<typename mtype>
LockedQueue<mtype>& LockedQueue<mtype>::operator = (LockedQueue<mtype>& other)
{
    std::lock(mtx, other.mtx);
    std::lock_guard<std::mutex> self_lock(mtx, std::adopt_lock);
    std::lock_guard<std::mutex> other_lock(other.mtx, std::adopt_lock);
    q = other.q;
    return *this;
}
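
Because empty() and pop() take the lock separately, and pop() calls q.front() without checking whether the queue is empty, a combined check-and-pop operation can be convenient. The sketch below shows one possible shape. TryPopQueue and try_pop are hypothetical and not part of the queue above.

```cpp
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical mutex-guarded queue with a combined "check and pop" operation.
template <typename T>
class TryPopQueue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(mtx_);
        q_.push(std::move(value));
    }

    // Moves the front element into out and returns true,
    // or returns false without touching out if the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> lock(mtx_);
        if (q_.empty()) { return false; }
        out = std::move(q_.front());
        q_.pop();
        return true;
    }

private:
    std::mutex mtx_;
    std::queue<T> q_;
};
```

A caller can then loop on try_pop instead of pairing HasIncoming() with PopIncoming(), so the emptiness check and the removal happen under one lock.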

and finally the globals:

//SET THE NAMES OF ALL GLOBAL VARS HERE
#ifndef GLOBALS
#define GLOBALS
#define STATS_ENABLE_EIGEN_WRAPPERS
#define STATS_ENABLE_STDVEC_WRAPPERS

#include <iostream>
#include <iomanip>
#include <utility>
#include <limits>
#include <fstream>
#include <sstream>
#include <algorithm>
#include <memory>
#include <random>
#include <iterator>
#include <chrono>
#include <string>
#include <vector>
#include <array>
#include <deque>
#include <queue>
#include <unordered_map>
#include <map>
#include <functional>
#include <cstdint>

#include <thread>
#include <mutex>
#include <atomic>
#include <future>
#include <condition_variable>

#include "asio.hpp"

static const int NETBUFF = 1024;

#endif // GLOBALS

To get case A: start the server, then start the clients without the first (and only) argument s. To get case B: follow the same procedure, but pass the argument "s" to each client. In case B the code block marked IGNORED is skipped. The client code shown is the one called ui; rather than add a second block of code, substitute ph for ui in this code.

I know that this is probably not what is meant by an MWE, and again I apologise for the excessive code. I was torn between posting here and on Code Review (which I sorely need), but there the requirement is for working code, whereas here it is to pose a question about code that does not work as expected. Thanks in advance for your time.
