MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
NetworkSource.hpp
Go to the documentation of this file.
1#pragma once
2
5
7
// Forward declaration only: this header uses Kriya::NetworkAwaiter as a
// function return type and as a pointer target in declarations, neither of
// which requires the complete type here.
8namespace MayaFlux::Kriya {
9class NetworkAwaiter;
10}
11
12namespace MayaFlux::Vruta {
13
14/**
15 * @class NetworkSource
16 * @brief Awaitable broadcast message stream for a network endpoint
17 *
18 * Opens and owns a network endpoint on construction. Any number of
19 * coroutines may co_await the same source simultaneously via next_message().
20 * signal() broadcasts to all registered waiters on every incoming message.
21 *
22 * Fully lock-free on all paths. Waiter registration uses an intrusive
23 * singly-linked list with an atomic head. signal() atomically detaches
24 * the entire list and resumes each awaiter. The message queue is a
25 * lock-free ring buffer shared by all waiters — each awaiter pops its
26 * own copy on resumption.
27 *
28 * Endpoint lifecycle is internal. BackendRegistry is consulted in the
29 * constructor and destructor; the caller never touches NetworkService.
30 *
31 * @code
32 * Vruta::NetworkSource source({ .transport = NetworkTransport::UDP, .local_port = 8000 });
33 *
34 * auto handler = [&source]() -> Vruta::Event {
35 * while (true) {
36 * auto msg = co_await source.next_message();
37 * std::cout << msg.data.size() << " bytes from "
38 * << msg.sender_address << "\n";
39 * }
40 * };
41 * @endcode
42 */
43class MAYAFLUX_API NetworkSource : public EventSource {
44public:
45 /**
46 * @brief Open a network endpoint and begin receiving
47 * @param info Endpoint configuration.
48 *
49 * Opens the endpoint via NetworkService and registers a receive
50 * callback that calls signal(). Logs a warning and remains inert
51 * if NetworkService is unavailable.
52 */
53 explicit NetworkSource(const Core::EndpointInfo& info);
54
55 /**
56 * @brief Close the endpoint and release all resources
57 */
59
60 NetworkSource(const NetworkSource&) = delete;
62 NetworkSource(NetworkSource&&) noexcept = default;
63 NetworkSource& operator=(NetworkSource&&) noexcept = default;
64
65 /**
66 * @brief Signal that a message arrived (called from Asio IO thread)
67 * @param message The received message.
68 *
69 * Lock-free. Pushes into ring buffer, atomically detaches the full
70 * waiter list, and calls try_resume() on every registered awaiter.
71 * Safe to call from any thread.
72 */
73 void signal(const Core::NetworkMessage& message);
74
75 /**
76 * @brief Create an awaiter for the next message
77 * @return Awaiter that suspends until a message arrives.
78 */
79 Kriya::NetworkAwaiter next_message();
80
81 /**
82 * @brief Check if messages are pending
83 */
84 [[nodiscard]] bool has_pending() const override;
85
86 /**
87 * @brief Get number of pending messages
88 */
89 [[nodiscard]] size_t pending_count() const;
90
91 /**
92 * @brief Clear all pending messages
93 */
94 void clear() override;
95
96private:
97 static constexpr size_t QUEUE_CAPACITY = 256;
98
99 uint64_t m_endpoint_id {};
101
102 /**
103 * @brief Head of the intrusive lock-free waiter list.
104 *
105 * Each NetworkAwaiter carries an atomic next pointer. register_waiter
106 * pushes onto the head with a CAS loop. signal() atomically swaps the
107 * head to null, detaching the full list, then walks and resumes each
108 * awaiter. No mutex, no fixed cap, no missed wakeups.
109 */
110 std::atomic<Kriya::NetworkAwaiter*> m_waiters_head { nullptr };
111
112 std::optional<Core::NetworkMessage> pop_message();
113
114 void register_waiter(Kriya::NetworkAwaiter* awaiter);
115 void unregister_waiter(Kriya::NetworkAwaiter* awaiter);
116
118};
119
120} // namespace MayaFlux::Vruta
Awaiter for suspending a coroutine until a network message arrives.
Policy-driven unified circular buffer implementation.
Abstract base for all awaitable signal sources.
NetworkSource(const NetworkSource &)=delete
NetworkSource(NetworkSource &&) noexcept=default
NetworkSource & operator=(const NetworkSource &)=delete
Memory::LockFreeQueue< Core::NetworkMessage, QUEUE_CAPACITY > m_queue
Awaitable broadcast message stream for a network endpoint.
Describes one logical send/receive endpoint managed by a backend.