MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
NetworkSubsystem.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "Subsystem.hpp"
4
6
7#include <asio/executor_work_guard.hpp>
8#include <asio/io_context.hpp>
9
11struct NetworkService;
12}
13
14namespace MayaFlux::Core {
15
16/**
17 * @class NetworkSubsystem
18 * @brief General-purpose network transport subsystem
19 *
20 * Owns multiple INetworkBackend instances (UDP, TCP, SharedMemory) and
21 * registers a NetworkService into BackendRegistry for decoupled access
22 * by any engine component.
23 *
24 * Structural parallel to InputSubsystem:
25 * - InputSubsystem owns IInputBackend instances (HID, MIDI).
26 * - NetworkSubsystem owns INetworkBackend instances (UDP, TCP, SHM).
27 * - InputSubsystem registers InputService.
28 * - NetworkSubsystem registers NetworkService.
29 *
30 * Key difference: InputSubsystem is receive-only (InputCallback delivers
31 * events to InputManager). NetworkSubsystem is bidirectional. It provides
32 * send and receive through NetworkService, and manages persistent
33 * endpoints rather than device handles.
34 *
35 * Relationship to InputSubsystem's OSC/Serial stubs:
36 * InputSubsystem::initialize_osc_backend() will create an OSCBackend
37 * (IInputBackend) that internally obtains a UDP endpoint through
38 * NetworkService. The OSCBackend handles OSC address parsing and
39 * produces InputValue events. The actual socket is managed here.
40 * Same pattern for Serial if it ever rides over TCP or SHM.
41 *
42 * Endpoint IDs are globally unique across all backends. The subsystem
43 * maintains a transport-to-backend routing map so callers never need
44 * to know which backend handles their endpoint.
45 *
46 * Tokens:
47 * Buffer = NETWORK_BACKEND (new ProcessingToken)
48 * Node = EVENT_RATE (shared with InputSubsystem, async arrival)
49 * Task = EVENT_DRIVEN (shared with InputSubsystem)
50 *
51 * @code
52 * GlobalNetworkConfig config;
53 * config.udp.enabled = true;
54 * config.tcp.enabled = true;
55 *
56 * auto subsystem = std::make_unique<NetworkSubsystem>(config);
57 * subsystem_manager->add_subsystem(SubsystemType::NETWORK, subsystem);
58 * @endcode
59 */
60class MAYAFLUX_API NetworkSubsystem : public ISubsystem {
61public:
62 explicit NetworkSubsystem(const GlobalNetworkConfig& config);
63 ~NetworkSubsystem() override;
64
69
70 // ─────────────────────────────────────────────────────────────────────────
71 // ISubsystem implementation
72 // ─────────────────────────────────────────────────────────────────────────
73
74 void register_callbacks() override;
75 void initialize(SubsystemProcessingHandle& handle) override;
76 void start() override;
77 void pause() override;
78 void resume() override;
79 void stop() override;
80 void shutdown() override;
81 void wait_until_running() override;
82
83 [[nodiscard]] SubsystemTokens get_tokens() const override { return m_tokens; }
84 [[nodiscard]] bool is_ready() const override { return m_ready.load(); }
85 [[nodiscard]] bool is_running() const override { return m_running.load(); }
86 [[nodiscard]] SubsystemType get_type() const override { return SubsystemType::NETWORK; }
88
89 // ─────────────────────────────────────────────────────────────────────────
90 // Backend management (mirrors InputSubsystem::add_backend pattern)
91 // ─────────────────────────────────────────────────────────────────────────
92
93 /**
94 * @brief Add a network backend
95 * @param backend Backend instance (takes ownership)
96 * @return true if added successfully
97 *
98 * Only one backend per NetworkTransport type is permitted.
99 */
100 bool add_backend(std::unique_ptr<INetworkBackend> backend);
101
102 /**
103 * @brief Get a backend by transport type
104 */
105 [[nodiscard]] INetworkBackend* get_backend(NetworkTransport transport) const;
106
107 /**
108 * @brief Get all active backends
109 */
110 [[nodiscard]] std::vector<INetworkBackend*> get_backends() const;
111
112 // ─────────────────────────────────────────────────────────────────────────
113 // Endpoint management (routed through to correct backend)
114 // ─────────────────────────────────────────────────────────────────────────
115
116 /**
117 * @brief Open an endpoint on the backend matching info.transport
118 * @param info Endpoint configuration.
119 * @return Globally unique endpoint id, or 0 on failure.
120 */
121 uint64_t open_endpoint(const EndpointInfo& info);
122
123 /**
124 * @brief Close an endpoint
125 */
126 void close_endpoint(uint64_t endpoint_id);
127
128 /**
129 * @brief Send data through an endpoint
130 */
131 bool send(uint64_t endpoint_id, const uint8_t* data, size_t size);
132
133 /**
134 * @brief Send data to a specific address
135 */
136 bool send_to(uint64_t endpoint_id, const uint8_t* data, size_t size,
137 const std::string& address, uint16_t port);
138
139 /**
140 * @brief Query endpoint state
141 */
142 [[nodiscard]] EndpointState get_endpoint_state(uint64_t endpoint_id) const;
143
144 /**
145 * @brief Register a per-endpoint receive callback
146 *
147 * Stored locally and invoked from the backend's receive callback
148 * after endpoint routing.
149 */
150 void set_endpoint_receive_callback(uint64_t endpoint_id,
151 NetworkReceiveCallback callback);
152
153 /**
154 * @brief List all open endpoints across all backends
155 */
156 [[nodiscard]] std::vector<EndpointInfo> get_all_endpoints() const;
157
158private:
160 SubsystemProcessingHandle* m_handle { nullptr };
162
163 std::atomic<bool> m_ready { false };
164 std::atomic<bool> m_running { false };
165
166 // ─── Asio event loop ────────────────────────────────────────────────────
167 //
168 // Single io_context shared by all backends. One jthread runs
169 // io_context::run(). All async operations (send, receive, accept,
170 // connect, reconnect timers) complete on this thread. No per-socket
171 // threads, no per-connection threads, no mutexes on the data path.
172 //
173 // work_guard prevents io_context::run() from returning when there
174 // are no pending operations (idle between endpoint opens).
175 // ────────────────────────────────────────────────────────────────────────
176
177 std::unique_ptr<asio::io_context> m_io_context;
178 std::unique_ptr<asio::executor_work_guard<asio::io_context::executor_type>> m_work_guard;
179
180#if MAYAFLUX_USE_JTHREAD
181 std::jthread m_io_thread;
182#else
183 std::thread m_io_thread;
184 std::atomic<bool> m_io_stop_requested { false };
185#endif
186
187 mutable std::shared_mutex m_backends_mutex;
188 std::unordered_map<NetworkTransport, std::unique_ptr<INetworkBackend>> m_backends;
189
190 std::shared_ptr<Registry::Service::NetworkService> m_network_service;
191
192 /**
193 * @brief Maps endpoint_id -> transport for routing send/close/query
194 * to the correct backend without scanning all backends.
195 */
196 mutable std::shared_mutex m_routing_mutex;
197 std::unordered_map<uint64_t, NetworkTransport> m_endpoint_routing;
198
199 /**
200 * @brief Per-endpoint receive callbacks set by consumers via
201 * set_endpoint_receive_callback(). Backend-level receive
202 * callbacks route here.
203 */
204 mutable std::shared_mutex m_callbacks_mutex;
205 std::unordered_map<uint64_t, NetworkReceiveCallback> m_endpoint_callbacks;
206
207 /**
208 * @brief Global endpoint id counter shared across all backends.
209 * Backends do not assign their own ids. The subsystem assigns
210 * and maps them.
211 */
212 std::atomic<uint64_t> m_next_endpoint_id { 1 };
213
214 void initialize_udp_backend();
215 void initialize_tcp_backend();
216 void initialize_shm_backend();
217
218 void register_backend_service();
219
220 /**
221 * @brief Wired as the receive callback on each backend. Routes
222 * to per-endpoint callbacks in m_endpoint_callbacks.
223 */
224 void on_backend_receive(uint64_t endpoint_id, const uint8_t* data,
225 size_t size, std::string_view sender_addr);
226
227 /**
228 * @brief Wired as the state callback on each backend. Forwards
229 * to any registered state observers.
230 */
231 void on_backend_state_change(const EndpointInfo& info,
233
234 /**
235 * @brief Resolve endpoint_id to its owning backend. Returns nullptr
236 * if the id is unknown.
237 */
238 INetworkBackend* resolve_backend(uint64_t endpoint_id) const;
239};
240
241} // namespace MayaFlux::Core
glm::vec2 current
Abstract interface for network transport backends.
Base interface for all subsystems in the MayaFlux processing architecture.
Definition Subsystem.hpp:26
std::unique_ptr< asio::io_context > m_io_context
std::shared_mutex m_routing_mutex
Maps endpoint_id -> transport for routing send/close/query to the correct backend without scanning al...
bool is_running() const override
Check if subsystem is currently processing.
std::unique_ptr< asio::executor_work_guard< asio::io_context::executor_type > > m_work_guard
NetworkSubsystem & operator=(const NetworkSubsystem &)=delete
SubsystemProcessingHandle * get_processing_context_handle() override
Get the processing context handle for this subsystem.
std::unordered_map< uint64_t, NetworkTransport > m_endpoint_routing
SubsystemTokens get_tokens() const override
Get the processing token configuration this subsystem manages.
NetworkSubsystem(const NetworkSubsystem &)=delete
std::unordered_map< uint64_t, NetworkReceiveCallback > m_endpoint_callbacks
std::shared_mutex m_callbacks_mutex
Per-endpoint receive callbacks set by consumers via set_endpoint_receive_callback().
NetworkSubsystem & operator=(NetworkSubsystem &&)=delete
bool is_ready() const override
Check if subsystem is ready for operation.
SubsystemType get_type() const override
Get the type of this subsystem.
NetworkSubsystem(NetworkSubsystem &&)=delete
std::unordered_map< NetworkTransport, std::unique_ptr< INetworkBackend > > m_backends
std::shared_ptr< Registry::Service::NetworkService > m_network_service
General-purpose network transport subsystem.
Unified interface combining buffer and node processing for subsystems.
void initialize()
Definition main.cpp:11
EndpointState
Observable connection state for an endpoint.
NetworkTransport
Identifies the transport protocol a backend implements.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
Describes one logical send/receive endpoint managed by a backend.
Configuration for the NetworkSubsystem.
Processing token configuration for subsystem operation.