MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
RootBuffer.hpp
Go to the documentation of this file.
1#pragma once
2
4
5namespace MayaFlux::Buffers {
6
7template <typename BufferType>
8class MAYAFLUX_API RootBuffer : public BufferType {
9public:
10 using BufferType::BufferType;
11
12 /**
13 * @brief Adds a tributary buffer to this root buffer
14 * @param buffer Child buffer to add to the aggregation hierarchy
15 *
16 * Tributary buffers contribute their data to the root buffer
17 * when the root buffer is processed. This allows multiple computational
18 * streams to be combined into a single output channel.
19 *
20 * @throws std::runtime_error if buffer is not acceptable based on current token enforcement strategy
21 */
22 virtual void add_child_buffer(std::shared_ptr<BufferType> buffer)
23 {
24 if (this->is_processing()) {
25 for (auto& m_pending_op : m_pending_ops) {
26 bool expected = false;
27 if (m_pending_op.active.compare_exchange_strong(expected, true, std::memory_order_acquire, std::memory_order_relaxed)) {
28 m_pending_op.buffer = buffer;
29 m_pending_op.is_addition = true;
30 m_pending_count.fetch_add(1, std::memory_order_relaxed);
31 return;
32 }
33 }
34
35 return;
36 }
37 add_child_buffer_direct(buffer);
38 }
39
40 /**
41 * @brief Attempts to add a child buffer without throwing exceptions
42 * @param buffer Child buffer to add to the aggregation hierarchy
43 * @param rejection_reason Optional output parameter for rejection reason
44 * @return True if buffer was successfully added, false otherwise
45 *
46 * This is a non-throwing version of add_child_buffer() that can be used
47 * when you want to handle rejection gracefully without exception handling.
48 */
49 virtual bool try_add_child_buffer(std::shared_ptr<BufferType> buffer, std::string* rejection_reason = nullptr)
50 {
51 if (!is_buffer_acceptable(buffer, rejection_reason)) {
52 return false;
53 }
54
55 add_child_buffer(buffer);
56 return true;
57 }
58
59 /**
60 * @brief Removes a tributary buffer from this root buffer
61 * @param buffer Child buffer to remove from the aggregation hierarchy
62 */
63 virtual void remove_child_buffer(std::shared_ptr<BufferType> buffer)
64 {
65 if (this->is_processing()) {
66 for (auto& m_pending_op : m_pending_ops) {
67 bool expected = false;
68 if (m_pending_op.active.compare_exchange_strong(expected, true, std::memory_order_acquire, std::memory_order_relaxed)) {
69 m_pending_op.buffer = buffer;
70 m_pending_op.is_addition = false;
71 m_pending_count.fetch_add(1, std::memory_order_relaxed);
72 return;
73 }
74 }
75
76 return;
77 }
78
79 remove_child_buffer_direct(buffer);
80 }
81
82 /**
83 * @brief Gets the number of tributary buffers in the aggregation hierarchy
84 * @return Number of tributary buffers
85 */
86 [[nodiscard]] size_t get_num_children() const { return m_child_buffers.size(); }
87
88 /**
89 * @brief Gets all tributary buffers in the aggregation hierarchy
90 * @return Constant reference to the vector of tributary buffers
91 */
92 const std::vector<std::shared_ptr<BufferType>>& get_child_buffers() const
93 {
94 return m_child_buffers;
95 }
96
97 /**
98 * @brief Resets all data values in this buffer and its tributaries
99 *
100 * Initializes all sample values to zero in this buffer and optionally
101 * in all tributary buffers in the aggregation hierarchy.
102 */
103 void clear() override
104 {
105 BufferType::clear();
106 for (auto& child : m_child_buffers) {
107 child->clear();
108 }
109 }
110
111 /**
112 * @brief Activates/deactivates processing for the current token
113 * @param active Whether this buffer should process when its token is active
114 *
115 * This allows subsystems to selectively enable/disable buffers based on
116 * current processing requirements without changing the token assignment.
117 */
118 virtual void set_token_active(bool active) = 0;
119
120 /**
121 * @brief Checks if the buffer is active for its assigned token
122 * @return True if the buffer will process when its token is processed
123 */
124 [[nodiscard]] virtual bool is_token_active() const = 0;
125
126 /**
127 * @brief Sets processing rate hint for the buffer
128 * @param samples_per_second Expected processing rate for this token
129 *
130 * This helps the buffer optimize its processing for different rates.
131 * Audio might be 48kHz, visual might be 60Hz, custom might be variable.
132 */
133 virtual void set_processing_rate_hint(uint32_t tick_rate) { m_processing_rate_hint = tick_rate; }
134
135 /**
136 * @brief Gets the processing rate hint
137 * @return Expected processing rate in samples/frames per second
138 */
139 [[nodiscard]] virtual uint32_t get_processing_rate_hint() const { return m_processing_rate_hint; }
140
141 /**
142 * @brief Enables cross-modal data sharing
143 * @param enabled Whether this buffer should share data across processing domains
144 *
145 * When enabled, the buffer can be accessed by different subsystems
146 * simultaneously, enabling advanced cross-modal processing.
147 */
148 virtual void enable_cross_modal_sharing(bool enabled) { m_cross_modal_sharing = enabled; }
149
150 /**
151 * @brief Checks if cross-modal sharing is enabled
152 * @return True if buffer can be shared across processing domains
153 */
154 [[nodiscard]] virtual bool is_cross_modal_sharing_enabled() const { return m_cross_modal_sharing; }
155
156 /**
157 * @brief Validates if a buffer is acceptable based on current token enforcement strategy
158 * @param buffer Buffer to validate
159 * @param rejection_reason Optional output parameter for rejection reason
160 * @return True if buffer is acceptable, false otherwise
161 *
162 * This method encapsulates all token compatibility validation logic based on the
163 * current enforcement strategy. It provides a clean separation between validation
164 * logic and the actual buffer addition process, making the code more maintainable
165 * and testable.
166 */
167 bool is_buffer_acceptable(std::shared_ptr<BufferType> buffer, std::string* rejection_reason = nullptr) const
168 {
169 auto default_processor = buffer->get_default_processor();
170 if (!default_processor) {
171 return true;
172 }
173
174 ProcessingToken child_token = default_processor->get_processing_token();
175
176 switch (m_token_enforcement_strategy) {
177 case TokenEnforcementStrategy::STRICT:
178 if (child_token != m_preferred_processing_token) {
179 if (rejection_reason) {
180 *rejection_reason = "Child buffer's default processor token does not match preferred processing token (STRICT mode)";
181 }
182 return false;
183 }
184 break;
185
186 case TokenEnforcementStrategy::FILTERED:
187 if (!are_tokens_compatible(m_preferred_processing_token, child_token)) {
188 if (rejection_reason) {
189 *rejection_reason = "Child buffer's default processor token is not compatible with preferred processing token (FILTERED mode)";
190 }
191 return false;
192 }
193 break;
194
195 case TokenEnforcementStrategy::OVERRIDE_SKIP:
196 if (!are_tokens_compatible(m_preferred_processing_token, child_token)) {
197 if (rejection_reason) {
198 *rejection_reason = "Child buffer token is incompatible but will be conditionally processed (OVERRIDE_SKIP mode)";
199 }
200 }
201 break;
202
203 case TokenEnforcementStrategy::OVERRIDE_REJECT:
204 if (!are_tokens_compatible(m_preferred_processing_token, child_token)) {
205 if (rejection_reason) {
206 *rejection_reason = "Child buffer token is incompatible and will be removed later (OVERRIDE_REJECT mode)";
207 }
208 }
209 break;
210
211 case TokenEnforcementStrategy::IGNORE:
212 break;
213 }
214
215 return true;
216 }
217
218 [[nodiscard]] bool has_pending_operations() const
219 {
220 return m_pending_count.load(std::memory_order_relaxed) > 0;
221 }
222
223protected:
224 void add_child_buffer_direct(std::shared_ptr<BufferType> buffer)
225 {
226 std::string rejection_reason;
227 if (!is_buffer_acceptable(buffer, &rejection_reason)) {
228 throw std::runtime_error("Cannot add child buffer: " + rejection_reason);
229 }
230
231 m_child_buffers.push_back(buffer);
232
233 if (!buffer->get_processing_chain() && this->get_processing_chain()) {
234 buffer->set_processing_chain(this->get_processing_chain());
235 }
236 }
237
238 void remove_child_buffer_direct(std::shared_ptr<BufferType> buffer)
239 {
240 auto it = std::find(m_child_buffers.begin(), m_child_buffers.end(), buffer);
241 if (it != m_child_buffers.end()) {
242 m_child_buffers.erase(it);
243 }
244 }
245
246 /**
247 * @brief Process pending operations - call this at start of processing cycles
248 */
250 {
251 for (auto& m_pending_op : m_pending_ops) {
252 if (m_pending_op.active.load(std::memory_order_acquire)) {
253 auto& op = m_pending_op;
254
255 if (op.is_addition) {
256 add_child_buffer_direct(op.buffer);
257 } else {
258 remove_child_buffer_direct(op.buffer);
259 }
260
261 // Clear operation
262 op.buffer.reset();
263 op.active.store(false, std::memory_order_release);
264 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
265 }
266 }
267 }
268
269 /**
270 * @brief Vector of tributary buffers that contribute to this root buffer
271 */
272 std::vector<std::shared_ptr<BufferType>> m_child_buffers;
273
274 /**
275 * @brief Processing rate hint for this buffer
276 *
277 * This is used to optimize processing based on expected
278 * sample/frame rates, allowing the buffer to adapt its
279 * processing strategy accordingly.
280 */
282
283 /**
284 * @brief Whether this buffer allows cross-modal data sharing
285 *
286 * When enabled, the buffer can be accessed by different
287 * subsystems simultaneously, allowing advanced cross-modal
288 * processing techniques.
289 */
291
292 /**
293 * @brief Current token enforcement strategy for this root buffer
294 *
295 * This defines how child buffers are validated and processed
296 * based on their processing tokens. It allows for flexible
297 * control over how different processing streams interact.
298 */
299 TokenEnforcementStrategy m_token_enforcement_strategy { TokenEnforcementStrategy::STRICT };
300
301 /**
302 * @brief Preferred processing token for this root buffer
303 *
304 * This is the token that child buffers should ideally match
305 * to be accepted into the aggregation hierarchy. It defines
306 * the primary processing stream for this root buffer.
307 */
309
310 std::atomic<uint32_t> m_pending_count { 0 };
311
312 static constexpr size_t MAX_PENDING = 64;
313
314 /**
315 * @brief Structure for storing pending buffer add/remove operations
316 *
317 * Similar to RootNode's PendingOp, this handles buffer operations
318 * that need to be deferred when the buffer is currently processing.
319 */
321 std::atomic<bool> active { false };
322 std::shared_ptr<BufferType> buffer;
323 bool is_addition { true }; // true = add, false = remove
324 } m_pending_ops[MAX_PENDING];
325};
326}
virtual void set_processing_rate_hint(uint32_t tick_rate)
Sets processing rate hint for the buffer.
bool m_cross_modal_sharing
Whether this buffer allows cross-modal data sharing.
const std::vector< std::shared_ptr< BufferType > > & get_child_buffers() const
Gets all tributary buffers in the aggregation hierarchy.
virtual void enable_cross_modal_sharing(bool enabled)
Enables cross-modal data sharing.
void process_pending_buffer_operations()
Process pending operations - call this at start of processing cycles.
virtual uint32_t get_processing_rate_hint() const
Gets the processing rate hint.
virtual bool is_token_active() const =0
Checks if the buffer is active for its assigned token.
void clear() override
Resets all data values in this buffer and its tributaries.
virtual void add_child_buffer(std::shared_ptr< BufferType > buffer)
Adds a tributary buffer to this root buffer.
void remove_child_buffer_direct(std::shared_ptr< BufferType > buffer)
virtual void remove_child_buffer(std::shared_ptr< BufferType > buffer)
Removes a tributary buffer from this root buffer.
uint32_t m_processing_rate_hint
Processing rate hint for this buffer.
bool is_buffer_acceptable(std::shared_ptr< BufferType > buffer, std::string *rejection_reason=nullptr) const
Validates if a buffer is acceptable based on current token enforcement strategy.
ProcessingToken m_preferred_processing_token
Preferred processing token for this root buffer.
void add_child_buffer_direct(std::shared_ptr< BufferType > buffer)
std::vector< std::shared_ptr< BufferType > > m_child_buffers
Vector of tributary buffers that contribute to this root buffer.
size_t get_num_children() const
Gets the number of tributary buffers in the aggregation hierarchy.
virtual void set_token_active(bool active)=0
Activates/deactivates processing for the current token.
virtual bool is_cross_modal_sharing_enabled() const
Checks if cross-modal sharing is enabled.
virtual bool try_add_child_buffer(std::shared_ptr< BufferType > buffer, std::string *rejection_reason=nullptr)
Attempts to add a child buffer without throwing exceptions.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
TokenEnforcementStrategy
Defines how strictly processing token requirements are enforced in buffer processing chains.
Structure for storing pending buffer add/remove operations.