31 "UDP backend initialized");
45 "UDP backend started");
57 "UDP backend stopped");
73 if (state->socket.close(ec)) {
75 "Error closing UDP socket on port {}: {}", port, ec.message());
84 "UDP backend shutdown");
97 }
else if (local_port == 0) {
115 "Invalid remote address '{}': {}", info.
remote_address, ec.message());
133 "UDP endpoint {} opened (local:{}, remote:{}:{})",
147 uint16_t local_port = it->second.socket_state->local_port;
158 "UDP endpoint {} closed", endpoint_id);
168 return it->second.info.state;
174 std::vector<EndpointInfo> result;
177 result.push_back(record.info);
194 auto& record = it->second;
195 if (!record.has_default_remote) {
197 "UDP endpoint {} has no default remote target", endpoint_id);
201 auto buf = std::make_shared<std::vector<uint8_t>>(data, data +
size);
202 auto remote = record.default_remote;
203 auto& socket = record.socket_state->socket;
205 socket.async_send_to(
206 asio::buffer(*buf), remote,
207 [buf, endpoint_id](
const asio::error_code& ec,
size_t ) {
210 "UDP send failed on endpoint {}: {}", endpoint_id, ec.message());
218 const std::string& address, uint16_t port)
227 auto addr = asio::ip::make_address(address, ec);
230 "Invalid send_to address '{}': {}", address, ec.message());
234 asio::ip::udp::endpoint target(addr, port);
235 auto buf = std::make_shared<std::vector<uint8_t>>(data, data +
size);
236 auto& socket = it->second.socket_state->socket;
238 socket.async_send_to(
239 asio::buffer(*buf), target,
240 [buf, endpoint_id](
const asio::error_code& send_ec, size_t) {
243 "UDP send_to failed on endpoint {}: {}", endpoint_id, send_ec.message());
274 it->second->ref_count++;
275 return it->second.get();
278 auto state = std::make_unique<SocketState>(
m_context);
279 state->local_port = local_port;
280 state->ref_count = 1;
283 if (state->socket.open(asio::ip::udp::v4(), ec)) {
285 "Failed to open UDP socket: {}", ec.message());
289 if (state->socket.set_option(asio::socket_base::reuse_address(
true), ec)) {
291 "Failed to set SO_REUSEADDR on UDP socket: {}", ec.message());
295 if (local_port > 0) {
298 asio::ip::udp::endpoint(asio::ip::udp::v4(), local_port), ec)) {
300 "Failed to bind UDP socket to port {}: {}", local_port, ec.message());
306 if (state->socket.set_option(
307 asio::socket_base::receive_buffer_size(
311 "Failed to set receive buffer size on UDP socket: {}", ec.message());
315 auto* raw = state.get();
316 m_sockets[local_port] = std::move(state);
321 "UDP socket bound to port {}", local_port);
334 it->second->ref_count--;
335 if (it->second->ref_count == 0) {
338 if (it->second->socket.close(ec)) {
340 "Error closing UDP socket on port {}: {}", local_port, ec.message());
345 "UDP socket on port {} released", local_port);
355 state.
socket.async_receive_from(
358 [
this, &state](
const asio::error_code& ec,
size_t bytes_received) {
359 on_receive(state, ec, bytes_received);
366 if (ec == asio::error::operation_aborted) {
370 "UDP receive error on port {}: {}", state.
local_port, ec.message());
381 if (record.socket_state->local_port == state.
local_port
392 uint16_t local_port,
const asio::ip::udp::endpoint& sender)
const
396 uint64_t fallback_id = 0;
399 if (record.socket_state->local_port != local_port) {
403 if (record.has_default_remote
404 && record.default_remote.address() == sender.address()
405 && record.default_remote.port() == sender.port()) {
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
void transition_state(EndpointRecord &record, EndpointState new_state)
void on_receive(SocketState &state, const asio::error_code &ec, size_t bytes_received)
Completion handler for async_receive_from.
std::atomic< bool > m_running
void close_endpoint(uint64_t endpoint_id) override
Close an endpoint and release its resources.
bool send(uint64_t endpoint_id, const uint8_t *data, size_t size) override
Send data through an endpoint.
std::unordered_map< uint16_t, std::unique_ptr< SocketState > > m_sockets
void set_receive_callback(NetworkReceiveCallback callback) override
Register the receive callback.
void set_state_callback(EndpointStateCallback callback) override
Register the endpoint state change callback.
NetworkReceiveCallback m_receive_callback
void release_socket(uint16_t local_port)
Decrement ref_count for a socket.
EndpointState get_endpoint_state(uint64_t endpoint_id) const override
Query the current state of an endpoint.
SocketState * acquire_socket(uint16_t local_port)
Get or create a socket bound to local_port.
void start() override
Start receive threads and accept connections.
void stop() override
Stop receive threads without releasing resources.
void shutdown() override
Release all resources, close all endpoints.
uint64_t open_endpoint(const EndpointInfo &info) override
Open a new endpoint.
std::atomic< bool > m_initialized
uint64_t resolve_endpoint_for_sender(uint16_t local_port, const asio::ip::udp::endpoint &sender) const
Resolve which endpoint id a received datagram belongs to.
std::vector< EndpointInfo > get_endpoints() const override
List all endpoints currently managed by this backend.
std::unordered_map< uint64_t, EndpointRecord > m_endpoints
bool initialize() override
Initialise backend resources (sockets, SHM segments, etc.)
UDPBackend(const UDPBackendInfo &config, asio::io_context &context)
Construct with config and a reference to the shared io_context.
EndpointStateCallback m_state_callback
void start_receive_loop(SocketState &state)
Post the first async_receive_from for a newly bound socket.
bool send_to(uint64_t endpoint_id, const uint8_t *data, size_t size, const std::string &address, uint16_t port) override
Send data to a specific address through an endpoint.
asio::io_context & m_context
std::shared_mutex m_sockets_mutex
std::shared_mutex m_endpoints_mutex
EndpointState
Observable connection state for an endpoint.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
std::function< void(const EndpointInfo &info, EndpointState previous, EndpointState current)> EndpointStateCallback
Callback signature for endpoint state changes.
@ Shutdown
Engine/subsystem shutdown and cleanup.
@ NetworkBackend
Network transport backend (UDP, TCP, SHM)
@ Init
Engine/subsystem initialization.
@ Core
Core engine, backend, subsystems.
std::string remote_address
Describes one logical send/receive endpoint managed by a backend.
size_t receive_buffer_size
uint16_t default_receive_port
Configuration for the UDP transport backend.
asio::ip::udp::endpoint default_remote
SocketState * socket_state
asio::ip::udp::endpoint sender_endpoint
std::array< uint8_t, 65536 > recv_buffer
asio::ip::udp::socket socket
Per-bound-port socket state.