MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
NetworkSubsystem.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "Subsystem.hpp"
4
6
7#include <asio/executor_work_guard.hpp>
8#include <asio/io_context.hpp>
9
11struct NetworkService;
12}
13
14namespace MayaFlux::Core {
15
16/**
17 * @class NetworkSubsystem
18 * @brief General-purpose network transport subsystem
19 *
20 * Owns multiple INetworkBackend instances (UDP, TCP, SharedMemory) and
21 * registers a NetworkService into BackendRegistry for decoupled access
22 * by any engine component.
23 *
24 * Structural parallel to InputSubsystem:
25 * - InputSubsystem owns IInputBackend instances (HID, MIDI).
26 * - NetworkSubsystem owns INetworkBackend instances (UDP, TCP, SHM).
27 * - InputSubsystem registers InputService.
28 * - NetworkSubsystem registers NetworkService.
29 *
30 * Key difference: InputSubsystem is receive-only (InputCallback delivers
31 * events to InputManager). NetworkSubsystem is bidirectional. It provides
32 * send and receive through NetworkService, and manages persistent
33 * endpoints rather than device handles.
34 *
35 * Relationship to InputSubsystem's OSC/Serial stubs:
36 * InputSubsystem::initialize_osc_backend() will create an OSCBackend
37 * (IInputBackend) that internally obtains a UDP endpoint through
38 * NetworkService. The OSCBackend handles OSC address parsing and
39 * produces InputValue events. The actual socket is managed here.
40 * Same pattern for Serial if it ever rides over TCP or SHM.
41 *
42 * Endpoint IDs are globally unique across all backends. The subsystem
43 * maintains a transport-to-backend routing map so callers never need
44 * to know which backend handles their endpoint.
45 *
46 * Tokens:
47 * Buffer = NETWORK_BACKEND (new ProcessingToken)
48 * Node = EVENT_RATE (shared with InputSubsystem, async arrival)
49 * Task = EVENT_DRIVEN (shared with InputSubsystem)
50 *
51 * @code
52 * GlobalNetworkConfig config;
53 * config.udp.enabled = true;
54 * config.tcp.enabled = true;
55 *
56 * auto subsystem = std::make_unique<NetworkSubsystem>(config);
57 * subsystem_manager->add_subsystem(SubsystemType::NETWORK, subsystem);
58 * @endcode
59 */
60class MAYAFLUX_API NetworkSubsystem : public ISubsystem {
61public:
62 explicit NetworkSubsystem(const GlobalNetworkConfig& config);
63 ~NetworkSubsystem() override;
64
69
70 // ─────────────────────────────────────────────────────────────────────────
71 // ISubsystem implementation
72 // ─────────────────────────────────────────────────────────────────────────
73
74 void register_callbacks() override;
75 void initialize(SubsystemProcessingHandle& handle) override;
76 void start() override;
77 void pause() override;
78 void resume() override;
79 void stop() override;
80 void shutdown() override;
81
82 [[nodiscard]] SubsystemTokens get_tokens() const override { return m_tokens; }
83 [[nodiscard]] bool is_ready() const override { return m_ready.load(); }
84 [[nodiscard]] bool is_running() const override { return m_running.load(); }
85 [[nodiscard]] SubsystemType get_type() const override { return SubsystemType::NETWORK; }
87
88 // ─────────────────────────────────────────────────────────────────────────
89 // Backend management (mirrors InputSubsystem::add_backend pattern)
90 // ─────────────────────────────────────────────────────────────────────────
91
92 /**
93 * @brief Add a network backend
94 * @param backend Backend instance (takes ownership)
95 * @return true if added successfully
96 *
97 * Only one backend per NetworkTransport type is permitted.
98 */
99 bool add_backend(std::unique_ptr<INetworkBackend> backend);
100
101 /**
102 * @brief Get a backend by transport type
103 */
104 [[nodiscard]] INetworkBackend* get_backend(NetworkTransport transport) const;
105
106 /**
107 * @brief Get all active backends
108 */
109 [[nodiscard]] std::vector<INetworkBackend*> get_backends() const;
110
111 // ─────────────────────────────────────────────────────────────────────────
112 // Endpoint management (routed through to correct backend)
113 // ─────────────────────────────────────────────────────────────────────────
114
115 /**
116 * @brief Open an endpoint on the backend matching info.transport
117 * @param info Endpoint configuration.
118 * @return Globally unique endpoint id, or 0 on failure.
119 */
120 uint64_t open_endpoint(const EndpointInfo& info);
121
122 /**
123 * @brief Close an endpoint
124 */
125 void close_endpoint(uint64_t endpoint_id);
126
127 /**
128 * @brief Send data through an endpoint
129 */
130 bool send(uint64_t endpoint_id, const uint8_t* data, size_t size);
131
132 /**
133 * @brief Send data to a specific address
134 */
135 bool send_to(uint64_t endpoint_id, const uint8_t* data, size_t size,
136 const std::string& address, uint16_t port);
137
138 /**
139 * @brief Query endpoint state
140 */
141 [[nodiscard]] EndpointState get_endpoint_state(uint64_t endpoint_id) const;
142
143 /**
144 * @brief Register a per-endpoint receive callback
145 *
146 * Stored locally and invoked from the backend's receive callback
147 * after endpoint routing.
148 */
149 void set_endpoint_receive_callback(uint64_t endpoint_id,
150 NetworkReceiveCallback callback);
151
152 /**
153 * @brief List all open endpoints across all backends
154 */
155 [[nodiscard]] std::vector<EndpointInfo> get_all_endpoints() const;
156
157private:
159 SubsystemProcessingHandle* m_handle { nullptr };
161
162 std::atomic<bool> m_ready { false };
163 std::atomic<bool> m_running { false };
164
165 // ─── Asio event loop ────────────────────────────────────────────────────
166 //
167 // Single io_context shared by all backends. One jthread runs
168 // io_context::run(). All async operations (send, receive, accept,
169 // connect, reconnect timers) complete on this thread. No per-socket
170 // threads, no per-connection threads, no mutexes on the data path.
171 //
172 // work_guard prevents io_context::run() from returning when there
173 // are no pending operations (idle between endpoint opens).
174 // ────────────────────────────────────────────────────────────────────────
175
176 std::unique_ptr<asio::io_context> m_io_context;
177 std::unique_ptr<asio::executor_work_guard<asio::io_context::executor_type>> m_work_guard;
178
179#if MAYAFLUX_USE_JTHREAD
180 std::jthread m_io_thread;
181#else
182 std::thread m_io_thread;
183 std::atomic<bool> m_io_stop_requested { false };
184#endif
185
186 mutable std::shared_mutex m_backends_mutex;
187 std::unordered_map<NetworkTransport, std::unique_ptr<INetworkBackend>> m_backends;
188
189 std::shared_ptr<Registry::Service::NetworkService> m_network_service;
190
191 /**
192 * @brief Maps endpoint_id -> transport for routing send/close/query
193 * to the correct backend without scanning all backends.
194 */
195 mutable std::shared_mutex m_routing_mutex;
196 std::unordered_map<uint64_t, NetworkTransport> m_endpoint_routing;
197
198 /**
199 * @brief Per-endpoint receive callbacks set by consumers via
200 * set_endpoint_receive_callback(). Backend-level receive
201 * callbacks route here.
202 */
203 mutable std::shared_mutex m_callbacks_mutex;
204 std::unordered_map<uint64_t, NetworkReceiveCallback> m_endpoint_callbacks;
205
206 /**
207 * @brief Global endpoint id counter shared across all backends.
208 * Backends do not assign their own ids. The subsystem assigns
209 * and maps them.
210 */
211 std::atomic<uint64_t> m_next_endpoint_id { 1 };
212
213 void initialize_udp_backend();
214 void initialize_tcp_backend();
215 void initialize_shm_backend();
216
217 void register_backend_service();
218
219 /**
220 * @brief Wired as the receive callback on each backend. Routes
221 * to per-endpoint callbacks in m_endpoint_callbacks.
222 */
223 void on_backend_receive(uint64_t endpoint_id, const uint8_t* data,
224 size_t size, std::string_view sender_addr);
225
226 /**
227 * @brief Wired as the state callback on each backend. Forwards
228 * to any registered state observers.
229 */
230 void on_backend_state_change(const EndpointInfo& info,
231 EndpointState previous, EndpointState current);
232
233 /**
234 * @brief Resolve endpoint_id to its owning backend. Returns nullptr
235 * if the id is unknown.
236 */
237 INetworkBackend* resolve_backend(uint64_t endpoint_id) const;
238};
239
240} // namespace MayaFlux::Core
Range size
Abstract interface for network transport backends.
Base interface for all subsystems in the MayaFlux processing architecture.
Definition Subsystem.hpp:26
std::unique_ptr< asio::io_context > m_io_context
std::shared_mutex m_routing_mutex
Maps endpoint_id -> transport for routing send/close/query to the correct backend without scanning al...
bool is_running() const override
Check if subsystem is currently processing.
std::unique_ptr< asio::executor_work_guard< asio::io_context::executor_type > > m_work_guard
NetworkSubsystem & operator=(const NetworkSubsystem &)=delete
SubsystemProcessingHandle * get_processing_context_handle() override
Get the processing context handle for this subsystem.
std::unordered_map< uint64_t, NetworkTransport > m_endpoint_routing
SubsystemTokens get_tokens() const override
Get the processing token configuration this subsystem manages.
NetworkSubsystem(const NetworkSubsystem &)=delete
std::unordered_map< uint64_t, NetworkReceiveCallback > m_endpoint_callbacks
std::shared_mutex m_callbacks_mutex
Per-endpoint receive callbacks set by consumers via set_endpoint_receive_callback().
NetworkSubsystem & operator=(NetworkSubsystem &&)=delete
bool is_ready() const override
Check if subsystem is ready for operation.
SubsystemType get_type() const override
Get the type of this subsystem.
NetworkSubsystem(NetworkSubsystem &&)=delete
std::unordered_map< NetworkTransport, std::unique_ptr< INetworkBackend > > m_backends
std::shared_ptr< Registry::Service::NetworkService > m_network_service
General-purpose network transport subsystem.
Unified interface combining buffer and node processing for subsystems.
void initialize()
Definition main.cpp:11
EndpointState
Observable connection state for an endpoint.
NetworkTransport
Identifies the transport protocol a backend implements.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
Describes one logical send/receive endpoint managed by a backend.
Configuration for the NetworkSubsystem.
Processing token configuration for subsystem operation.