MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BroadcastSource.hpp
Go to the documentation of this file.
1#pragma once
2
4
5namespace MayaFlux::Kriya {
6template <typename T>
7class BroadcastAwaiter;
8}
9
10namespace MayaFlux::Vruta {
11
12/**
13 * @class BroadcastSource
14 * @brief Awaitable single-value broadcast channel for cross-thread signal delivery.
15 *
16 * Bridges a callback-based producer (e.g. audio output observer) to a coroutine
17 * consumer via co_await. Each signal() call delivers to the suspended awaiter,
18 * or stores the value for the next await_ready() poll if no awaiter is registered.
19 *
20 * Thread safety contract:
21 * - signal() may be called from any thread at any time.
22 * - register_waiter() / unregister_waiter() / pop_pending() are called only
23 * from the coroutine's execution context (single-threaded per coroutine).
24 *
25 * Race window between await_ready() returning false and await_suspend()
26 * completing registration is closed by signal() itself: it uses a generation
27 * counter so a missed signal leaves the pending slot non-null, which
28 * await_suspend() checks after registration via a second pop_pending() call.
29 * The self-resume in await_suspend is intentionally absent; instead the CAS
30 * leaves the awaiter registered and the value in m_result, and returns to the
31 * coroutine machinery which will call await_resume() when the handle is resumed
32 * by the scheduler or by a subsequent signal() deliver() call.
33 *
34 * One coroutine per BroadcastSource instance. For fan-out, use one
35 * BroadcastSource per consumer.
36 *
37 * @tparam T Payload type. Must be copyable.
38 *
39 * @see BroadcastAwaiter
40 */
41template <typename T>
43public:
44 BroadcastSource() = default;
45 ~BroadcastSource() override = default;
46
51
52 /**
53 * @brief Deliver a value to the waiting coroutine, or store it as pending.
54 *
55 * Atomically takes the waiter slot. If a waiter is present, delivers
56 * directly and resumes. Otherwise stores the value for the next
57 * await_ready() or post-registration check.
58 *
59 * Safe to call from any thread.
60 *
61 * @param value Value to deliver.
62 */
63 void signal(const T& value)
64 {
65 auto* w = m_waiter.exchange(nullptr, std::memory_order_acq_rel);
66 if (w) {
67 w->deliver(value);
68 } else {
69 m_pending_value = value;
70 m_pending_flag.store(true, std::memory_order_release);
71 }
72 }
73
74 /**
75 * @brief Create an awaiter for the next signal.
76 */
81
82 /**
83 * @brief Returns true if a value arrived before any awaiter registered.
84 */
85 [[nodiscard]] bool has_pending() const override
86 {
87 return m_pending_flag.load(std::memory_order_acquire);
88 }
89
90 /**
91 * @brief Discard any stored pending value.
92 */
93 void clear() override
94 {
95 m_pending_flag.store(false, std::memory_order_release);
96 }
97
98private:
99 std::atomic<Kriya::BroadcastAwaiter<T>*> m_waiter { nullptr };
100
101 /// @brief Pending value written by signal() when no waiter is registered.
102 /// Written only by the signal() caller; read only by the coroutine thread.
104 std::atomic<bool> m_pending_flag { false };
105
106 /**
107 * @brief Consume and return the pending value if one is available.
108 *
109 * Called only from the coroutine's execution context.
110 *
111 * @return The pending value, or std::nullopt.
112 */
113 std::optional<T> pop_pending()
114 {
115 if (!m_pending_flag.load(std::memory_order_acquire))
116 return std::nullopt;
117 T val = m_pending_value;
118 m_pending_flag.store(false, std::memory_order_release);
119 return val;
120 }
121
123 {
124 m_waiter.store(awaiter, std::memory_order_release);
125 }
126
128 {
129 Kriya::BroadcastAwaiter<T>* expected = awaiter;
130 m_waiter.compare_exchange_strong(
131 expected, nullptr,
132 std::memory_order_acq_rel, std::memory_order_relaxed);
133 }
134
135 friend class Kriya::BroadcastAwaiter<T>;
136};
137
138} // namespace MayaFlux::Vruta
Awaiter for suspending a coroutine until a BroadcastSource<T> fires.
void signal(const T &value)
Deliver a value to the waiting coroutine, or store it as pending.
BroadcastSource(BroadcastSource &&)=delete
Kriya::BroadcastAwaiter< T > next()
Create an awaiter for the next signal.
BroadcastSource & operator=(const BroadcastSource &)=delete
bool has_pending() const override
Returns true if a value arrived before any awaiter registered.
void register_waiter(Kriya::BroadcastAwaiter< T > *awaiter)
~BroadcastSource() override=default
BroadcastSource & operator=(BroadcastSource &&)=delete
void unregister_waiter(Kriya::BroadcastAwaiter< T > *awaiter)
std::atomic< Kriya::BroadcastAwaiter< T > * > m_waiter
void clear() override
Discard any stored pending value.
std::optional< T > pop_pending()
Consume and return the pending value if one is available.
T m_pending_value
Pending value written by signal() when no waiter is registered.
BroadcastSource(const BroadcastSource &)=delete
Awaitable single-value broadcast channel for cross-thread signal delivery.
Abstract base for all awaitable signal sources.