MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
TCPBackend.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "NetworkBackend.hpp"
4
5#include <asio/io_context.hpp>
6#include <asio/ip/tcp.hpp>
7#include <asio/steady_timer.hpp>
8
9namespace MayaFlux::Core {
10
11/**
12 * @class TCPBackend
13 * @brief Connection-oriented reliable stream transport over TCP via standalone Asio
14 *
15 * Two endpoint modes:
16 * - Outbound (remote_address set): async_connect to a remote host.
17 * - Listener (local_port set, no remote_address): async_accept on a port.
18 * Accepted connections become new endpoints, announced via state callback.
19 *
20 * Framing: every message on the wire is [uint32_t network-order length][payload].
21 * The backend handles framing transparently. Callers send and receive complete
22 * messages, never partial streams.
23 *
24 * Reconnection: if auto_reconnect is enabled and a connection drops, the
25 * backend transitions to RECONNECTING and schedules async_connect retries
26 * via asio::steady_timer.
27 *
28 * All I/O is async on the shared io_context. No dedicated threads per
29 * connection. No mutexes on the data path. One io_context thread handles
30 * all sockets: accept, connect, read, write, reconnect timers.
31 *
32 * Receive chain per connection:
33 * async_read(4 bytes header) -> parse length
34 * -> async_read(length bytes payload) -> fire callback -> resubmit header read
35 *
36 * Send:
37 * async_write(4 bytes header + payload). Serialised per-connection via
38 * Asio strand (implicit: all operations on one io_context thread).
39 *
40 * @code
41 * asio::io_context ctx;
42 * TCPBackend tcp(config, ctx);
43 * tcp.initialize();
44 * tcp.set_receive_callback([](uint64_t id, const uint8_t* d, size_t s, std::string_view a) {
45 * // handle complete framed message
46 * });
47 * tcp.set_state_callback([](const EndpointInfo& ep, EndpointState prev, EndpointState cur) {
48 * // track connection/disconnection
49 * });
50 * tcp.start();
51 *
52 * // Outbound connection
53 * EndpointInfo client_ep;
54 * client_ep.role = EndpointRole::BIDIRECTIONAL;
55 * client_ep.remote_address = "192.168.1.50";
56 * client_ep.remote_port = 7000;
57 * auto ep_id = tcp.open_endpoint(client_ep);
58 *
59 * // Listener
60 * EndpointInfo server_ep;
61 * server_ep.role = EndpointRole::RECEIVE;
62 * server_ep.local_port = 7000;
63 * auto listen_id = tcp.open_endpoint(server_ep);
64 *
65 * ctx.run();
66 * @endcode
67 */
68class MAYAFLUX_API TCPBackend : public INetworkBackend {
69public:
70 /**
71 * @brief Construct with config and a reference to the shared io_context
72 * @param config TCP-specific configuration.
73 * @param context The io_context owned by NetworkSubsystem.
74 */
75 TCPBackend(const TCPBackendInfo& config, asio::io_context& context);
76 ~TCPBackend() override;
77
78 TCPBackend(const TCPBackend&) = delete;
79 TCPBackend& operator=(const TCPBackend&) = delete;
82
83 // ─── INetworkBackend lifecycle ──────────────────────────────────────────
84
85 bool initialize() override;
86 void start() override;
87 void stop() override;
88 void shutdown() override;
89
90 [[nodiscard]] bool is_initialized() const override { return m_initialized.load(); }
91 [[nodiscard]] bool is_running() const override { return m_running.load(); }
92
93 [[nodiscard]] NetworkTransport get_transport() const override { return NetworkTransport::TCP; }
94 [[nodiscard]] std::string get_name() const override { return "TCP (Asio)"; }
95 [[nodiscard]] std::string get_version() const override { return "1.0"; }
96
97 // ─── INetworkBackend endpoints ──────────────────────────────────────────
98
99 uint64_t open_endpoint(const EndpointInfo& info) override;
100 void close_endpoint(uint64_t endpoint_id) override;
101 [[nodiscard]] EndpointState get_endpoint_state(uint64_t endpoint_id) const override;
102 [[nodiscard]] std::vector<EndpointInfo> get_endpoints() const override;
103
104 // ─── INetworkBackend data ───────────────────────────────────────────────
105
106 bool send(uint64_t endpoint_id, const uint8_t* data, size_t size) override;
107 bool send_to(uint64_t endpoint_id, const uint8_t* data, size_t size,
108 const std::string& address, uint16_t port) override;
109
110 // ─── INetworkBackend callbacks ──────────────────────────────────────────
111
112 void set_receive_callback(NetworkReceiveCallback callback) override;
113 void set_state_callback(EndpointStateCallback callback) override;
114
115private:
116 /**
117 * @brief State for a connected TCP peer (inbound or outbound)
118 */
121 asio::ip::tcp::socket socket;
122 std::array<uint8_t, 4> header_buf {};
123 std::vector<uint8_t> payload_buf;
124 std::unique_ptr<asio::steady_timer> reconnect_timer;
125 bool is_outbound {};
126
127 explicit ConnectionState(asio::io_context& ctx)
128 : socket(ctx)
129 {
130 }
131 };
132
133 /**
134 * @brief State for a listening acceptor
135 */
138 asio::ip::tcp::acceptor acceptor;
139 asio::ip::tcp::socket pending_socket;
140
141 explicit ListenerState(asio::io_context& ctx)
142 : acceptor(ctx)
143 , pending_socket(ctx)
144 {
145 }
146 };
147
149 asio::io_context& m_context;
150
151 std::atomic<bool> m_initialized { false };
152 std::atomic<bool> m_running { false };
153
154 mutable std::shared_mutex m_connections_mutex;
155 std::unordered_map<uint64_t, std::unique_ptr<ConnectionState>> m_connections;
156
157 mutable std::shared_mutex m_listeners_mutex;
158 std::unordered_map<uint64_t, std::unique_ptr<ListenerState>> m_listeners;
159
162
163 /**
164 * @brief Pointer to subsystem's endpoint id allocator.
165 *
166 * When a listener accepts an inbound connection, the new connection
167 * needs a globally unique endpoint id. The subsystem provides this
168 * via a callback set during add_backend().
169 */
170 std::function<uint64_t()> m_allocate_endpoint_id;
171
172 // ─── Async operation chains ─────────────────────────────────────────────
173
174 /**
175 * @brief Initiate async_connect for an outbound endpoint.
176 *
177 * On success: transitions to OPEN, starts receive chain.
178 * On failure: transitions to ERROR or RECONNECTING.
179 */
180 void start_connect(ConnectionState& conn);
181
182 /**
183 * @brief Initiate async_accept loop for a listener.
184 *
185 * On accept: creates ConnectionState for the new peer, assigns
186 * endpoint id via m_allocate_endpoint_id, starts receive chain,
187 * fires state callback. Then resubmits async_accept.
188 */
189 void start_accept(ListenerState& listener);
190
191 /**
192 * @brief Start the framed message receive chain on a connection.
193 *
194 * Posts async_read for 4-byte header. Completion parses length,
195 * posts async_read for payload. Completion fires receive callback,
196 * then resubmits header read.
197 */
198 void start_receive_chain(ConnectionState& conn);
199
200 /**
201 * @brief Header read completion handler.
202 */
203 void on_header_received(ConnectionState& conn,
204 const asio::error_code& ec, size_t bytes);
205
206 /**
207 * @brief Payload read completion handler.
208 */
209 void on_payload_received(ConnectionState& conn,
210 const asio::error_code& ec, size_t bytes);
211
212 /**
213 * @brief Handle a connection error (read/write failure).
214 *
215 * Transitions to ERROR. If auto_reconnect and outbound, schedules
216 * reconnect via steady_timer.
217 */
218 void on_connection_error(ConnectionState& conn, const asio::error_code& ec);
219
220 /**
221 * @brief Schedule a reconnect attempt after the configured interval.
222 */
223 void schedule_reconnect(ConnectionState& conn);
224
225 void transition_state(EndpointInfo& info, EndpointState new_state);
226
227public:
228 /**
229 * @brief Set the endpoint id allocator (called by NetworkSubsystem)
230 *
231 * Required for listener-accepted connections that need globally
232 * unique ids. Set before start().
233 */
234 void set_endpoint_id_allocator(std::function<uint64_t()> allocator)
235 {
236 m_allocate_endpoint_id = std::move(allocator);
237 }
238};
239
240} // namespace MayaFlux::Core
Range size
Abstract interface for network transport backends.
std::string get_name() const override
void set_endpoint_id_allocator(std::function< uint64_t()> allocator)
Set the endpoint id allocator (called by NetworkSubsystem)
std::function< uint64_t()> m_allocate_endpoint_id
Pointer to subsystem's endpoint id allocator.
NetworkReceiveCallback m_receive_callback
TCPBackend & operator=(const TCPBackend &)=delete
std::unordered_map< uint64_t, std::unique_ptr< ConnectionState > > m_connections
std::shared_mutex m_listeners_mutex
TCPBackend(TCPBackend &&)=delete
std::unordered_map< uint64_t, std::unique_ptr< ListenerState > > m_listeners
bool is_initialized() const override
TCPBackend(const TCPBackend &)=delete
std::shared_mutex m_connections_mutex
std::string get_version() const override
bool is_running() const override
TCPBackend & operator=(TCPBackend &&)=delete
asio::io_context & m_context
NetworkTransport get_transport() const override
EndpointStateCallback m_state_callback
Connection-oriented reliable stream transport over TCP via standalone Asio.
void initialize()
Definition main.cpp:11
EndpointState
Observable connection state for an endpoint.
NetworkTransport
Identifies the transport protocol a backend implements.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
std::function< void(const EndpointInfo &info, EndpointState previous, EndpointState current)> EndpointStateCallback
Callback signature for endpoint state changes.
Describes one logical send/receive endpoint managed by a backend.
Configuration for the TCP transport backend.
std::unique_ptr< asio::steady_timer > reconnect_timer
State for a connected TCP peer (inbound or outbound)
State for a listening acceptor.