17 .Buffer = Buffers::ProcessingToken::EVENT_RATE,
18 .Node = Nodes::ProcessingToken::EVENT_RATE,
19 .Task = Vruta::ProcessingToken::EVENT_DRIVEN
21 , m_io_context(
std::make_unique<asio::io_context>())
41 "Initializing Network Subsystem...");
62 "Network Subsystem initialized with {} backend(s)",
m_backends.size());
69 "Cannot start NetworkSubsystem: not initialized");
79 for (
auto& [transport, backend] :
m_backends) {
84 m_work_guard = std::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(
87#if MAYAFLUX_USE_JTHREAD
88 m_io_thread = std::jthread([
this](
const std::stop_token& token) {
89 while (!token.stop_requested()) {
93 } catch (
const std::exception& e) {
95 "IO context exception: {}", e.what());
100 "Network IO thread exiting");
103 m_io_stop_requested.store(
false);
104 m_io_thread = std::thread([
this]() {
105 while (!m_io_stop_requested.load()) {
109 } catch (
const std::exception& e) {
111 "IO context exception: {}", e.what());
116 "Network IO thread exiting");
120 m_running.store(
true);
123 "Network Subsystem started");
126void NetworkSubsystem::pause()
131void NetworkSubsystem::resume()
136void NetworkSubsystem::stop()
138 if (!m_running.load()) {
142 m_running.store(
false);
145 std::shared_lock lock(m_backends_mutex);
146 for (
auto& [transport, backend] : m_backends) {
155 m_work_guard.reset();
156 m_io_context->stop();
158#if MAYAFLUX_USE_JTHREAD
159 if (m_io_thread.joinable()) {
160 m_io_thread.request_stop();
164 m_io_stop_requested.store(
true);
165 if (m_io_thread.joinable()) {
170 m_io_context->restart();
173 "Network Subsystem stopped");
176void NetworkSubsystem::shutdown()
181 std::unique_lock lock(m_routing_mutex);
182 m_endpoint_routing.clear();
186 std::unique_lock lock(m_callbacks_mutex);
187 m_endpoint_callbacks.clear();
191 std::unique_lock lock(m_backends_mutex);
192 for (
auto& [transport, backend] : m_backends) {
202 if (m_network_service) {
205 m_network_service.reset();
208 m_ready.store(
false);
211 "Network Subsystem shutdown complete");
218bool NetworkSubsystem::add_backend(std::unique_ptr<INetworkBackend> backend)
226 std::unique_lock lock(m_backends_mutex);
228 if (m_backends.contains(transport)) {
230 "Network backend {} already registered", backend->get_name());
234 if (!backend->initialize()) {
236 "Failed to initialize network backend: {}", backend->get_name());
240 backend->set_receive_callback(
241 [
this](uint64_t
id,
const uint8_t* data,
size_t size, std::string_view addr) {
242 on_backend_receive(
id, data,
size, addr);
245 backend->set_state_callback(
247 on_backend_state_change(info, prev, curr);
250 if (
auto* tcp =
dynamic_cast<TCPBackend*
>(backend.get())) {
251 tcp->set_endpoint_id_allocator([
this]() -> uint64_t {
252 return m_next_endpoint_id.fetch_add(1);
257 "Added network backend: {}", backend->get_name());
259 m_backends[transport] = std::move(backend);
265 std::shared_lock lock(m_backends_mutex);
266 auto it = m_backends.find(transport);
267 return (it != m_backends.end()) ? it->second.get() :
nullptr;
270std::vector<INetworkBackend*> NetworkSubsystem::get_backends()
const
272 std::shared_lock lock(m_backends_mutex);
273 std::vector<INetworkBackend*> result;
274 result.reserve(m_backends.size());
275 for (
const auto& [transport, backend] : m_backends) {
276 result.push_back(backend.get());
287 auto* backend = get_backend(info.
transport);
290 "No backend for transport {}",
static_cast<int>(info.
transport));
295 ep.
id = m_next_endpoint_id.fetch_add(1);
297 uint64_t backend_id = backend->open_endpoint(ep);
298 if (backend_id == 0) {
303 std::unique_lock lock(m_routing_mutex);
310void NetworkSubsystem::close_endpoint(uint64_t endpoint_id)
312 auto* backend = resolve_backend(endpoint_id);
317 backend->close_endpoint(endpoint_id);
320 std::unique_lock lock(m_routing_mutex);
321 m_endpoint_routing.erase(endpoint_id);
325 std::unique_lock lock(m_callbacks_mutex);
326 m_endpoint_callbacks.erase(endpoint_id);
330bool NetworkSubsystem::send(uint64_t endpoint_id,
const uint8_t* data,
size_t size)
332 auto* backend = resolve_backend(endpoint_id);
336 return backend->send(endpoint_id, data,
size);
339bool NetworkSubsystem::send_to(uint64_t endpoint_id,
const uint8_t* data,
size_t size,
340 const std::string& address, uint16_t port)
342 auto* backend = resolve_backend(endpoint_id);
346 return backend->send_to(endpoint_id, data,
size, address, port);
349EndpointState NetworkSubsystem::get_endpoint_state(uint64_t endpoint_id)
const
351 auto* backend = resolve_backend(endpoint_id);
353 return EndpointState::CLOSED;
355 return backend->get_endpoint_state(endpoint_id);
358void NetworkSubsystem::set_endpoint_receive_callback(uint64_t endpoint_id,
361 std::unique_lock lock(m_callbacks_mutex);
362 m_endpoint_callbacks[endpoint_id] = std::move(callback);
365std::vector<EndpointInfo> NetworkSubsystem::get_all_endpoints()
const
367 std::shared_lock lock(m_backends_mutex);
368 std::vector<EndpointInfo> result;
369 for (
const auto& [transport, backend] : m_backends) {
370 auto eps = backend->get_endpoints();
371 result.insert(result.end(), eps.begin(), eps.end());
380void NetworkSubsystem::initialize_udp_backend()
382 auto udp = std::make_unique<UDPBackend>(m_config.udp, *m_io_context);
383 add_backend(std::move(udp));
386void NetworkSubsystem::initialize_tcp_backend()
388 auto tcp = std::make_unique<TCPBackend>(m_config.tcp, *m_io_context);
389 add_backend(std::move(tcp));
392void NetworkSubsystem::initialize_shm_backend()
395 "SharedMemory backend not yet implemented");
398void NetworkSubsystem::register_backend_service()
402 auto service = std::make_shared<Registry::Service::NetworkService>();
404 service->open_endpoint = [
this](
const EndpointInfo& info) {
405 return open_endpoint(info);
408 service->close_endpoint = [
this](uint64_t
id) {
412 service->send = [
this](uint64_t
id,
const uint8_t* data,
size_t size) {
413 return send(
id, data,
size);
416 service->send_to = [
this](uint64_t
id,
const uint8_t* data,
size_t size,
417 const std::string& addr, uint16_t port) {
418 return send_to(
id, data,
size, addr, port);
421 service->get_endpoint_state = [
this](uint64_t
id) {
422 return get_endpoint_state(
id);
426 set_endpoint_receive_callback(
id, std::move(cb));
429 service->get_all_endpoints = [
this]() {
430 return get_all_endpoints();
433 m_network_service = service;
436 [service]() ->
void* {
437 return service.get();
445void NetworkSubsystem::on_backend_receive(uint64_t endpoint_id,
const uint8_t* data,
446 size_t size, std::string_view sender_addr)
448 std::shared_lock lock(m_callbacks_mutex);
449 auto it = m_endpoint_callbacks.find(endpoint_id);
450 if (it != m_endpoint_callbacks.end() && it->second) {
451 it->second(endpoint_id, data,
size, sender_addr);
455void NetworkSubsystem::on_backend_state_change(
const EndpointInfo& info,
459 "Endpoint {} state: {} -> {}",
460 info.
id,
static_cast<int>(previous),
static_cast<int>(current));
471 std::shared_lock lock(m_routing_mutex);
472 auto it = m_endpoint_routing.find(endpoint_id);
473 if (it == m_endpoint_routing.end()) {
476 transport = it->second;
479 return get_backend(transport);
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
Abstract interface for network transport backends.
std::unique_ptr< asio::io_context > m_io_context
void shutdown() override
Shutdown and cleanup subsystem resources.
GlobalNetworkConfig m_config
std::shared_mutex m_backends_mutex
void register_backend_service()
void initialize_tcp_backend()
~NetworkSubsystem() override
std::unique_ptr< asio::executor_work_guard< asio::io_context::executor_type > > m_work_guard
void initialize_shm_backend()
void initialize(SubsystemProcessingHandle &handle) override
Initialize with a handle provided by SubsystemManager.
SubsystemProcessingHandle * m_handle
std::atomic< bool > m_ready
void start() override
Start the subsystem's processing/event loops.
NetworkSubsystem(const GlobalNetworkConfig &config)
void register_callbacks() override
Register callback hooks for this domain.
void initialize_udp_backend()
std::unordered_map< NetworkTransport, std::unique_ptr< INetworkBackend > > m_backends
std::atomic< bool > m_running
std::shared_ptr< Registry::Service::NetworkService > m_network_service
Unified interface combining buffer and node processing for subsystems.
Connection-oriented reliable stream transport over TCP via standalone Asio.
static BackendRegistry & instance()
Get the global registry instance.
void unregister_service()
Unregister a service.
EndpointState
Observable connection state for an endpoint.
NetworkTransport
Identifies the transport protocol a backend implements.
std::function< void(uint64_t endpoint_id, const uint8_t *data, size_t size, std::string_view sender_addr)> NetworkReceiveCallback
Callback signature for inbound data on an endpoint.
@ Shutdown
Engine/subsystem shutdown and cleanup.
@ NetworkSubsystem
Network subsystem operations (endpoint management, data routing)
@ Init
Engine/subsystem initialization.
@ Networking
Network operations (data transfer, protocol handling)
@ Core
Core engine, backend, subsystems.
void stop()
Stop active Portal::Network operations.
bool initialize(Registry::Service::NetworkService *service)
Initialize Portal::Network.
void shutdown()
Shutdown Portal::Network and release all resources.
bool is_initialized()
Return true if Portal::Network has been initialized.
NetworkTransport transport
Describes one logical send/receive endpoint managed by a backend.
SharedMemoryBackendInfo shared_memory
Configuration for the NetworkSubsystem.
Backend network transport service interface.