MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
NetworkSource.hpp
Go to the documentation of this file.
1#pragma once
2
5
6namespace MayaFlux::Kriya {
7class NetworkAwaiter;
8}
9
10namespace MayaFlux::Vruta {
11
12/**
13 * @class NetworkSource
14 * @brief Awaitable broadcast message stream for a network endpoint
15 *
16 * Opens and owns a network endpoint on construction. Any number of
17 * coroutines may co_await the same source simultaneously via next_message().
18 * signal() broadcasts to all registered waiters on every incoming message.
19 *
20 * Fully lock-free on all paths. Waiter registration uses an intrusive
21 * singly-linked list with an atomic head. signal() atomically detaches
22 * the entire list and resumes each awaiter. The message queue is a
23 * lock-free ring buffer shared by all waiters — each awaiter pops its
24 * own copy on resumption.
25 *
26 * Endpoint lifecycle is internal. BackendRegistry is consulted in the
27 * constructor and destructor; the caller never touches NetworkService.
28 *
29 * @code
30 * Vruta::NetworkSource source({ .transport = NetworkTransport::UDP, .local_port = 8000 });
31 *
32 * auto handler = [&source]() -> Vruta::Event {
33 * while (true) {
34 * auto msg = co_await source.next_message();
35 * std::cout << msg.data.size() << " bytes from "
36 * << msg.sender_address << "\n";
37 * }
38 * };
39 * @endcode
40 */
41class MAYAFLUX_API NetworkSource {
42public:
43 /**
44 * @brief Open a network endpoint and begin receiving
45 * @param info Endpoint configuration.
46 *
47 * Opens the endpoint via NetworkService and registers a receive
48 * callback that calls signal(). Logs a warning and remains inert
49 * if NetworkService is unavailable.
50 */
51 explicit NetworkSource(const Core::EndpointInfo& info);
52
53 /**
54 * @brief Close the endpoint and release all resources
55 */
57
58 NetworkSource(const NetworkSource&) = delete;
60 NetworkSource(NetworkSource&&) noexcept = default;
61 NetworkSource& operator=(NetworkSource&&) noexcept = default;
62
63 /**
64 * @brief Signal that a message arrived (called from Asio IO thread)
65 * @param message The received message.
66 *
67 * Lock-free. Pushes into ring buffer, atomically detaches the full
68 * waiter list, and calls try_resume() on every registered awaiter.
69 * Safe to call from any thread.
70 */
71 void signal(const Core::NetworkMessage& message);
72
73 /**
74 * @brief Create an awaiter for the next message
75 * @return Awaiter that suspends until a message arrives.
76 */
77 Kriya::NetworkAwaiter next_message();
78
79 /**
80 * @brief Check if messages are pending
81 */
82 [[nodiscard]] bool has_pending() const;
83
84 /**
85 * @brief Get number of pending messages
86 */
87 [[nodiscard]] size_t pending_count() const;
88
89 /**
90 * @brief Clear all pending messages
91 */
92 void clear();
93
94private:
95 static constexpr size_t QUEUE_CAPACITY = 256;
96
97 uint64_t m_endpoint_id {};
99
100 /**
101 * @brief Head of the intrusive lock-free waiter list.
102 *
103 * Each NetworkAwaiter carries an atomic next pointer. register_waiter
104 * pushes onto the head with a CAS loop. signal() atomically swaps the
105 * head to null, detaching the full list, then walks and resumes each
106 * awaiter. No mutex, no fixed cap, no missed wakeups.
107 */
108 std::atomic<Kriya::NetworkAwaiter*> m_waiters_head { nullptr };
109
110 std::optional<Core::NetworkMessage> pop_message();
111
112 void register_waiter(Kriya::NetworkAwaiter* awaiter);
113 void unregister_waiter(Kriya::NetworkAwaiter* awaiter);
114
116};
117
118} // namespace MayaFlux::Vruta
Awaiter for suspending a coroutine until a network message arrives.
Policy-driven unified circular buffer implementation.
NetworkSource(const NetworkSource &)=delete
NetworkSource(NetworkSource &&) noexcept=default
NetworkSource & operator=(const NetworkSource &)=delete
Memory::LockFreeQueue< Core::NetworkMessage, QUEUE_CAPACITY > m_queue
Awaitable broadcast message stream for a network endpoint.
Describes one logical send/receive endpoint managed by a backend.