3#include <asio/read.hpp>
4#include <asio/write.hpp>
34 "TCP backend initialized");
48 "TCP backend started");
64 if (conn->socket.close(ec)) {
66 "Error closing TCP connection {}: {}",
id, ec.message());
69 if (conn->reconnect_timer) {
70 conn->reconnect_timer->cancel();
79 if (!listener->acceptor.close(ec)) {
81 "Error closing TCP listener {}: {}",
id, ec.message());
87 "TCP backend stopped");
107 "TCP backend shutdown");
120 auto listener = std::make_unique<ListenerState>(
m_context);
121 listener->info = info;
125 auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), info.
local_port);
127 if (listener->acceptor.open(endpoint.protocol(), ec)) {
129 "Failed to open TCP acceptor: {}", ec.message());
133 if (listener->acceptor.set_option(asio::socket_base::reuse_address(
true), ec)) {
135 "Failed to set SO_REUSEADDR on TCP acceptor: {}", ec.message());
139 if (listener->acceptor.bind(endpoint, ec)) {
141 "Failed to bind TCP acceptor to port {}: {}", info.
local_port, ec.message());
145 if (listener->acceptor.listen(asio::socket_base::max_listen_connections, ec)) {
147 "Failed to listen on port {}: {}", info.
local_port, ec.message());
153 auto* raw = listener.get();
167 "TCP listener {} opened on port {}", info.
id, info.
local_port);
172 auto conn = std::make_unique<ConnectionState>(
m_context);
175 conn->is_outbound =
true;
177 auto* raw = conn.get();
200 if (!it->second->socket.close(ec)) {
202 "Error closing TCP connection {}: {}", endpoint_id, ec.message());
205 if (it->second->reconnect_timer) {
206 it->second->reconnect_timer->cancel();
211 "TCP connection {} closed", endpoint_id);
222 if (it->second->acceptor.close(ec)) {
224 "Error closing TCP listener {}: {}", endpoint_id, ec.message());
230 "TCP listener {} closed", endpoint_id);
241 return it->second->info.state;
249 return it->second->info.state;
258 std::vector<EndpointInfo> result;
264 result.push_back(conn->info);
271 result.push_back(listener->info);
290 auto& conn = *it->second;
295 auto frame = std::make_shared<std::vector<uint8_t>>(
sizeof(uint32_t) +
size);
296 uint32_t net_len = htonl(
static_cast<uint32_t
>(
size));
297 std::memcpy(frame->data(), &net_len,
sizeof(uint32_t));
298 std::memcpy(frame->data() +
sizeof(uint32_t), data,
size);
302 asio::buffer(*frame),
303 [frame, endpoint_id,
this](
const asio::error_code& ec,
size_t) {
305 MF_WARN(Journal::Component::Core, Journal::Context::NetworkBackend,
306 "TCP write failed on endpoint {}: {}", endpoint_id, ec.message());
308 std::shared_lock lk(m_connections_mutex);
309 auto cit = m_connections.find(endpoint_id);
310 if (cit != m_connections.end()) {
311 on_connection_error(*cit->second, ec);
319bool TCPBackend::send_to(uint64_t endpoint_id,
const uint8_t* data,
size_t size,
320 const std::string&, uint16_t)
322 return send(endpoint_id, data,
size);
331 m_receive_callback = std::move(callback);
336 m_state_callback = std::move(callback);
350 transition_state(conn.
info, EndpointState::ERROR);
356 conn.
socket.async_connect(
358 [
this, ep_id = conn.
info.
id](
const asio::error_code& connect_ec) {
359 std::shared_lock lock(m_connections_mutex);
360 auto it = m_connections.find(ep_id);
361 if (it == m_connections.end()) {
365 auto& c = *it->second;
369 "TCP connect failed for endpoint {}: {}",
370 ep_id, connect_ec.message());
372 if (m_config.auto_reconnect) {
373 transition_state(c.info, EndpointState::RECONNECTING);
374 schedule_reconnect(c);
376 transition_state(c.info, EndpointState::ERROR);
381 transition_state(c.info, EndpointState::OPEN);
382 start_receive_chain(c);
385 "TCP endpoint {} connected to {}:{}",
386 ep_id, c.info.remote_address, c.info.remote_port);
400 [
this, listener_id = listener.
info.
id](
const asio::error_code& ec) {
401 std::shared_lock lock(m_listeners_mutex);
402 auto it = m_listeners.find(listener_id);
403 if (it == m_listeners.end()) {
407 auto& lst = *it->second;
410 if (ec == asio::error::operation_aborted) {
414 "TCP accept error on listener {}: {}", listener_id, ec.message());
420 if (m_allocate_endpoint_id) {
421 new_id = m_allocate_endpoint_id();
424 auto peer_ep = lst.pending_socket.remote_endpoint();
425 std::string peer_addr = peer_ep.address().to_string();
426 uint16_t peer_port = peer_ep.port();
428 auto conn = std::make_unique<ConnectionState>(m_context);
429 conn->socket = std::move(lst.pending_socket);
430 conn->is_outbound =
false;
431 conn->info.id = new_id;
432 conn->info.transport = NetworkTransport::TCP;
433 conn->info.role = EndpointRole::BIDIRECTIONAL;
434 conn->info.state = EndpointState::OPEN;
435 conn->info.remote_address = peer_addr;
436 conn->info.remote_port = peer_port;
437 conn->info.local_port = lst.info.local_port;
439 auto* raw = conn.get();
442 std::unique_lock conn_lock(m_connections_mutex);
443 m_connections[new_id] = std::move(conn);
446 if (m_state_callback) {
447 m_state_callback(raw->info, EndpointState::CLOSED, EndpointState::OPEN);
450 start_receive_chain(*raw);
453 "TCP accepted connection {} from {}:{} on listener {}",
454 new_id, peer_addr, peer_port, listener_id);
469 [
this, ep_id = conn.
info.
id](
const asio::error_code& ec,
size_t bytes) {
470 std::shared_lock lock(m_connections_mutex);
471 auto it = m_connections.find(ep_id);
472 if (it == m_connections.end()) {
475 on_header_received(*it->second, ec, bytes);
480 const asio::error_code& ec,
size_t)
483 on_connection_error(conn, ec);
488 std::memcpy(&net_len, conn.
header_buf.data(),
sizeof(uint32_t));
489 uint32_t payload_size = ntohl(net_len);
491 if (payload_size == 0 || payload_size > 64 * 1024 * 1024) {
493 "TCP endpoint {} received invalid frame length: {}",
494 conn.
info.
id, payload_size);
495 on_connection_error(conn, asio::error::message_size);
504 [
this, ep_id = conn.
info.
id](
const asio::error_code& read_ec,
size_t bytes) {
505 std::shared_lock lock(m_connections_mutex);
506 auto it = m_connections.find(ep_id);
507 if (it == m_connections.end()) {
510 on_payload_received(*it->second, read_ec, bytes);
515 const asio::error_code& ec,
size_t )
518 on_connection_error(conn, ec);
522 if (m_receive_callback) {
525 m_receive_callback(conn.
info.
id,
530 start_receive_chain(conn);
537void TCPBackend::on_connection_error(
ConnectionState& conn,
const asio::error_code& ec)
539 if (ec == asio::error::operation_aborted) {
543 if (ec == asio::error::eof) {
545 "TCP endpoint {} peer disconnected", conn.
info.
id);
548 "TCP endpoint {} error: {}", conn.
info.
id, ec.message());
551 asio::error_code close_ec;
552 if (!conn.
socket.close(close_ec)) {
554 "Error closing TCP socket for endpoint {}: {}", conn.
info.
id, close_ec.message());
558 transition_state(conn.
info, EndpointState::RECONNECTING);
559 schedule_reconnect(conn);
561 transition_state(conn.
info, EndpointState::ERROR);
568 conn.
reconnect_timer = std::make_unique<asio::steady_timer>(m_context);
572 std::chrono::milliseconds(m_config.reconnect_interval_ms));
575 [
this, ep_id = conn.
info.
id](
const asio::error_code& ec) {
576 if (ec == asio::error::operation_aborted) {
580 std::shared_lock lock(m_connections_mutex);
581 auto it = m_connections.find(ep_id);
582 if (it == m_connections.end()) {
586 auto& c = *it->second;
588 c.socket = asio::ip::tcp::socket(m_context);
591 "TCP endpoint {} attempting reconnect to {}:{}",
592 ep_id, c.info.remote_address, c.info.remote_port);
601 info.
state = new_state;
603 if (m_state_callback && prev != new_state) {
604 m_state_callback(info, prev, new_state);
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
void close_endpoint(uint64_t endpoint_id) override
Close an endpoint and release its resources.
void start_connect(ConnectionState &conn)
Initiate async_connect for an outbound endpoint.
std::atomic< bool > m_running
std::unordered_map< uint64_t, std::unique_ptr< ConnectionState > > m_connections
std::shared_mutex m_listeners_mutex
void shutdown() override
Release all resources, close all endpoints.
std::unordered_map< uint64_t, std::unique_ptr< ListenerState > > m_listeners
bool initialize() override
Initialise backend resources (sockets, SHM segments, etc.)
bool send(uint64_t endpoint_id, const uint8_t *data, size_t size) override
Send data through an endpoint.
void start_accept(ListenerState &listener)
Initiate async_accept loop for a listener.
std::shared_mutex m_connections_mutex
EndpointState get_endpoint_state(uint64_t endpoint_id) const override
Query the current state of an endpoint.
std::vector< EndpointInfo > get_endpoints() const override
List all endpoints currently managed by this backend.
void stop() override
Stop receive threads without releasing resources.
uint64_t open_endpoint(const EndpointInfo &info) override
Open a new endpoint.
asio::io_context & m_context
TCPBackend(const TCPBackendInfo &config, asio::io_context &context)
Construct with config and a reference to the shared io_context.
EndpointStateCallback m_state_callback
std::atomic< bool > m_initialized
void start() override
Start receive threads and accept connections.
EndpointState
Observable connection state for an endpoint.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
std::function< void(const EndpointInfo &info, EndpointState previous, EndpointState current)> EndpointStateCallback
Callback signature for endpoint state changes.
@ Shutdown
Engine/subsystem shutdown and cleanup.
@ NetworkBackend
Network transport backend (UDP, TCP, SHM)
@ Init
Engine/subsystem initialization.
@ Core
Core engine, backend, subsystems.
std::string remote_address
Describes one logical send/receive endpoint managed by a backend.
Configuration for the TCP transport backend.
asio::ip::tcp::socket socket
std::array< uint8_t, 4 > header_buf
std::unique_ptr< asio::steady_timer > reconnect_timer
std::vector< uint8_t > payload_buf
State for a connected TCP peer (inbound or outbound)
asio::ip::tcp::acceptor acceptor
asio::ip::tcp::socket pending_socket
State for a listening acceptor.