MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
NetworkSubsystem.cpp
Go to the documentation of this file.
2
5
9
11
12namespace MayaFlux::Core {
13
15 : m_config(config)
16 , m_tokens {
17 .Buffer = Buffers::ProcessingToken::EVENT_RATE,
18 .Node = Nodes::ProcessingToken::EVENT_RATE,
19 .Task = Vruta::ProcessingToken::EVENT_DRIVEN
20 }
21 , m_io_context(std::make_unique<asio::io_context>())
22{
23}
24
29
30// ─────────────────────────────────────────────────────────────────────────────
31// ISubsystem lifecycle
32// ─────────────────────────────────────────────────────────────────────────────
33
37
39{
41 "Initializing Network Subsystem...");
42
43 m_handle = &handle;
44
45 if (m_config.udp.enabled) {
47 }
48 if (m_config.tcp.enabled) {
50 }
53 }
54
56
58
59 m_ready.store(true);
60
62 "Network Subsystem initialized with {} backend(s)", m_backends.size());
63}
64
66{
67 if (!m_ready.load()) {
69 "Cannot start NetworkSubsystem: not initialized");
70 return;
71 }
72
73 if (m_running.load()) {
74 return;
75 }
76
77 {
78 std::shared_lock lock(m_backends_mutex);
79 for (auto& [transport, backend] : m_backends) {
80 backend->start();
81 }
82 }
83
84 m_work_guard = std::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(
85 m_io_context->get_executor());
86
87#if MAYAFLUX_USE_JTHREAD
88 m_io_thread = std::jthread([this](const std::stop_token& token) {
89 while (!token.stop_requested()) {
90 try {
91 m_io_context->run();
92 break;
93 } catch (const std::exception& e) {
95 "IO context exception: {}", e.what());
96 }
97 }
98
100 "Network IO thread exiting");
101 });
102#else
103 m_io_stop_requested.store(false);
104 m_io_thread = std::thread([this]() {
105 while (!m_io_stop_requested.load()) {
106 try {
107 m_io_context->run();
108 break;
109 } catch (const std::exception& e) {
111 "IO context exception: {}", e.what());
112 }
113 }
114
116 "Network IO thread exiting");
117 });
118#endif
119
120 m_running.store(true);
121
123 "Network Subsystem started");
124}
125
126void NetworkSubsystem::pause()
127{
128 stop();
129}
130
131void NetworkSubsystem::resume()
132{
133 start();
134}
135
136void NetworkSubsystem::stop()
137{
138 if (!m_running.load()) {
139 return;
140 }
141
142 m_running.store(false);
143
144 {
145 std::shared_lock lock(m_backends_mutex);
146 for (auto& [transport, backend] : m_backends) {
147 backend->stop();
148 }
149 }
150
153 }
154
155 m_work_guard.reset();
156 m_io_context->stop();
157
158#if MAYAFLUX_USE_JTHREAD
159 if (m_io_thread.joinable()) {
160 m_io_thread.request_stop();
161 m_io_thread.join();
162 }
163#else
164 m_io_stop_requested.store(true);
165 if (m_io_thread.joinable()) {
166 m_io_thread.join();
167 }
168#endif
169
170 m_io_context->restart();
171
173 "Network Subsystem stopped");
174}
175
176void NetworkSubsystem::shutdown()
177{
178 stop();
179
180 {
181 std::unique_lock lock(m_routing_mutex);
182 m_endpoint_routing.clear();
183 }
184
185 {
186 std::unique_lock lock(m_callbacks_mutex);
187 m_endpoint_callbacks.clear();
188 }
189
190 {
191 std::unique_lock lock(m_backends_mutex);
192 for (auto& [transport, backend] : m_backends) {
193 backend->shutdown();
194 }
195 m_backends.clear();
196 }
197
200 }
201
202 if (m_network_service) {
205 m_network_service.reset();
206 }
207
208 m_ready.store(false);
209
211 "Network Subsystem shutdown complete");
212}
213
214void NetworkSubsystem::wait_until_running()
215{
216 while (!m_running.load(std::memory_order_acquire))
217 std::this_thread::yield();
218}
219
220// ─────────────────────────────────────────────────────────────────────────────
221// Backend management
222// ─────────────────────────────────────────────────────────────────────────────
223
224bool NetworkSubsystem::add_backend(std::unique_ptr<INetworkBackend> backend)
225{
226 if (!backend) {
227 return false;
228 }
229
230 NetworkTransport transport = backend->get_transport();
231
232 std::unique_lock lock(m_backends_mutex);
233
234 if (m_backends.contains(transport)) {
236 "Network backend {} already registered", backend->get_name());
237 return false;
238 }
239
240 if (!backend->initialize()) {
242 "Failed to initialize network backend: {}", backend->get_name());
243 return false;
244 }
245
246 backend->set_receive_callback(
247 [this](uint64_t id, const uint8_t* data, size_t size, std::string_view addr) {
248 on_backend_receive(id, data, size, addr);
249 });
250
251 backend->set_state_callback(
252 [this](const EndpointInfo& info, EndpointState prev, EndpointState curr) {
253 on_backend_state_change(info, prev, curr);
254 });
255
256 if (auto* tcp = dynamic_cast<TCPBackend*>(backend.get())) {
257 tcp->set_endpoint_id_allocator([this]() -> uint64_t {
258 return m_next_endpoint_id.fetch_add(1);
259 });
260 }
261
263 "Added network backend: {}", backend->get_name());
264
265 m_backends[transport] = std::move(backend);
266 return true;
267}
268
269INetworkBackend* NetworkSubsystem::get_backend(NetworkTransport transport) const
270{
271 std::shared_lock lock(m_backends_mutex);
272 auto it = m_backends.find(transport);
273 return (it != m_backends.end()) ? it->second.get() : nullptr;
274}
275
276std::vector<INetworkBackend*> NetworkSubsystem::get_backends() const
277{
278 std::shared_lock lock(m_backends_mutex);
279 std::vector<INetworkBackend*> result;
280 result.reserve(m_backends.size());
281 for (const auto& [transport, backend] : m_backends) {
282 result.push_back(backend.get());
283 }
284 return result;
285}
286
287// ─────────────────────────────────────────────────────────────────────────────
288// Endpoint management
289// ─────────────────────────────────────────────────────────────────────────────
290
291uint64_t NetworkSubsystem::open_endpoint(const EndpointInfo& info)
292{
293 auto* backend = get_backend(info.transport);
294 if (!backend) {
296 "No backend for transport {}", static_cast<int>(info.transport));
297 return 0;
298 }
299
300 EndpointInfo ep = info;
301 ep.id = m_next_endpoint_id.fetch_add(1);
302
303 uint64_t backend_id = backend->open_endpoint(ep);
304 if (backend_id == 0) {
305 return 0;
306 }
307
308 {
309 std::unique_lock lock(m_routing_mutex);
310 m_endpoint_routing[ep.id] = info.transport;
311 }
312
313 return ep.id;
314}
315
316void NetworkSubsystem::close_endpoint(uint64_t endpoint_id)
317{
318 auto* backend = resolve_backend(endpoint_id);
319 if (!backend) {
320 return;
321 }
322
323 backend->close_endpoint(endpoint_id);
324
325 {
326 std::unique_lock lock(m_routing_mutex);
327 m_endpoint_routing.erase(endpoint_id);
328 }
329
330 {
331 std::unique_lock lock(m_callbacks_mutex);
332 m_endpoint_callbacks.erase(endpoint_id);
333 }
334}
335
336bool NetworkSubsystem::send(uint64_t endpoint_id, const uint8_t* data, size_t size)
337{
338 auto* backend = resolve_backend(endpoint_id);
339 if (!backend) {
340 return false;
341 }
342 return backend->send(endpoint_id, data, size);
343}
344
345bool NetworkSubsystem::send_to(uint64_t endpoint_id, const uint8_t* data, size_t size,
346 const std::string& address, uint16_t port)
347{
348 auto* backend = resolve_backend(endpoint_id);
349 if (!backend) {
350 return false;
351 }
352 return backend->send_to(endpoint_id, data, size, address, port);
353}
354
355EndpointState NetworkSubsystem::get_endpoint_state(uint64_t endpoint_id) const
356{
357 auto* backend = resolve_backend(endpoint_id);
358 if (!backend) {
359 return EndpointState::CLOSED;
360 }
361 return backend->get_endpoint_state(endpoint_id);
362}
363
364void NetworkSubsystem::set_endpoint_receive_callback(uint64_t endpoint_id,
365 NetworkReceiveCallback callback)
366{
367 std::unique_lock lock(m_callbacks_mutex);
368 m_endpoint_callbacks[endpoint_id] = std::move(callback);
369}
370
371std::vector<EndpointInfo> NetworkSubsystem::get_all_endpoints() const
372{
373 std::shared_lock lock(m_backends_mutex);
374 std::vector<EndpointInfo> result;
375 for (const auto& [transport, backend] : m_backends) {
376 auto eps = backend->get_endpoints();
377 result.insert(result.end(), eps.begin(), eps.end());
378 }
379 return result;
380}
381
382// ─────────────────────────────────────────────────────────────────────────────
383// Private: backend initialisation
384// ─────────────────────────────────────────────────────────────────────────────
385
386void NetworkSubsystem::initialize_udp_backend()
387{
388 auto udp = std::make_unique<UDPBackend>(m_config.udp, *m_io_context);
389 add_backend(std::move(udp));
390}
391
392void NetworkSubsystem::initialize_tcp_backend()
393{
394 auto tcp = std::make_unique<TCPBackend>(m_config.tcp, *m_io_context);
395 add_backend(std::move(tcp));
396}
397
398void NetworkSubsystem::initialize_shm_backend()
399{
401 "SharedMemory backend not yet implemented");
402}
403
404void NetworkSubsystem::register_backend_service()
405{
406 auto& registry = Registry::BackendRegistry::instance();
407
408 auto service = std::make_shared<Registry::Service::NetworkService>();
409
410 service->open_endpoint = [this](const EndpointInfo& info) {
411 return open_endpoint(info);
412 };
413
414 service->close_endpoint = [this](uint64_t id) {
415 close_endpoint(id);
416 };
417
418 service->send = [this](uint64_t id, const uint8_t* data, size_t size) {
419 return send(id, data, size);
420 };
421
422 service->send_to = [this](uint64_t id, const uint8_t* data, size_t size,
423 const std::string& addr, uint16_t port) {
424 return send_to(id, data, size, addr, port);
425 };
426
427 service->get_endpoint_state = [this](uint64_t id) {
428 return get_endpoint_state(id);
429 };
430
431 service->set_endpoint_receive_callback = [this](uint64_t id, NetworkReceiveCallback cb) {
432 set_endpoint_receive_callback(id, std::move(cb));
433 };
434
435 service->get_all_endpoints = [this]() {
436 return get_all_endpoints();
437 };
438
439 m_network_service = service;
440
441 registry.register_service<Registry::Service::NetworkService>(
442 [service]() -> void* {
443 return service.get();
444 });
445}
446
447// ─────────────────────────────────────────────────────────────────────────────
448// Private: callback routing
449// ─────────────────────────────────────────────────────────────────────────────
450
451void NetworkSubsystem::on_backend_receive(uint64_t endpoint_id, const uint8_t* data,
452 size_t size, std::string_view sender_addr)
453{
454 std::shared_lock lock(m_callbacks_mutex);
455 auto it = m_endpoint_callbacks.find(endpoint_id);
456 if (it != m_endpoint_callbacks.end() && it->second) {
457 it->second(endpoint_id, data, size, sender_addr);
458 }
459}
460
461void NetworkSubsystem::on_backend_state_change(const EndpointInfo& info,
463{
465 "Endpoint {} state: {} -> {}",
466 info.id, static_cast<int>(previous), static_cast<int>(current));
467}
468
469// ─────────────────────────────────────────────────────────────────────────────
470// Private: routing
471// ─────────────────────────────────────────────────────────────────────────────
472
473INetworkBackend* NetworkSubsystem::resolve_backend(uint64_t endpoint_id) const
474{
475 NetworkTransport transport {};
476 {
477 std::shared_lock lock(m_routing_mutex);
478 auto it = m_endpoint_routing.find(endpoint_id);
479 if (it == m_endpoint_routing.end()) {
480 return nullptr;
481 }
482 transport = it->second;
483 }
484
485 return get_backend(transport);
486}
487
488} // namespace MayaFlux::Core
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
glm::vec2 current
Abstract interface for network transport backends.
std::unique_ptr< asio::io_context > m_io_context
void shutdown() override
Shutdown and cleanup subsystem resources.
std::unique_ptr< asio::executor_work_guard< asio::io_context::executor_type > > m_work_guard
void initialize(SubsystemProcessingHandle &handle) override
Initialize with a handle provided by SubsystemManager.
SubsystemProcessingHandle * m_handle
void start() override
Start the subsystem's processing/event loops.
NetworkSubsystem(const GlobalNetworkConfig &config)
void register_callbacks() override
Register callback hooks for this domain.
std::unordered_map< NetworkTransport, std::unique_ptr< INetworkBackend > > m_backends
std::shared_ptr< Registry::Service::NetworkService > m_network_service
Unified interface combining buffer and node processing for subsystems.
Connection-oriented reliable stream transport over TCP via standalone Asio.
static BackendRegistry & instance()
Get the global registry instance.
void unregister_service()
Unregister a service.
EndpointState
Observable connection state for an endpoint.
NetworkTransport
Identifies the transport protocol a backend implements.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
@ Shutdown
Engine/subsystem shutdown and cleanup.
@ NetworkSubsystem
Network subsystem operations (endpoint management, data routing)
@ Init
Engine/subsystem initialization.
@ Networking
Network operations (data transfer, protocol handling)
@ Core
Core engine, backend, subsystems.
void stop()
Stop active Portal::Network operations.
Definition Network.cpp:36
bool initialize(Registry::Service::NetworkService *service)
Initialize Portal::Network.
Definition Network.cpp:11
void shutdown()
Shutdown Portal::Network and release all resources.
Definition Network.cpp:50
bool is_initialized()
Return true if Portal::Network has been initialized.
Definition Network.cpp:66
Describes one logical send/receive endpoint managed by a backend.
Configuration for the NetworkSubsystem.
Backend network transport service interface.