MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
RootBuffer.hpp
Go to the documentation of this file.
1#pragma once
2
4
6
7namespace MayaFlux::Buffers {
8
9template <typename BufferType>
10class MAYAFLUX_API RootBuffer : public BufferType {
11public:
12 using BufferType::BufferType;
13
14 /**
15 * @brief Adds a tributary buffer to this root buffer
16 * @param buffer Child buffer to add to the aggregation hierarchy
17 *
18 * Tributary buffers contribute their data to the root buffer
19 * when the root buffer is processed. This allows multiple computational
20 * streams to be combined into a single output channel.
21 *
22 * @throws std::runtime_error if buffer is not acceptable based on current token enforcement strategy
23 */
24 virtual void add_child_buffer(const std::shared_ptr<BufferType>& buffer)
25 {
26 if (this->is_processing()) {
27 for (auto& m_pending_op : m_pending_ops) {
28 bool expected = false;
29 if (m_pending_op.active.compare_exchange_strong(expected, true, std::memory_order_acquire, std::memory_order_relaxed)) {
30 m_pending_op.buffer = buffer;
31 m_pending_op.is_addition = true;
32 m_pending_count.fetch_add(1, std::memory_order_relaxed);
33 return;
34 }
35 }
36
37 return;
38 }
39 add_child_buffer_direct(buffer);
40 }
41
42 /**
43 * @brief Attempts to add a child buffer without throwing exceptions
44 * @param buffer Child buffer to add to the aggregation hierarchy
45 * @param rejection_reason Optional output parameter for rejection reason
46 * @return True if buffer was successfully added, false otherwise
47 *
48 * This is a non-throwing version of add_child_buffer() that can be used
49 * when you want to handle rejection gracefully without exception handling.
50 */
51 virtual bool try_add_child_buffer(const std::shared_ptr<BufferType>& buffer, std::string* rejection_reason = nullptr)
52 {
53 if (!is_buffer_acceptable(buffer, rejection_reason)) {
54 return false;
55 }
56
57 add_child_buffer(buffer);
58 return true;
59 }
60
61 /**
62 * @brief Removes a tributary buffer from this root buffer
63 * @param buffer Child buffer to remove from the aggregation hierarchy
64 */
65 virtual void remove_child_buffer(const std::shared_ptr<BufferType>& buffer)
66 {
67 if (this->is_processing()) {
68 for (auto& m_pending_op : m_pending_ops) {
69 bool expected = false;
70 if (m_pending_op.active.compare_exchange_strong(expected, true, std::memory_order_acquire, std::memory_order_relaxed)) {
71 m_pending_op.buffer = buffer;
72 m_pending_op.is_addition = false;
73 m_pending_count.fetch_add(1, std::memory_order_relaxed);
74 return;
75 }
76 }
77
78 return;
79 }
80
81 remove_child_buffer_direct(buffer);
82 }
83
84 /**
85 * @brief Gets the number of tributary buffers in the aggregation hierarchy
86 * @return Number of tributary buffers
87 */
88 [[nodiscard]] size_t get_num_children() const { return m_child_buffers.size(); }
89
90 /**
91 * @brief Gets all tributary buffers in the aggregation hierarchy
92 * @return Constant reference to the vector of tributary buffers
93 */
94 const std::vector<std::shared_ptr<BufferType>>& get_child_buffers() const
95 {
96 return m_child_buffers;
97 }
98
99 /**
100 * @brief Resets all data values in this buffer and its tributaries
101 *
102 * Initializes all sample values to zero in this buffer and optionally
103 * in all tributary buffers in the aggregation hierarchy.
104 */
105 void clear() override
106 {
107 BufferType::clear();
108 for (auto& child : m_child_buffers) {
109 child->clear();
110 }
111 }
112
113 /**
114 * @brief Activates/deactivates processing for the current token
115 * @param active Whether this buffer should process when its token is active
116 *
117 * This allows subsystems to selectively enable/disable buffers based on
118 * current processing requirements without changing the token assignment.
119 */
120 virtual void set_token_active(bool active) = 0;
121
122 /**
123 * @brief Checks if the buffer is active for its assigned token
124 * @return True if the buffer will process when its token is processed
125 */
126 [[nodiscard]] virtual bool is_token_active() const = 0;
127
128 /**
129 * @brief Sets processing rate hint for the buffer
130 * @param samples_per_second Expected processing rate for this token
131 *
132 * This helps the buffer optimize its processing for different rates.
133 * Audio might be 48kHz, visual might be 60Hz, custom might be variable.
134 */
135 virtual void set_processing_rate_hint(uint32_t tick_rate) { m_processing_rate_hint = tick_rate; }
136
137 /**
138 * @brief Gets the processing rate hint
139 * @return Expected processing rate in samples/frames per second
140 */
141 [[nodiscard]] virtual uint32_t get_processing_rate_hint() const { return m_processing_rate_hint; }
142
143 /**
144 * @brief Enables cross-modal data sharing
145 * @param enabled Whether this buffer should share data across processing domains
146 *
147 * When enabled, the buffer can be accessed by different subsystems
148 * simultaneously, enabling advanced cross-modal processing.
149 */
150 virtual void enable_cross_modal_sharing(bool enabled) { m_cross_modal_sharing = enabled; }
151
152 /**
153 * @brief Checks if cross-modal sharing is enabled
154 * @return True if buffer can be shared across processing domains
155 */
156 [[nodiscard]] virtual bool is_cross_modal_sharing_enabled() const { return m_cross_modal_sharing; }
157
158 /**
159 * @brief Validates if a buffer is acceptable based on current token enforcement strategy
160 * @param buffer Buffer to validate
161 * @param rejection_reason Optional output parameter for rejection reason
162 * @return True if buffer is acceptable, false otherwise
163 *
164 * This method encapsulates all token compatibility validation logic based on the
165 * current enforcement strategy. It provides a clean separation between validation
166 * logic and the actual buffer addition process, making the code more maintainable
167 * and testable.
168 */
169 bool is_buffer_acceptable(const std::shared_ptr<BufferType>& buffer, std::string* rejection_reason = nullptr) const
170 {
171 auto default_processor = buffer->get_default_processor();
172 if (!default_processor) {
173 return true;
174 }
175
176 ProcessingToken child_token = default_processor->get_processing_token();
177
178 switch (m_token_enforcement_strategy) {
179 case TokenEnforcementStrategy::STRICT:
180 if (child_token != m_preferred_processing_token) {
181 if (rejection_reason) {
182 *rejection_reason = "Child buffer's default processor token does not match preferred processing token (STRICT mode)";
183 }
184 return false;
185 }
186 break;
187
188 case TokenEnforcementStrategy::FILTERED:
189 if (!are_tokens_compatible(m_preferred_processing_token, child_token)) {
190 if (rejection_reason) {
191 *rejection_reason = "Child buffer's default processor token is not compatible with preferred processing token (FILTERED mode)";
192 }
193 return false;
194 }
195 break;
196
197 case TokenEnforcementStrategy::OVERRIDE_SKIP:
198 if (!are_tokens_compatible(m_preferred_processing_token, child_token)) {
199 if (rejection_reason) {
200 *rejection_reason = "Child buffer token is incompatible but will be conditionally processed (OVERRIDE_SKIP mode)";
201 }
202 }
203 break;
204
205 case TokenEnforcementStrategy::OVERRIDE_REJECT:
206 if (!are_tokens_compatible(m_preferred_processing_token, child_token)) {
207 if (rejection_reason) {
208 *rejection_reason = "Child buffer token is incompatible and will be removed later (OVERRIDE_REJECT mode)";
209 }
210 }
211 break;
212
213 case TokenEnforcementStrategy::IGNORE:
214 break;
215 }
216
217 return true;
218 }
219
220 [[nodiscard]] bool has_pending_operations() const
221 {
222 return m_pending_count.load(std::memory_order_relaxed) > 0;
223 }
224
225protected:
226 void add_child_buffer_direct(const std::shared_ptr<BufferType>& buffer)
227 {
228 std::string rejection_reason;
229 if (!is_buffer_acceptable(buffer, &rejection_reason)) {
230 error<std::runtime_error>(Journal::Component::Buffers, Journal::Context::BufferManagement, std::source_location::current(), "Cannot add child buffer: {}", rejection_reason);
231 }
232
233 m_child_buffers.push_back(buffer);
234
235 if (!buffer->get_processing_chain() && this->get_processing_chain()) {
236 buffer->set_processing_chain(this->get_processing_chain());
237 }
238 }
239
240 void remove_child_buffer_direct(const std::shared_ptr<BufferType>& buffer)
241 {
242 auto it = std::find(m_child_buffers.begin(), m_child_buffers.end(), buffer);
243 if (it != m_child_buffers.end()) {
244 m_child_buffers.erase(it);
245 }
246 }
247
248 /**
249 * @brief Process pending operations - call this at start of processing cycles
250 */
252 {
253 for (auto& m_pending_op : m_pending_ops) {
254 if (m_pending_op.active.load(std::memory_order_acquire)) {
255 auto& op = m_pending_op;
256
257 if (op.is_addition) {
258 add_child_buffer_direct(op.buffer);
259 } else {
260 remove_child_buffer_direct(op.buffer);
261 }
262
263 // Clear operation
264 op.buffer.reset();
265 op.active.store(false, std::memory_order_release);
266 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
267 }
268 }
269 }
270
271 /**
272 * @brief Vector of tributary buffers that contribute to this root buffer
273 */
274 std::vector<std::shared_ptr<BufferType>> m_child_buffers;
275
276 /**
277 * @brief Processing rate hint for this buffer
278 *
279 * This is used to optimize processing based on expected
280 * sample/frame rates, allowing the buffer to adapt its
281 * processing strategy accordingly.
282 */
284
285 /**
286 * @brief Whether this buffer allows cross-modal data sharing
287 *
288 * When enabled, the buffer can be accessed by different
289 * subsystems simultaneously, allowing advanced cross-modal
290 * processing techniques.
291 */
293
294 /**
295 * @brief Current token enforcement strategy for this root buffer
296 *
297 * This defines how child buffers are validated and processed
298 * based on their processing tokens. It allows for flexible
299 * control over how different processing streams interact.
300 */
301 TokenEnforcementStrategy m_token_enforcement_strategy { TokenEnforcementStrategy::STRICT };
302
303 /**
304 * @brief Preferred processing token for this root buffer
305 *
306 * This is the token that child buffers should ideally match
307 * to be accepted into the aggregation hierarchy. It defines
308 * the primary processing stream for this root buffer.
309 */
311
312 std::atomic<uint32_t> m_pending_count { 0 };
313
314 static constexpr size_t MAX_PENDING = 64;
315
316 /**
317 * @brief Structure for storing pending buffer add/remove operations
318 *
319 * Similar to RootNode's PendingOp, this handles buffer operations
320 * that need to be deferred when the buffer is currently processing.
321 */
323 std::atomic<bool> active { false };
324 std::shared_ptr<BufferType> buffer;
325 bool is_addition { true }; // true = add, false = remove
326 } m_pending_ops[MAX_PENDING];
327};
328}
virtual void remove_child_buffer(const std::shared_ptr< BufferType > &buffer)
Removes a tributary buffer from this root buffer.
virtual void add_child_buffer(const std::shared_ptr< BufferType > &buffer)
Adds a tributary buffer to this root buffer.
virtual void set_processing_rate_hint(uint32_t tick_rate)
Sets processing rate hint for the buffer.
bool m_cross_modal_sharing
Whether this buffer allows cross-modal data sharing.
const std::vector< std::shared_ptr< BufferType > > & get_child_buffers() const
Gets all tributary buffers in the aggregation hierarchy.
virtual void enable_cross_modal_sharing(bool enabled)
Enables cross-modal data sharing.
void add_child_buffer_direct(const std::shared_ptr< BufferType > &buffer)
bool is_buffer_acceptable(const std::shared_ptr< BufferType > &buffer, std::string *rejection_reason=nullptr) const
Validates if a buffer is acceptable based on current token enforcement strategy.
void process_pending_buffer_operations()
Process pending operations - call this at start of processing cycles.
virtual uint32_t get_processing_rate_hint() const
Gets the processing rate hint.
virtual bool is_token_active() const =0
Checks if the buffer is active for its assigned token.
void clear() override
Resets all data values in this buffer and its tributaries.
uint32_t m_processing_rate_hint
Processing rate hint for this buffer.
virtual bool try_add_child_buffer(const std::shared_ptr< BufferType > &buffer, std::string *rejection_reason=nullptr)
Attempts to add a child buffer without throwing exceptions.
ProcessingToken m_preferred_processing_token
Preferred processing token for this root buffer.
void remove_child_buffer_direct(const std::shared_ptr< BufferType > &buffer)
std::vector< std::shared_ptr< BufferType > > m_child_buffers
Vector of tributary buffers that contribute to this root buffer.
size_t get_num_children() const
Gets the number of tributary buffers in the aggregation hierarchy.
virtual void set_token_active(bool active)=0
Activates/deactivates processing for the current token.
virtual bool is_cross_modal_sharing_enabled() const
Checks if cross-modal sharing is enabled.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
TokenEnforcementStrategy
Defines how strictly processing token requirements are enforced in buffer processing chains.
Structure for storing pending buffer add/remove operations.