MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
UDPBackend.cpp
Go to the documentation of this file.
1#include "UDPBackend.hpp"
2
4
5namespace MayaFlux::Core {
6
7UDPBackend::UDPBackend(const UDPBackendInfo& config, asio::io_context& context)
8 : m_config(config)
9 , m_context(context)
10{
11}
12
17
18// ─────────────────────────────────────────────────────────────────────────────
19// Lifecycle
20// ─────────────────────────────────────────────────────────────────────────────
21
23{
24 if (m_initialized.load()) {
25 return true;
26 }
27
28 m_initialized.store(true);
29
31 "UDP backend initialized");
32
33 return true;
34}
35
37{
38 if (m_running.load()) {
39 return;
40 }
41
42 m_running.store(true);
43
45 "UDP backend started");
46}
47
49{
50 if (!m_running.load()) {
51 return;
52 }
53
54 m_running.store(false);
55
57 "UDP backend stopped");
58}
59
61{
62 stop();
63
64 {
65 std::unique_lock lock(m_endpoints_mutex);
66 m_endpoints.clear();
67 }
68
69 {
70 std::unique_lock lock(m_sockets_mutex);
71 for (auto& [port, state] : m_sockets) {
72 asio::error_code ec;
73 if (state->socket.close(ec)) {
75 "Error closing UDP socket on port {}: {}", port, ec.message());
76 }
77 }
78 m_sockets.clear();
79 }
80
81 m_initialized.store(false);
82
84 "UDP backend shutdown");
85}
86
87// ─────────────────────────────────────────────────────────────────────────────
88// Endpoint management
89// ─────────────────────────────────────────────────────────────────────────────
90
92{
93 uint16_t local_port = info.local_port;
94
95 if (local_port == 0 && info.role == EndpointRole::SEND) {
96 local_port = 0;
97 } else if (local_port == 0) {
98 local_port = m_config.default_receive_port;
99 }
100
101 auto* sock = acquire_socket(local_port);
102 if (!sock) {
103 return 0;
104 }
105
106 EndpointRecord record;
107 record.info = info;
108 record.socket_state = sock;
109
110 if (!info.remote_address.empty() && info.remote_port > 0) {
111 asio::error_code ec;
112 auto addr = asio::ip::make_address(info.remote_address, ec);
113 if (ec) {
115 "Invalid remote address '{}': {}", info.remote_address, ec.message());
116 release_socket(local_port);
117 return 0;
118 }
119 record.default_remote = asio::ip::udp::endpoint(addr, info.remote_port);
120 record.has_default_remote = true;
121 }
122
124
125 {
126 std::unique_lock lock(m_endpoints_mutex);
127 m_endpoints[info.id] = std::move(record);
128 }
129
131
133 "UDP endpoint {} opened (local:{}, remote:{}:{})",
134 info.id, local_port, info.remote_address, info.remote_port);
135
136 return info.id;
137}
138
139void UDPBackend::close_endpoint(uint64_t endpoint_id)
140{
141 std::unique_lock lock(m_endpoints_mutex);
142 auto it = m_endpoints.find(endpoint_id);
143 if (it == m_endpoints.end()) {
144 return;
145 }
146
147 uint16_t local_port = it->second.socket_state->local_port;
148
149 EndpointState prev = it->second.info.state;
150 it->second.info.state = EndpointState::CLOSED;
151
152 m_endpoints.erase(it);
153 lock.unlock();
154
155 release_socket(local_port);
156
158 "UDP endpoint {} closed", endpoint_id);
159}
160
162{
163 std::shared_lock lock(m_endpoints_mutex);
164 auto it = m_endpoints.find(endpoint_id);
165 if (it == m_endpoints.end()) {
167 }
168 return it->second.info.state;
169}
170
171std::vector<EndpointInfo> UDPBackend::get_endpoints() const
172{
173 std::shared_lock lock(m_endpoints_mutex);
174 std::vector<EndpointInfo> result;
175 result.reserve(m_endpoints.size());
176 for (const auto& [id, record] : m_endpoints) {
177 result.push_back(record.info);
178 }
179 return result;
180}
181
182// ─────────────────────────────────────────────────────────────────────────────
183// Data transfer
184// ─────────────────────────────────────────────────────────────────────────────
185
186bool UDPBackend::send(uint64_t endpoint_id, const uint8_t* data, size_t size)
187{
188 std::shared_lock lock(m_endpoints_mutex);
189 auto it = m_endpoints.find(endpoint_id);
190 if (it == m_endpoints.end()) {
191 return false;
192 }
193
194 auto& record = it->second;
195 if (!record.has_default_remote) {
197 "UDP endpoint {} has no default remote target", endpoint_id);
198 return false;
199 }
200
201 auto buf = std::make_shared<std::vector<uint8_t>>(data, data + size);
202 auto remote = record.default_remote;
203 auto& socket = record.socket_state->socket;
204
205 socket.async_send_to(
206 asio::buffer(*buf), remote,
207 [buf, endpoint_id](const asio::error_code& ec, size_t /*bytes_sent*/) {
208 if (ec) {
210 "UDP send failed on endpoint {}: {}", endpoint_id, ec.message());
211 }
212 });
213
214 return true;
215}
216
217bool UDPBackend::send_to(uint64_t endpoint_id, const uint8_t* data, size_t size,
218 const std::string& address, uint16_t port)
219{
220 std::shared_lock lock(m_endpoints_mutex);
221 auto it = m_endpoints.find(endpoint_id);
222 if (it == m_endpoints.end()) {
223 return false;
224 }
225
226 asio::error_code ec;
227 auto addr = asio::ip::make_address(address, ec);
228 if (ec) {
230 "Invalid send_to address '{}': {}", address, ec.message());
231 return false;
232 }
233
234 asio::ip::udp::endpoint target(addr, port);
235 auto buf = std::make_shared<std::vector<uint8_t>>(data, data + size);
236 auto& socket = it->second.socket_state->socket;
237
238 socket.async_send_to(
239 asio::buffer(*buf), target,
240 [buf, endpoint_id](const asio::error_code& send_ec, size_t) {
241 if (send_ec) {
243 "UDP send_to failed on endpoint {}: {}", endpoint_id, send_ec.message());
244 }
245 });
246
247 return true;
248}
249
250// ─────────────────────────────────────────────────────────────────────────────
251// Callbacks
252// ─────────────────────────────────────────────────────────────────────────────
253
255{
256 m_receive_callback = std::move(callback);
257}
258
260{
261 m_state_callback = std::move(callback);
262}
263
264// ─────────────────────────────────────────────────────────────────────────────
265// Private: socket management
266// ─────────────────────────────────────────────────────────────────────────────
267
269{
270 std::unique_lock lock(m_sockets_mutex);
271
272 auto it = m_sockets.find(local_port);
273 if (it != m_sockets.end()) {
274 it->second->ref_count++;
275 return it->second.get();
276 }
277
278 auto state = std::make_unique<SocketState>(m_context);
279 state->local_port = local_port;
280 state->ref_count = 1;
281
282 asio::error_code ec;
283 if (state->socket.open(asio::ip::udp::v4(), ec)) {
285 "Failed to open UDP socket: {}", ec.message());
286 return nullptr;
287 }
288
289 if (state->socket.set_option(asio::socket_base::reuse_address(true), ec)) {
291 "Failed to set SO_REUSEADDR on UDP socket: {}", ec.message());
292 return nullptr;
293 }
294
295 if (local_port > 0) {
296 if (
297 state->socket.bind(
298 asio::ip::udp::endpoint(asio::ip::udp::v4(), local_port), ec)) {
300 "Failed to bind UDP socket to port {}: {}", local_port, ec.message());
301 return nullptr;
302 }
303 }
304
306 if (state->socket.set_option(
307 asio::socket_base::receive_buffer_size(
308 static_cast<int>(m_config.receive_buffer_size)),
309 ec)) {
311 "Failed to set receive buffer size on UDP socket: {}", ec.message());
312 }
313 }
314
315 auto* raw = state.get();
316 m_sockets[local_port] = std::move(state);
317
318 start_receive_loop(*raw);
319
321 "UDP socket bound to port {}", local_port);
322
323 return raw;
324}
325
326void UDPBackend::release_socket(uint16_t local_port)
327{
328 std::unique_lock lock(m_sockets_mutex);
329 auto it = m_sockets.find(local_port);
330 if (it == m_sockets.end()) {
331 return;
332 }
333
334 it->second->ref_count--;
335 if (it->second->ref_count == 0) {
336 asio::error_code ec;
337
338 if (it->second->socket.close(ec)) {
340 "Error closing UDP socket on port {}: {}", local_port, ec.message());
341 }
342 m_sockets.erase(it);
343
345 "UDP socket on port {} released", local_port);
346 }
347}
348
349// ─────────────────────────────────────────────────────────────────────────────
350// Private: async receive loop
351// ─────────────────────────────────────────────────────────────────────────────
352
354{
355 state.socket.async_receive_from(
356 asio::buffer(state.recv_buffer),
357 state.sender_endpoint,
358 [this, &state](const asio::error_code& ec, size_t bytes_received) {
359 on_receive(state, ec, bytes_received);
360 });
361}
362
363void UDPBackend::on_receive(SocketState& state, const asio::error_code& ec, size_t bytes_received)
364{
365 if (ec) {
366 if (ec == asio::error::operation_aborted) {
367 return;
368 }
370 "UDP receive error on port {}: {}", state.local_port, ec.message());
371 start_receive_loop(state);
372 return;
373 }
374
375 if (m_receive_callback && bytes_received > 0) {
376 std::string sender_str = state.sender_endpoint.address().to_string()
377 + ":" + std::to_string(state.sender_endpoint.port());
378
379 std::shared_lock lock(m_endpoints_mutex);
380 for (const auto& [id, record] : m_endpoints) {
381 if (record.socket_state->local_port == state.local_port
382 && record.info.role != EndpointRole::SEND) {
383 m_receive_callback(id, state.recv_buffer.data(), bytes_received, sender_str);
384 }
385 }
386 }
387
388 start_receive_loop(state);
389}
390
392 uint16_t local_port, const asio::ip::udp::endpoint& sender) const
393{
394 std::shared_lock lock(m_endpoints_mutex);
395
396 uint64_t fallback_id = 0;
397
398 for (const auto& [id, record] : m_endpoints) {
399 if (record.socket_state->local_port != local_port) {
400 continue;
401 }
402
403 if (record.has_default_remote
404 && record.default_remote.address() == sender.address()
405 && record.default_remote.port() == sender.port()) {
406 return id;
407 }
408
409 if (fallback_id == 0
410 && (record.info.role == EndpointRole::RECEIVE
411 || record.info.role == EndpointRole::BIDIRECTIONAL)) {
412 fallback_id = id;
413 }
414 }
415
416 return fallback_id;
417}
418
420{
421 EndpointState prev = record.info.state;
422 record.info.state = new_state;
423
424 if (m_state_callback && prev != new_state) {
425 m_state_callback(record.info, prev, new_state);
426 }
427}
428
429} // namespace MayaFlux::Core
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
Range size
uint32_t id
void transition_state(EndpointRecord &record, EndpointState new_state)
void on_receive(SocketState &state, const asio::error_code &ec, size_t bytes_received)
Completion handler for async_receive_from.
std::atomic< bool > m_running
void close_endpoint(uint64_t endpoint_id) override
Close an endpoint and release its resources.
bool send(uint64_t endpoint_id, const uint8_t *data, size_t size) override
Send data through an endpoint.
std::unordered_map< uint16_t, std::unique_ptr< SocketState > > m_sockets
void set_receive_callback(NetworkReceiveCallback callback) override
Register the receive callback.
void set_state_callback(EndpointStateCallback callback) override
Register the endpoint state change callback.
NetworkReceiveCallback m_receive_callback
void release_socket(uint16_t local_port)
Decrement ref_count for a socket.
EndpointState get_endpoint_state(uint64_t endpoint_id) const override
Query the current state of an endpoint.
SocketState * acquire_socket(uint16_t local_port)
Get or create a socket bound to local_port.
void start() override
Start receive threads and accept connections.
void stop() override
Stop receive threads without releasing resources.
void shutdown() override
Release all resources, close all endpoints.
uint64_t open_endpoint(const EndpointInfo &info) override
Open a new endpoint.
std::atomic< bool > m_initialized
uint64_t resolve_endpoint_for_sender(uint16_t local_port, const asio::ip::udp::endpoint &sender) const
Resolve which endpoint id a received datagram belongs to.
std::vector< EndpointInfo > get_endpoints() const override
List all endpoints currently managed by this backend.
std::unordered_map< uint64_t, EndpointRecord > m_endpoints
bool initialize() override
Initialise backend resources (sockets, SHM segments, etc.)
UDPBackend(const UDPBackendInfo &config, asio::io_context &context)
Construct with config and a reference to the shared io_context.
Definition UDPBackend.cpp:7
EndpointStateCallback m_state_callback
void start_receive_loop(SocketState &state)
Post the first async_receive_from for a newly bound socket.
bool send_to(uint64_t endpoint_id, const uint8_t *data, size_t size, const std::string &address, uint16_t port) override
Send data to a specific address through an endpoint.
asio::io_context & m_context
std::shared_mutex m_sockets_mutex
std::shared_mutex m_endpoints_mutex
EndpointState
Observable connection state for an endpoint.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
std::function< void(const EndpointInfo &info, EndpointState previous, EndpointState current)> EndpointStateCallback
Callback signature for endpoint state changes.
@ Shutdown
Engine/subsystem shutdown and cleanup.
@ NetworkBackend
Network transport backend (UDP, TCP, SHM)
@ Init
Engine/subsystem initialization.
@ Core
Core engine, backend, subsystems.
Describes one logical send/receive endpoint managed by a backend.
Configuration for the UDP transport backend.
asio::ip::udp::endpoint sender_endpoint
std::array< uint8_t, 65536 > recv_buffer
Per-bound-port socket state.