MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
NetworkSubsystem.cpp
Go to the documentation of this file.
2
5
9
11
12namespace MayaFlux::Core {
13
15 : m_config(config)
16 , m_tokens {
17 .Buffer = Buffers::ProcessingToken::EVENT_RATE,
18 .Node = Nodes::ProcessingToken::EVENT_RATE,
19 .Task = Vruta::ProcessingToken::EVENT_DRIVEN
20 }
21 , m_io_context(std::make_unique<asio::io_context>())
22{
23}
24
29
30// ─────────────────────────────────────────────────────────────────────────────
31// ISubsystem lifecycle
32// ─────────────────────────────────────────────────────────────────────────────
33
37
39{
41 "Initializing Network Subsystem...");
42
43 m_handle = &handle;
44
45 if (m_config.udp.enabled) {
47 }
48 if (m_config.tcp.enabled) {
50 }
53 }
54
56
58
59 m_ready.store(true);
60
62 "Network Subsystem initialized with {} backend(s)", m_backends.size());
63}
64
66{
67 if (!m_ready.load()) {
69 "Cannot start NetworkSubsystem: not initialized");
70 return;
71 }
72
73 if (m_running.load()) {
74 return;
75 }
76
77 {
78 std::shared_lock lock(m_backends_mutex);
79 for (auto& [transport, backend] : m_backends) {
80 backend->start();
81 }
82 }
83
84 m_work_guard = std::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(
85 m_io_context->get_executor());
86
87#if MAYAFLUX_USE_JTHREAD
88 m_io_thread = std::jthread([this](const std::stop_token& token) {
89 while (!token.stop_requested()) {
90 try {
91 m_io_context->run();
92 break;
93 } catch (const std::exception& e) {
95 "IO context exception: {}", e.what());
96 }
97 }
98
100 "Network IO thread exiting");
101 });
102#else
103 m_io_stop_requested.store(false);
104 m_io_thread = std::thread([this]() {
105 while (!m_io_stop_requested.load()) {
106 try {
107 m_io_context->run();
108 break;
109 } catch (const std::exception& e) {
111 "IO context exception: {}", e.what());
112 }
113 }
114
116 "Network IO thread exiting");
117 });
118#endif
119
120 m_running.store(true);
121
123 "Network Subsystem started");
124}
125
126void NetworkSubsystem::pause()
127{
128 stop();
129}
130
131void NetworkSubsystem::resume()
132{
133 start();
134}
135
136void NetworkSubsystem::stop()
137{
138 if (!m_running.load()) {
139 return;
140 }
141
142 m_running.store(false);
143
144 {
145 std::shared_lock lock(m_backends_mutex);
146 for (auto& [transport, backend] : m_backends) {
147 backend->stop();
148 }
149 }
150
153 }
154
155 m_work_guard.reset();
156 m_io_context->stop();
157
158#if MAYAFLUX_USE_JTHREAD
159 if (m_io_thread.joinable()) {
160 m_io_thread.request_stop();
161 m_io_thread.join();
162 }
163#else
164 m_io_stop_requested.store(true);
165 if (m_io_thread.joinable()) {
166 m_io_thread.join();
167 }
168#endif
169
170 m_io_context->restart();
171
173 "Network Subsystem stopped");
174}
175
176void NetworkSubsystem::shutdown()
177{
178 stop();
179
180 {
181 std::unique_lock lock(m_routing_mutex);
182 m_endpoint_routing.clear();
183 }
184
185 {
186 std::unique_lock lock(m_callbacks_mutex);
187 m_endpoint_callbacks.clear();
188 }
189
190 {
191 std::unique_lock lock(m_backends_mutex);
192 for (auto& [transport, backend] : m_backends) {
193 backend->shutdown();
194 }
195 m_backends.clear();
196 }
197
200 }
201
202 if (m_network_service) {
205 m_network_service.reset();
206 }
207
208 m_ready.store(false);
209
211 "Network Subsystem shutdown complete");
212}
213
214// ─────────────────────────────────────────────────────────────────────────────
215// Backend management
216// ─────────────────────────────────────────────────────────────────────────────
217
218bool NetworkSubsystem::add_backend(std::unique_ptr<INetworkBackend> backend)
219{
220 if (!backend) {
221 return false;
222 }
223
224 NetworkTransport transport = backend->get_transport();
225
226 std::unique_lock lock(m_backends_mutex);
227
228 if (m_backends.contains(transport)) {
230 "Network backend {} already registered", backend->get_name());
231 return false;
232 }
233
234 if (!backend->initialize()) {
236 "Failed to initialize network backend: {}", backend->get_name());
237 return false;
238 }
239
240 backend->set_receive_callback(
241 [this](uint64_t id, const uint8_t* data, size_t size, std::string_view addr) {
242 on_backend_receive(id, data, size, addr);
243 });
244
245 backend->set_state_callback(
246 [this](const EndpointInfo& info, EndpointState prev, EndpointState curr) {
247 on_backend_state_change(info, prev, curr);
248 });
249
250 if (auto* tcp = dynamic_cast<TCPBackend*>(backend.get())) {
251 tcp->set_endpoint_id_allocator([this]() -> uint64_t {
252 return m_next_endpoint_id.fetch_add(1);
253 });
254 }
255
257 "Added network backend: {}", backend->get_name());
258
259 m_backends[transport] = std::move(backend);
260 return true;
261}
262
263INetworkBackend* NetworkSubsystem::get_backend(NetworkTransport transport) const
264{
265 std::shared_lock lock(m_backends_mutex);
266 auto it = m_backends.find(transport);
267 return (it != m_backends.end()) ? it->second.get() : nullptr;
268}
269
270std::vector<INetworkBackend*> NetworkSubsystem::get_backends() const
271{
272 std::shared_lock lock(m_backends_mutex);
273 std::vector<INetworkBackend*> result;
274 result.reserve(m_backends.size());
275 for (const auto& [transport, backend] : m_backends) {
276 result.push_back(backend.get());
277 }
278 return result;
279}
280
281// ─────────────────────────────────────────────────────────────────────────────
282// Endpoint management
283// ─────────────────────────────────────────────────────────────────────────────
284
285uint64_t NetworkSubsystem::open_endpoint(const EndpointInfo& info)
286{
287 auto* backend = get_backend(info.transport);
288 if (!backend) {
290 "No backend for transport {}", static_cast<int>(info.transport));
291 return 0;
292 }
293
294 EndpointInfo ep = info;
295 ep.id = m_next_endpoint_id.fetch_add(1);
296
297 uint64_t backend_id = backend->open_endpoint(ep);
298 if (backend_id == 0) {
299 return 0;
300 }
301
302 {
303 std::unique_lock lock(m_routing_mutex);
304 m_endpoint_routing[ep.id] = info.transport;
305 }
306
307 return ep.id;
308}
309
310void NetworkSubsystem::close_endpoint(uint64_t endpoint_id)
311{
312 auto* backend = resolve_backend(endpoint_id);
313 if (!backend) {
314 return;
315 }
316
317 backend->close_endpoint(endpoint_id);
318
319 {
320 std::unique_lock lock(m_routing_mutex);
321 m_endpoint_routing.erase(endpoint_id);
322 }
323
324 {
325 std::unique_lock lock(m_callbacks_mutex);
326 m_endpoint_callbacks.erase(endpoint_id);
327 }
328}
329
330bool NetworkSubsystem::send(uint64_t endpoint_id, const uint8_t* data, size_t size)
331{
332 auto* backend = resolve_backend(endpoint_id);
333 if (!backend) {
334 return false;
335 }
336 return backend->send(endpoint_id, data, size);
337}
338
339bool NetworkSubsystem::send_to(uint64_t endpoint_id, const uint8_t* data, size_t size,
340 const std::string& address, uint16_t port)
341{
342 auto* backend = resolve_backend(endpoint_id);
343 if (!backend) {
344 return false;
345 }
346 return backend->send_to(endpoint_id, data, size, address, port);
347}
348
349EndpointState NetworkSubsystem::get_endpoint_state(uint64_t endpoint_id) const
350{
351 auto* backend = resolve_backend(endpoint_id);
352 if (!backend) {
353 return EndpointState::CLOSED;
354 }
355 return backend->get_endpoint_state(endpoint_id);
356}
357
358void NetworkSubsystem::set_endpoint_receive_callback(uint64_t endpoint_id,
359 NetworkReceiveCallback callback)
360{
361 std::unique_lock lock(m_callbacks_mutex);
362 m_endpoint_callbacks[endpoint_id] = std::move(callback);
363}
364
365std::vector<EndpointInfo> NetworkSubsystem::get_all_endpoints() const
366{
367 std::shared_lock lock(m_backends_mutex);
368 std::vector<EndpointInfo> result;
369 for (const auto& [transport, backend] : m_backends) {
370 auto eps = backend->get_endpoints();
371 result.insert(result.end(), eps.begin(), eps.end());
372 }
373 return result;
374}
375
376// ─────────────────────────────────────────────────────────────────────────────
377// Private: backend initialisation
378// ─────────────────────────────────────────────────────────────────────────────
379
380void NetworkSubsystem::initialize_udp_backend()
381{
382 auto udp = std::make_unique<UDPBackend>(m_config.udp, *m_io_context);
383 add_backend(std::move(udp));
384}
385
386void NetworkSubsystem::initialize_tcp_backend()
387{
388 auto tcp = std::make_unique<TCPBackend>(m_config.tcp, *m_io_context);
389 add_backend(std::move(tcp));
390}
391
392void NetworkSubsystem::initialize_shm_backend()
393{
395 "SharedMemory backend not yet implemented");
396}
397
398void NetworkSubsystem::register_backend_service()
399{
400 auto& registry = Registry::BackendRegistry::instance();
401
402 auto service = std::make_shared<Registry::Service::NetworkService>();
403
404 service->open_endpoint = [this](const EndpointInfo& info) {
405 return open_endpoint(info);
406 };
407
408 service->close_endpoint = [this](uint64_t id) {
409 close_endpoint(id);
410 };
411
412 service->send = [this](uint64_t id, const uint8_t* data, size_t size) {
413 return send(id, data, size);
414 };
415
416 service->send_to = [this](uint64_t id, const uint8_t* data, size_t size,
417 const std::string& addr, uint16_t port) {
418 return send_to(id, data, size, addr, port);
419 };
420
421 service->get_endpoint_state = [this](uint64_t id) {
422 return get_endpoint_state(id);
423 };
424
425 service->set_endpoint_receive_callback = [this](uint64_t id, NetworkReceiveCallback cb) {
426 set_endpoint_receive_callback(id, std::move(cb));
427 };
428
429 service->get_all_endpoints = [this]() {
430 return get_all_endpoints();
431 };
432
433 m_network_service = service;
434
435 registry.register_service<Registry::Service::NetworkService>(
436 [service]() -> void* {
437 return service.get();
438 });
439}
440
441// ─────────────────────────────────────────────────────────────────────────────
442// Private: callback routing
443// ─────────────────────────────────────────────────────────────────────────────
444
445void NetworkSubsystem::on_backend_receive(uint64_t endpoint_id, const uint8_t* data,
446 size_t size, std::string_view sender_addr)
447{
448 std::shared_lock lock(m_callbacks_mutex);
449 auto it = m_endpoint_callbacks.find(endpoint_id);
450 if (it != m_endpoint_callbacks.end() && it->second) {
451 it->second(endpoint_id, data, size, sender_addr);
452 }
453}
454
455void NetworkSubsystem::on_backend_state_change(const EndpointInfo& info,
456 EndpointState previous, EndpointState current)
457{
459 "Endpoint {} state: {} -> {}",
460 info.id, static_cast<int>(previous), static_cast<int>(current));
461}
462
463// ─────────────────────────────────────────────────────────────────────────────
464// Private: routing
465// ─────────────────────────────────────────────────────────────────────────────
466
467INetworkBackend* NetworkSubsystem::resolve_backend(uint64_t endpoint_id) const
468{
469 NetworkTransport transport {};
470 {
471 std::shared_lock lock(m_routing_mutex);
472 auto it = m_endpoint_routing.find(endpoint_id);
473 if (it == m_endpoint_routing.end()) {
474 return nullptr;
475 }
476 transport = it->second;
477 }
478
479 return get_backend(transport);
480}
481
482} // namespace MayaFlux::Core
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
Range size
uint32_t id
Abstract interface for network transport backends.
std::unique_ptr< asio::io_context > m_io_context
void shutdown() override
Shutdown and cleanup subsystem resources.
std::unique_ptr< asio::executor_work_guard< asio::io_context::executor_type > > m_work_guard
void initialize(SubsystemProcessingHandle &handle) override
Initialize with a handle provided by SubsystemManager.
SubsystemProcessingHandle * m_handle
void start() override
Start the subsystem's processing/event loops.
NetworkSubsystem(const GlobalNetworkConfig &config)
void register_callbacks() override
Register callback hooks for this domain.
std::unordered_map< NetworkTransport, std::unique_ptr< INetworkBackend > > m_backends
std::shared_ptr< Registry::Service::NetworkService > m_network_service
Unified interface combining buffer and node processing for subsystems.
Connection-oriented reliable stream transport over TCP via standalone Asio.
static BackendRegistry & instance()
Get the global registry instance.
void unregister_service()
Unregister a service.
EndpointState
Observable connection state for an endpoint.
NetworkTransport
Identifies the transport protocol a backend implements.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
@ Shutdown
Engine/subsystem shutdown and cleanup.
@ NetworkSubsystem
Network subsystem operations (endpoint management, data routing)
@ Init
Engine/subsystem initialization.
@ Networking
Network operations (data transfer, protocol handling)
@ Core
Core engine, backend, subsystems.
void stop()
Stop active Portal::Network operations.
Definition Network.cpp:36
bool initialize(Registry::Service::NetworkService *service)
Initialize Portal::Network.
Definition Network.cpp:11
void shutdown()
Shutdown Portal::Network and release all resources.
Definition Network.cpp:50
bool is_initialized()
Return true if Portal::Network has been initialized.
Definition Network.cpp:66
Describes one logical send/receive endpoint managed by a backend.
Configuration for the NetworkSubsystem.
Backend network transport service interface.