MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
TCPBackend.cpp
Go to the documentation of this file.
1#include "TCPBackend.hpp"
2
3#include <asio/read.hpp>
4#include <asio/write.hpp>
5
7
8namespace MayaFlux::Core {
9
10TCPBackend::TCPBackend(const TCPBackendInfo& config, asio::io_context& context)
11 : m_config(config)
12 , m_context(context)
13{
14}
15
20
21// ─────────────────────────────────────────────────────────────────────────────
22// Lifecycle
23// ─────────────────────────────────────────────────────────────────────────────
24
// Body of what is presumably TCPBackend::initialize() (the member index lists
// `bool initialize() override`).
// NOTE(review): the signature line and the opening line of the log-macro call
// that precedes the "TCP backend initialized" string are absent from this
// listing — confirm against the real source.
//
// Returns true in every visible path: immediately when m_initialized is
// already set, otherwise after setting the flag. No sockets are created here.
26{
27 if (m_initialized.load()) {
28 return true;
29 }
30
31 m_initialized.store(true);
32
34 "TCP backend initialized");
35
36 return true;
37}
38
// Body of what is presumably TCPBackend::start() (the member index lists
// `void start() override`).
// NOTE(review): the signature line and the opening line of the log-macro call
// that precedes the "TCP backend started" string are absent from this listing.
//
// Sets m_running unless it is already set; the visible code starts no I/O of
// its own.
40{
41 if (m_running.load()) {
42 return;
43 }
44
45 m_running.store(true);
46
48 "TCP backend started");
49}
50
52{
53 if (!m_running.load()) {
54 return;
55 }
56
57 m_running.store(false);
58
59 {
60 std::shared_lock lock(m_connections_mutex);
61 for (auto& [id, conn] : m_connections) {
62 asio::error_code ec;
63
64 if (conn->socket.close(ec)) {
66 "Error closing TCP connection {}: {}", id, ec.message());
67 }
68
69 if (conn->reconnect_timer) {
70 conn->reconnect_timer->cancel();
71 }
72 }
73 }
74
75 {
76 std::shared_lock lock(m_listeners_mutex);
77 for (auto& [id, listener] : m_listeners) {
78 asio::error_code ec;
79 if (!listener->acceptor.close(ec)) {
81 "Error closing TCP listener {}: {}", id, ec.message());
82 }
83 }
84 }
85
87 "TCP backend stopped");
88}
89
// Body of what is presumably TCPBackend::shutdown() (the member index lists
// `void shutdown() override`).
// NOTE(review): the signature line and the opening line of the log-macro call
// that precedes the "TCP backend shutdown" string are absent from this listing.
91{
// Close sockets/acceptors and cancel timers first (stop() is a no-op when the
// backend is not running).
92 stop();
93
// Drop every connection and listener entry, each map under its own exclusive
// lock.
94 {
95 std::unique_lock lock(m_connections_mutex);
96 m_connections.clear();
97 }
98
99 {
100 std::unique_lock lock(m_listeners_mutex);
101 m_listeners.clear();
102 }
103
// Clearing the flag lets the initialize path run again.
104 m_initialized.store(false);
105
107 "TCP backend shutdown");
108}
109
110// ─────────────────────────────────────────────────────────────────────────────
111// Endpoint management
112// ─────────────────────────────────────────────────────────────────────────────
113
// Body of what is presumably TCPBackend::open_endpoint() (the member index
// lists `uint64_t open_endpoint(const EndpointInfo &info) override`).
// NOTE(review): the signature line, the opening line of every log-macro call,
// and the statement inside `if (m_state_callback) { }` are absent from this
// listing — confirm against the real source.
//
// Opens either a listener or an outbound connection, keyed by info.id:
//  - listener path returns 0 on any acceptor setup failure, info.id on success;
//  - outbound path always returns info.id; the connect itself is asynchronous.
115{
// An endpoint is a listener only when no remote address is given AND a local
// port is set. Everything else takes the outbound path below, including the
// case of empty address and port 0.
116 bool is_listener = !info.remote_address.empty() ? false
117 : (info.local_port > 0);
118
119 if (is_listener) {
120 auto listener = std::make_unique<ListenerState>(m_context);
121 listener->info = info;
122 listener->info.state = EndpointState::OPENING;
123
124 asio::error_code ec;
// IPv4 only: the acceptor is bound to tcp::v4() on info.local_port.
125 auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), info.local_port);
126
// Each setup step treats a truthy return value as failure and returns 0; the
// not-yet-registered listener is destroyed when the unique_ptr goes out of
// scope.
127 if (listener->acceptor.open(endpoint.protocol(), ec)) {
129 "Failed to open TCP acceptor: {}", ec.message());
130 return 0;
131 }
132
133 if (listener->acceptor.set_option(asio::socket_base::reuse_address(true), ec)) {
135 "Failed to set SO_REUSEADDR on TCP acceptor: {}", ec.message());
136 return 0;
137 }
138
139 if (listener->acceptor.bind(endpoint, ec)) {
141 "Failed to bind TCP acceptor to port {}: {}", info.local_port, ec.message());
142 return 0;
143 }
144
145 if (listener->acceptor.listen(asio::socket_base::max_listen_connections, ec)) {
147 "Failed to listen on port {}: {}", info.local_port, ec.message());
148 return 0;
149 }
150
151 listener->info.state = EndpointState::OPEN;
152
// Keep a raw pointer for use after ownership moves into the map.
153 auto* raw = listener.get();
154
155 {
156 std::unique_lock lock(m_listeners_mutex);
// operator[] replaces any listener already stored under the same id.
157 m_listeners[info.id] = std::move(listener);
158 }
159
160 if (m_state_callback) {
162 }
163
164 start_accept(*raw);
165
167 "TCP listener {} opened on port {}", info.id, info.local_port);
168
169 return info.id;
170 }
171
// Outbound path: register the connection in OPENING state, then start the
// asynchronous connect.
172 auto conn = std::make_unique<ConnectionState>(m_context);
173 conn->info = info;
174 conn->info.state = EndpointState::OPENING;
175 conn->is_outbound = true;
176
177 auto* raw = conn.get();
178
179 {
180 std::unique_lock lock(m_connections_mutex);
181 m_connections[info.id] = std::move(conn);
182 }
183
184 start_connect(*raw);
185
187 "TCP connection {} opening to {}:{}", info.id, info.remote_address, info.remote_port);
188
189 return info.id;
190}
191
192void TCPBackend::close_endpoint(uint64_t endpoint_id)
193{
194 {
195 std::unique_lock lock(m_connections_mutex);
196 auto it = m_connections.find(endpoint_id);
197 if (it != m_connections.end()) {
198 asio::error_code ec;
199
200 if (!it->second->socket.close(ec)) {
202 "Error closing TCP connection {}: {}", endpoint_id, ec.message());
203 }
204
205 if (it->second->reconnect_timer) {
206 it->second->reconnect_timer->cancel();
207 }
208 m_connections.erase(it);
209
211 "TCP connection {} closed", endpoint_id);
212 return;
213 }
214 }
215
216 {
217 std::unique_lock lock(m_listeners_mutex);
218 auto it = m_listeners.find(endpoint_id);
219 if (it != m_listeners.end()) {
220 asio::error_code ec;
221
222 if (it->second->acceptor.close(ec)) {
224 "Error closing TCP listener {}: {}", endpoint_id, ec.message());
225 }
226
227 m_listeners.erase(it);
228
230 "TCP listener {} closed", endpoint_id);
231 }
232 }
233}
234
// Body of what is presumably TCPBackend::get_endpoint_state() (the member
// index lists `EndpointState get_endpoint_state(uint64_t endpoint_id) const
// override`).
// NOTE(review): the signature line and the final fallthrough `return`
// statement for an unknown id are absent from this listing — confirm the
// returned value against the real source.
236{
// Connections are searched first, under a shared (reader) lock.
237 {
238 std::shared_lock lock(m_connections_mutex);
239 auto it = m_connections.find(endpoint_id);
240 if (it != m_connections.end()) {
241 return it->second->info.state;
242 }
243 }
244
// Not a connection: try the listeners, again under a shared lock.
245 {
246 std::shared_lock lock(m_listeners_mutex);
247 auto it = m_listeners.find(endpoint_id);
248 if (it != m_listeners.end()) {
249 return it->second->info.state;
250 }
251 }
252
254}
255
256std::vector<EndpointInfo> TCPBackend::get_endpoints() const
257{
258 std::vector<EndpointInfo> result;
259
260 {
261 std::shared_lock lock(m_connections_mutex);
262 result.reserve(m_connections.size());
263 for (const auto& [id, conn] : m_connections) {
264 result.push_back(conn->info);
265 }
266 }
267
268 {
269 std::shared_lock lock(m_listeners_mutex);
270 for (const auto& [id, listener] : m_listeners) {
271 result.push_back(listener->info);
272 }
273 }
274
275 return result;
276}
277
278// ─────────────────────────────────────────────────────────────────────────────
279// Data transfer
280// ─────────────────────────────────────────────────────────────────────────────
281
282bool TCPBackend::send(uint64_t endpoint_id, const uint8_t* data, size_t size)
283{
284 std::shared_lock lock(m_connections_mutex);
285 auto it = m_connections.find(endpoint_id);
286 if (it == m_connections.end()) {
287 return false;
288 }
289
290 auto& conn = *it->second;
291 if (conn.info.state != EndpointState::OPEN) {
292 return false;
293 }
294
295 auto frame = std::make_shared<std::vector<uint8_t>>(sizeof(uint32_t) + size);
296 uint32_t net_len = htonl(static_cast<uint32_t>(size));
297 std::memcpy(frame->data(), &net_len, sizeof(uint32_t));
298 std::memcpy(frame->data() + sizeof(uint32_t), data, size);
299
300 asio::async_write(
301 conn.socket,
302 asio::buffer(*frame),
303 [frame, endpoint_id, this](const asio::error_code& ec, size_t) {
304 if (ec) {
305 MF_WARN(Journal::Component::Core, Journal::Context::NetworkBackend,
306 "TCP write failed on endpoint {}: {}", endpoint_id, ec.message());
307
308 std::shared_lock lk(m_connections_mutex);
309 auto cit = m_connections.find(endpoint_id);
310 if (cit != m_connections.end()) {
311 on_connection_error(*cit->second, ec);
312 }
313 }
314 });
315
316 return true;
317}
318
319bool TCPBackend::send_to(uint64_t endpoint_id, const uint8_t* data, size_t size,
320 const std::string&, uint16_t)
321{
322 return send(endpoint_id, data, size);
323}
324
325// ─────────────────────────────────────────────────────────────────────────────
326// Callbacks
327// ─────────────────────────────────────────────────────────────────────────────
328
329void TCPBackend::set_receive_callback(NetworkReceiveCallback callback)
330{
331 m_receive_callback = std::move(callback);
332}
333
334void TCPBackend::set_state_callback(EndpointStateCallback callback)
335{
336 m_state_callback = std::move(callback);
337}
338
339// ─────────────────────────────────────────────────────────────────────────────
340// Private: async connect
341// ─────────────────────────────────────────────────────────────────────────────
342
343void TCPBackend::start_connect(ConnectionState& conn)
344{
345 asio::error_code ec;
346 auto addr = asio::ip::make_address(conn.info.remote_address, ec);
347 if (ec) {
349 "Invalid remote address '{}': {}", conn.info.remote_address, ec.message());
350 transition_state(conn.info, EndpointState::ERROR);
351 return;
352 }
353
354 asio::ip::tcp::endpoint target(addr, conn.info.remote_port);
355
356 conn.socket.async_connect(
357 target,
358 [this, ep_id = conn.info.id](const asio::error_code& connect_ec) {
359 std::shared_lock lock(m_connections_mutex);
360 auto it = m_connections.find(ep_id);
361 if (it == m_connections.end()) {
362 return;
363 }
364
365 auto& c = *it->second;
366
367 if (connect_ec) {
369 "TCP connect failed for endpoint {}: {}",
370 ep_id, connect_ec.message());
371
372 if (m_config.auto_reconnect) {
373 transition_state(c.info, EndpointState::RECONNECTING);
374 schedule_reconnect(c);
375 } else {
376 transition_state(c.info, EndpointState::ERROR);
377 }
378 return;
379 }
380
381 transition_state(c.info, EndpointState::OPEN);
382 start_receive_chain(c);
383
385 "TCP endpoint {} connected to {}:{}",
386 ep_id, c.info.remote_address, c.info.remote_port);
387 });
388}
389
390// ─────────────────────────────────────────────────────────────────────────────
391// Private: async accept
392// ─────────────────────────────────────────────────────────────────────────────
393
394void TCPBackend::start_accept(ListenerState& listener)
395{
396 listener.pending_socket = asio::ip::tcp::socket(m_context);
397
398 listener.acceptor.async_accept(
399 listener.pending_socket,
400 [this, listener_id = listener.info.id](const asio::error_code& ec) {
401 std::shared_lock lock(m_listeners_mutex);
402 auto it = m_listeners.find(listener_id);
403 if (it == m_listeners.end()) {
404 return;
405 }
406
407 auto& lst = *it->second;
408
409 if (ec) {
410 if (ec == asio::error::operation_aborted) {
411 return;
412 }
414 "TCP accept error on listener {}: {}", listener_id, ec.message());
415 start_accept(lst);
416 return;
417 }
418
419 uint64_t new_id = 0;
420 if (m_allocate_endpoint_id) {
421 new_id = m_allocate_endpoint_id();
422 }
423
424 auto peer_ep = lst.pending_socket.remote_endpoint();
425 std::string peer_addr = peer_ep.address().to_string();
426 uint16_t peer_port = peer_ep.port();
427
428 auto conn = std::make_unique<ConnectionState>(m_context);
429 conn->socket = std::move(lst.pending_socket);
430 conn->is_outbound = false;
431 conn->info.id = new_id;
432 conn->info.transport = NetworkTransport::TCP;
433 conn->info.role = EndpointRole::BIDIRECTIONAL;
434 conn->info.state = EndpointState::OPEN;
435 conn->info.remote_address = peer_addr;
436 conn->info.remote_port = peer_port;
437 conn->info.local_port = lst.info.local_port;
438
439 auto* raw = conn.get();
440
441 {
442 std::unique_lock conn_lock(m_connections_mutex);
443 m_connections[new_id] = std::move(conn);
444 }
445
446 if (m_state_callback) {
447 m_state_callback(raw->info, EndpointState::CLOSED, EndpointState::OPEN);
448 }
449
450 start_receive_chain(*raw);
451
453 "TCP accepted connection {} from {}:{} on listener {}",
454 new_id, peer_addr, peer_port, listener_id);
455
456 start_accept(lst);
457 });
458}
459
460// ─────────────────────────────────────────────────────────────────────────────
461// Private: framed receive chain
462// ─────────────────────────────────────────────────────────────────────────────
463
464void TCPBackend::start_receive_chain(ConnectionState& conn)
465{
466 asio::async_read(
467 conn.socket,
468 asio::buffer(conn.header_buf),
469 [this, ep_id = conn.info.id](const asio::error_code& ec, size_t bytes) {
470 std::shared_lock lock(m_connections_mutex);
471 auto it = m_connections.find(ep_id);
472 if (it == m_connections.end()) {
473 return;
474 }
475 on_header_received(*it->second, ec, bytes);
476 });
477}
478
// Completion step for the frame-header read: decodes the 4-byte length
// prefix, validates it, and arms the read of the payload.
// NOTE(review): the opening line of the log-macro call that precedes the
// "received invalid frame length" string is absent from this listing.
479void TCPBackend::on_header_received(ConnectionState& conn,
480 const asio::error_code& ec, size_t)
481{
482 if (ec) {
483 on_connection_error(conn, ec);
484 return;
485 }
486
// The prefix is in network byte order; memcpy out of the buffer, then convert
// to host order.
487 uint32_t net_len {};
488 std::memcpy(&net_len, conn.header_buf.data(), sizeof(uint32_t));
489 uint32_t payload_size = ntohl(net_len);
490
// Empty frames and frames above 64 MiB are treated as a protocol error and
// routed through the normal connection-error path.
491 if (payload_size == 0 || payload_size > 64 * 1024 * 1024) {
493 "TCP endpoint {} received invalid frame length: {}",
494 conn.info.id, payload_size);
495 on_connection_error(conn, asio::error::message_size);
496 return;
497 }
498
499 conn.payload_buf.resize(payload_size);
500
// The handler captures only the id and re-resolves the connection under a
// shared lock, so a connection erased while the read is pending is skipped.
501 asio::async_read(
502 conn.socket,
503 asio::buffer(conn.payload_buf),
504 [this, ep_id = conn.info.id](const asio::error_code& read_ec, size_t bytes) {
505 std::shared_lock lock(m_connections_mutex);
506 auto it = m_connections.find(ep_id);
507 if (it == m_connections.end()) {
508 return;
509 }
510 on_payload_received(*it->second, read_ec, bytes);
511 });
512}
513
514void TCPBackend::on_payload_received(ConnectionState& conn,
515 const asio::error_code& ec, size_t /*bytes*/)
516{
517 if (ec) {
518 on_connection_error(conn, ec);
519 return;
520 }
521
522 if (m_receive_callback) {
523 std::string peer_str = conn.info.remote_address + ":" + std::to_string(conn.info.remote_port);
524
525 m_receive_callback(conn.info.id,
526 conn.payload_buf.data(), conn.payload_buf.size(),
527 peer_str);
528 }
529
530 start_receive_chain(conn);
531}
532
533// ─────────────────────────────────────────────────────────────────────────────
534// Private: error handling and reconnect
535// ─────────────────────────────────────────────────────────────────────────────
536
537void TCPBackend::on_connection_error(ConnectionState& conn, const asio::error_code& ec)
538{
539 if (ec == asio::error::operation_aborted) {
540 return;
541 }
542
543 if (ec == asio::error::eof) {
545 "TCP endpoint {} peer disconnected", conn.info.id);
546 } else {
548 "TCP endpoint {} error: {}", conn.info.id, ec.message());
549 }
550
551 asio::error_code close_ec;
552 if (!conn.socket.close(close_ec)) {
554 "Error closing TCP socket for endpoint {}: {}", conn.info.id, close_ec.message());
555 }
556
557 if (conn.is_outbound && m_config.auto_reconnect) {
558 transition_state(conn.info, EndpointState::RECONNECTING);
559 schedule_reconnect(conn);
560 } else {
561 transition_state(conn.info, EndpointState::ERROR);
562 }
563}
564
// Arms a one-shot timer that retries the outbound connect after
// m_config.reconnect_interval_ms milliseconds.
// NOTE(review): the opening line of the log-macro call that precedes the
// "attempting reconnect" string is absent from this listing.
565void TCPBackend::schedule_reconnect(ConnectionState& conn)
566{
// The timer is created on first use and reused for later attempts.
567 if (!conn.reconnect_timer) {
568 conn.reconnect_timer = std::make_unique<asio::steady_timer>(m_context);
569 }
570
571 conn.reconnect_timer->expires_after(
572 std::chrono::milliseconds(m_config.reconnect_interval_ms));
573
574 conn.reconnect_timer->async_wait(
575 [this, ep_id = conn.info.id](const asio::error_code& ec) {
// A cancelled wait means no reconnect should be attempted.
576 if (ec == asio::error::operation_aborted) {
577 return;
578 }
579
// Re-resolve by id: the connection may have been erased while waiting.
580 std::shared_lock lock(m_connections_mutex);
581 auto it = m_connections.find(ep_id);
582 if (it == m_connections.end()) {
583 return;
584 }
585
586 auto& c = *it->second;
587
// Replace the socket with a fresh one on the same io_context before the
// next connect attempt.
588 c.socket = asio::ip::tcp::socket(m_context);
589
591 "TCP endpoint {} attempting reconnect to {}:{}",
592 ep_id, c.info.remote_address, c.info.remote_port);
593
594 start_connect(c);
595 });
596}
597
598void TCPBackend::transition_state(EndpointInfo& info, EndpointState new_state)
599{
600 EndpointState prev = info.state;
601 info.state = new_state;
602
603 if (m_state_callback && prev != new_state) {
604 m_state_callback(info, prev, new_state);
605 }
606}
607
608} // namespace MayaFlux::Core
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
Range size
void close_endpoint(uint64_t endpoint_id) override
Close an endpoint and release its resources.
void start_connect(ConnectionState &conn)
Initiate async_connect for an outbound endpoint.
std::atomic< bool > m_running
std::unordered_map< uint64_t, std::unique_ptr< ConnectionState > > m_connections
std::shared_mutex m_listeners_mutex
void shutdown() override
Release all resources, close all endpoints.
std::unordered_map< uint64_t, std::unique_ptr< ListenerState > > m_listeners
bool initialize() override
Initialise backend resources (sockets, SHM segments, etc.)
bool send(uint64_t endpoint_id, const uint8_t *data, size_t size) override
Send data through an endpoint.
void start_accept(ListenerState &listener)
Initiate async_accept loop for a listener.
std::shared_mutex m_connections_mutex
EndpointState get_endpoint_state(uint64_t endpoint_id) const override
Query the current state of an endpoint.
std::vector< EndpointInfo > get_endpoints() const override
List all endpoints currently managed by this backend.
void stop() override
Stop receive threads without releasing resources.
uint64_t open_endpoint(const EndpointInfo &info) override
Open a new endpoint.
asio::io_context & m_context
TCPBackend(const TCPBackendInfo &config, asio::io_context &context)
Construct with config and a reference to the shared io_context.
EndpointStateCallback m_state_callback
std::atomic< bool > m_initialized
void start() override
Start receive threads and accept connections.
EndpointState
Observable connection state for an endpoint.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
std::function< void(const EndpointInfo &info, EndpointState previous, EndpointState current)> EndpointStateCallback
Callback signature for endpoint state changes.
@ Shutdown
Engine/subsystem shutdown and cleanup.
@ NetworkBackend
Network transport backend (UDP, TCP, SHM)
@ Init
Engine/subsystem initialization.
@ Core
Core engine, backend, subsystems.
Describes one logical send/receive endpoint managed by a backend.
Configuration for the TCP transport backend.
std::unique_ptr< asio::steady_timer > reconnect_timer
State for a connected TCP peer (inbound or outbound)
State for a listening acceptor.