BufferProcessingChain.hpp

#pragma once

#include "BufferProcessor.hpp"

#include <atomic>
#include <unordered_set>

namespace MayaFlux::Buffers {

/**
 * @class BufferProcessingChain
 * @brief Advanced pipeline manager for multi-stage buffer transformations with backend optimization
 *
 * BufferProcessingChain organizes multiple BufferProcessor objects into sophisticated transformation
 * pipelines for one or more buffers. This system enables complex multi-stage computational processes
 * to be applied to data in a controlled, deterministic order while leveraging the expanded capabilities
 * of modern BufferProcessors for optimal performance and backend utilization.
 *
 * The chain implements an intelligent directed acyclic graph (DAG) of transformations, maintaining
 * separate processor sequences for each buffer while enabling advanced features:
 *
 * **Backend-Aware Processing:**
 * - Automatic backend optimization based on processor recommendations
 * - Intelligent batching of compatible processors for parallel execution
 * - Dynamic backend switching to optimize processing pipelines
 * - Resource-aware scheduling to prevent backend conflicts
 *
 * **Multi-Modal Data Support:**
 * - Seamless processing of different data types (audio, video, texture) within unified chains
 * - Type-safe processor assignment and validation
 * - Cross-domain transformations between different buffer types
 * - Unified interface for heterogeneous data processing
 *
 * **Performance Optimization:**
 * - Processor compatibility validation and automatic optimization
 * - Complexity-based scheduling for optimal resource utilization
 * - Parallel execution of independent processing stages
 * - Memory layout optimization for improved cache performance
 *
 * Key features:
 * - Enables construction of complex computational pipelines with backend optimization
 * - Supports both parallel and sequential transformation patterns with automatic selection
 * - Preserves transformation order while optimizing execution strategy
 * - Provides special "final" processors for guaranteed post-processing operations
 * - Allows dynamic reconfiguration of transformation pipelines at runtime
 * - Leverages processor agency for optimal backend selection and resource utilization
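 *
 * Typical usage (an illustrative sketch: GainProcessor and NormalizeProcessor stand in
 * for concrete BufferProcessor subclasses and are not part of this header):
 * @code
 * void build_pipeline(const std::shared_ptr<Buffer>& buffer)
 * {
 *     auto chain = std::make_shared<BufferProcessingChain>();
 *
 *     chain->add_processor(std::make_shared<GainProcessor>(0.5f), buffer);
 *     chain->add_final_processor(std::make_shared<NormalizeProcessor>(), buffer);
 *
 *     chain->process(buffer);       // main transformation sequence
 *     chain->process_final(buffer); // guaranteed final stage
 * }
 * @endcode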
 */
class MAYAFLUX_API BufferProcessingChain {
public:
    friend class BufferProcessor;

    /**
     * @brief Adds a processor to the transformation pipeline for a specific buffer
     * @param processor Processor to add
     * @param buffer Buffer to associate with this processor
     * @param rejection_reason Optional reason for rejection if the processor cannot be added
     * @return true if the processor was successfully added, false if it was rejected
     *
     * The processor is added to the end of the transformation sequence for the specified buffer.
     * If this is the first processor for the buffer, a new sequence is created. The chain
     * performs intelligent validation and optimization:
     *
     * - **Compatibility Validation**: Ensures the processor can handle the buffer's data type
     * - **Backend Analysis**: Analyzes processor backend preferences for optimization opportunities
     * - **Pipeline Optimization**: May reorder or batch processors for improved performance
     * - **Resource Planning**: Allocates necessary resources for the processor's execution
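     *
     * Example of inspecting a rejection (GainProcessor is a hypothetical processor type):
     * @code
     * std::string reason;
     * if (!chain->add_processor(std::make_shared<GainProcessor>(0.5f), buffer, &reason)) {
     *     // the processor was rejected; `reason` explains why (e.g. token incompatibility)
     * }
     * @endcode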
     */
    bool add_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason = nullptr);

    /**
     * @brief Removes a processor from the pipeline for a specific buffer
     * @param processor Processor to remove
     * @param buffer Buffer to remove the processor from
     *
     * If the processor is found in the buffer's transformation sequence, it is removed
     * and its on_detach method is called. The chain also performs cleanup optimization:
     *
     * - **Resource Cleanup**: Ensures all processor resources are properly released
     * - **Pipeline Reoptimization**: Rebuilds optimization plans without the removed processor
     * - **Backend Restoration**: Restores default backends if the processor was overriding them
     */
    void remove_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Applies the transformation pipeline to a buffer with intelligent execution
     * @param buffer Buffer to transform
     *
     * Applies each processor in the buffer's sequence using an optimized execution strategy.
     * The chain leverages processor capabilities for maximum performance:
     *
     * - **Backend Optimization**: Uses processor-recommended backends when beneficial
     * - **Parallel Execution**: Executes compatible processors in parallel when possible
     * - **Resource Management**: Optimally allocates CPU, GPU, and memory resources
     * - **Error Handling**: Provides robust error recovery and fallback mechanisms
     *
     * This does not include the final processor, which must be applied separately with
     * process_final() to ensure proper pipeline completion.
     */
    void process(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Applies preprocessors, the main processing chain, postprocessors, and final processors sequentially to a buffer
     * @param buffer Buffer to transform
     *
     * Use this when explicit control over the individual stages is not needed and every stage
     * of the pipeline should be applied in a strict sequence. This method guarantees that
     * preprocessors, main processors, postprocessors, and final processors are executed one after
     * the other, maintaining the exact order of operations as defined in the chain.
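     *
     * A minimal sketch of driving the stages explicitly, in the order described above:
     * @code
     * chain->preprocess(buffer);    // per-buffer preprocessor, if any
     * chain->process(buffer);       // main processor sequence
     * chain->postprocess(buffer);   // per-buffer postprocessor, if any
     * chain->process_final(buffer); // guaranteed final stage
     * @endcode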
     */
    void process_complete(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Sets a preprocessor to be applied before the main pipeline
     * @param processor Preprocessor to add
     * @param buffer Buffer to associate with this preprocessor
     *
     * The preprocessor is applied before all regular processors when process() is called.
     * This is useful for initial data preparation steps that must occur prior to the
     * main transformation sequence, such as format conversion, normalization, or validation.
     * NOTE: The preprocessor runs after the buffer's own default processor. If it should be the
     * very first processing step, remove the default processor first. This ordering lets buffers
     * configure their own default processing behavior; for example, a NodeBuffer will acquire
     * node data through its default processor before any chain preprocessor runs.
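     *
     * Illustrative use (FormatConvertProcessor is a hypothetical BufferProcessor subclass):
     * @code
     * chain->add_preprocessor(std::make_shared<FormatConvertProcessor>(), buffer);
     * chain->preprocess(buffer); // or let process_complete() drive every stage
     * @endcode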
     */
    void add_preprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Sets a postprocessor to be applied after the main pipeline
     * @param processor Postprocessor to add
     * @param buffer Buffer to associate with this postprocessor
     *
     * The postprocessor is applied after all regular processors when process() is called.
     * This is useful for final data adjustments that must occur immediately after the
     * main transformation sequence, such as clamping values, applying effects, or cleanup.
     * NOTE: This is different from the final processor, and runs before it.
     */
    void add_postprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Sets a special processor to be applied after the main pipeline
     * @param processor Final processor to add
     * @param buffer Buffer to associate with this final processor
     *
     * The final processor is applied after all regular processors when process_final() is called.
     * This is essential for operations like normalization, boundary enforcement, format conversion,
     * or validation that must be applied as the last step in a transformation pipeline, regardless
     * of the optimization strategies used for the main processing sequence.
     */
    void add_final_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Checks if a buffer has any processors in its pipeline
     * @param buffer Buffer to check
     * @return true if the buffer has processors, false otherwise
     *
     * This method enables dynamic pipeline management and optimization decisions
     * based on the presence of processing stages for specific buffers.
     */
    bool has_processors(const std::shared_ptr<Buffer>& buffer) const;

    /**
     * @brief Gets all processors in a buffer's transformation pipeline
     * @param buffer Buffer to get processors for
     * @return Constant reference to the vector of processors
     *
     * Returns an empty vector if the buffer has no processors. This provides
     * access to the processor sequence for analysis, optimization, or debugging
     * purposes while maintaining the integrity of the processing pipeline.
     */
    const std::vector<std::shared_ptr<BufferProcessor>>& get_processors(const std::shared_ptr<Buffer>& buffer) const;

    /**
     * @brief Gets the entire transformation pipeline structure
     * @return Map of buffers to their processor sequences
     *
     * This provides access to the internal structure of the pipeline, mapping each
     * buffer to its sequence of transformation processors. Essential for pipeline
     * analysis, optimization planning, and system introspection capabilities.
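     *
     * The map is returned by value, so the result is a snapshot. Example of inspecting it:
     * @code
     * for (const auto& [buf, processors] : chain->get_chain()) {
     *     // inspect processors.size(), processor types, etc.
     * }
     * @endcode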
     */
    inline std::unordered_map<std::shared_ptr<Buffer>, std::vector<std::shared_ptr<BufferProcessor>>> get_chain() const { return m_buffer_processors; }

    /**
     * @brief Combines another processing pipeline into this one with optimization
     * @param other Chain to merge into this one
     *
     * Adds all processors from the other chain to this one, preserving their buffer
     * associations and order while performing intelligent optimization. This enables
     * the composition of complex transformation pipelines from simpler, reusable components:
     *
     * - **Compatibility Analysis**: Validates that merged processors are compatible
     * - **Optimization Opportunities**: Identifies potential performance improvements in the combined pipeline
     * - **Resource Consolidation**: Optimizes resource usage across the merged processors
     * - **Backend Harmonization**: Resolves any backend conflicts between the chains
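     *
     * Illustrative composition (main_chain is assumed to be another, previously built chain):
     * @code
     * auto eq_chain = std::make_shared<BufferProcessingChain>();
     * // ... populate eq_chain with processors for the same buffers ...
     * main_chain->merge_chain(eq_chain); // main_chain now also runs eq_chain's processors
     * @endcode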
     */
    void merge_chain(const std::shared_ptr<BufferProcessingChain>& other);

    /**
     * @brief Applies the preprocessor to a buffer
     * @param buffer Buffer to preprocess
     *
     * If the buffer has a preprocessor, it is applied before the main processing sequence.
     * This is useful for initial data preparation steps that must occur prior to the
     * main transformation sequence, such as format conversion, normalization, or validation.
     */
    void preprocess(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Applies the postprocessor to a buffer
     * @param buffer Buffer to postprocess
     *
     * If the buffer has a postprocessor, it is applied after the main processing sequence.
     * This is useful for final data adjustments that must occur immediately after the
     * main transformation sequence, such as clamping values, applying effects, or cleanup.
     */
    void postprocess(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Applies the final processor to a buffer with guaranteed execution
     * @param buffer Buffer to process
     *
     * If the buffer has a final processor, it is applied with guaranteed execution regardless
     * of any optimization strategies or backend considerations. This is typically called after
     * process() to apply final-stage transformations like normalization, boundary enforcement,
     * or format validation that must complete successfully for pipeline integrity.
     */
    void process_final(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Sets the preferred processing token for this chain
     * @param token Preferred processing token to set
     *
     * This method allows the chain to specify a preferred processing domain that influences
     * how processors are executed, including backend selection and execution strategy.
     * The token can be used to optimize the entire pipeline based on the expected data type,
     * processing requirements, and available hardware resources.
     */
    inline void set_preferred_token(ProcessingToken token) { m_token_filter_mask = token; }

    /**
     * @brief Gets the preferred processing token for this chain
     * @return Current preferred processing token
     *
     * Returns the currently set preferred processing token, which can be used by processors
     * to optimize their execution strategies and backend selections. This token represents the
     * overall processing domain that the chain aims to optimize for.
     */
    inline ProcessingToken get_preferred_token() const
    {
        return m_token_filter_mask;
    }

    /**
     * @brief Sets the token enforcement strategy for this chain
     * @param strategy Token enforcement strategy to set
     *
     * This method allows the chain to specify how the processing token is enforced across the pipeline,
     * including whether to filter processors based on their compatibility with the token. The default
     * strategy is FILTERED, which applies the token only to compatible processors.
     */
    inline void set_enforcement_strategy(TokenEnforcementStrategy strategy)
    {
        m_enforcement_strategy = strategy;
    }

    /**
     * @brief Gets the current token enforcement strategy for this chain
     * @return Current token enforcement strategy
     *
     * Returns the currently set token enforcement strategy, which determines how the processing token
     * is applied to processors in the pipeline. This can influence whether processors are filtered
     * based on their compatibility with the preferred processing token.
     */
    inline TokenEnforcementStrategy get_enforcement_strategy() const
    {
        return m_enforcement_strategy;
    }

    /**
     * @brief Optimizes the processing pipeline for the preferred processing token and improved performance
     * @param buffer Buffer to optimize the pipeline for
     *
     * Analyzes the current processor sequence and applies various optimization strategies:
     *
     * - **Backend Consolidation**: Groups processors by preferred backend for batched execution
     * - **Parallel Execution Planning**: Identifies processors that can run concurrently
     * - **Memory Layout Optimization**: Optimizes data access patterns for cache efficiency
     * - **Resource Balancing**: Balances processor load across available hardware resources
     */
    void optimize_for_tokens(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Analyzes token compatibility across all processors in the chain
     * @return Vector of TokenCompatibilityReport objects summarizing processor compatibility
     *
     * This method generates a detailed report on how each processor in the chain
     * aligns with the preferred processing token, including compatibility status,
     * enforcement strategy, and any processors that will be skipped or pending removal.
     * Useful for debugging, optimization planning, and ensuring pipeline integrity.
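     *
     * Sketch of consuming the report (TokenCompatibilityReport's members are defined elsewhere,
     * so only iteration is shown):
     * @code
     * for (const auto& report : chain->analyze_token_compatibility()) {
     *     // inspect each TokenCompatibilityReport, e.g. log incompatible processors
     * }
     * @endcode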
     */
    std::vector<TokenCompatibilityReport> analyze_token_compatibility() const;

    /**
     * @brief Validates all processors in the chain against the preferred processing token
     * @param incompatibility_reasons Optional vector to store reasons for any incompatibilities
     * @return true if all processors are compatible with the preferred token, false otherwise
     *
     * This method checks each processor in the chain against the preferred processing token,
     * ensuring that all processors can execute under the current backend and execution strategy.
     * If any incompatibilities are found, they can be reported through the provided vector.
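     *
     * Example of collecting the reasons for incompatible processors:
     * @code
     * std::vector<std::string> reasons;
     * if (!chain->validate_all_processors(&reasons)) {
     *     // each entry in `reasons` describes one incompatibility
     * }
     * @endcode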
     */
    bool validate_all_processors(std::vector<std::string>* incompatibility_reasons = nullptr) const;

    /**
     * @brief Enforces the chain's preferred processing token on all processors
     *
     * This method ensures that all processors in the chain are compatible with the
     * preferred processing token, applying any necessary optimizations or removals
     * of incompatible processors. It is typically called after setting a new preferred
     * token or changing the enforcement strategy to ensure the pipeline remains valid.
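     *
     * Typical token workflow (the values shown are this chain's defaults):
     * @code
     * chain->set_preferred_token(ProcessingToken::AUDIO_BACKEND);
     * chain->set_enforcement_strategy(TokenEnforcementStrategy::FILTERED);
     * chain->enforce_chain_token_on_processors();
     * @endcode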
     */
    void enforce_chain_token_on_processors();

    /**
     * @brief Checks whether queued processor add/remove operations are still pending
     * @return true if at least one pending operation has not yet been applied, false otherwise
     */
    inline bool has_pending_operations() const
    {
        return m_pending_count.load(std::memory_order_relaxed) > 0;
    }

    /**
     * @brief Gets a processor of a specific type from the buffer's processing pipeline
     * @tparam T Type of the processor to retrieve
     * @param buffer Buffer to get the processor from
     * @return Shared pointer to the processor if found, nullptr otherwise
     *
     * This method searches for a processor of the specified type in the buffer's
     * transformation sequence. If found, it returns a shared pointer to the processor,
     * allowing type-safe access to specialized functionality.
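     *
     * Illustrative lookup (GainProcessor and its set_gain method are hypothetical):
     * @code
     * if (auto gain = chain->get_processor<GainProcessor>(buffer)) {
     *     gain->set_gain(0.25f);
     * }
     * @endcode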
     */
    template <typename T>
    std::shared_ptr<T> get_processor(const std::shared_ptr<Buffer>& buffer) const
    {
        const auto& processors = get_processors(buffer);

        for (const auto& processor : processors) {
            if (auto t_processor = std::dynamic_pointer_cast<T>(processor)) {
                return t_processor;
            }
        }
        return nullptr;
    }

protected:
    bool add_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason = nullptr);
    void remove_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);

private:
    /**
     * @brief Internal processing method for non-owning buffer contexts
     * @param buffer Buffer to process
     *
     * This method is used internally by BufferProcessingChain to process buffers
     * that are not owned by the chain itself. It ensures that the processor's
     * processing function is called in a thread-safe manner, managing the
     * active processing state to prevent concurrent access issues.
     */
    void process_non_owning(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Removes processors that have been rejected or are marked for removal from a buffer's pipeline
     */
    void cleanup_rejected_processors(const std::shared_ptr<Buffer>& buffer);

    /**
     * @brief Process pending processor operations
     */
    void process_pending_processor_operations();

    bool queue_pending_processor_op(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, bool is_addition, std::string* rejection_reason = nullptr);

    /**
     * @brief Map of buffers to their processor sequences
     *
     * Each buffer has its own vector of processors that are applied in order when
     * the buffer is processed. The sequence may be optimized for performance while
     * maintaining the logical order of transformations.
     */
    std::unordered_map<std::shared_ptr<Buffer>, std::vector<std::shared_ptr<BufferProcessor>>> m_buffer_processors;

    /**
     * @brief Map of buffers to processors that are conditionally applied
     */
    std::unordered_map<std::shared_ptr<Buffer>, std::unordered_set<std::shared_ptr<BufferProcessor>>> m_conditional_processors;

    /**
     * @brief Map of buffers to processors pending removal
     */
    std::unordered_map<std::shared_ptr<Buffer>, std::unordered_set<std::shared_ptr<BufferProcessor>>> m_pending_removal;

    /**
     * @brief Map of buffers to their preprocessors
     *
     * Each buffer can have one preprocessor that is applied before the main
     * processing sequence to prepare the data.
     */
    std::unordered_map<std::shared_ptr<Buffer>, std::shared_ptr<BufferProcessor>> m_preprocessors;

    /**
     * @brief Map of buffers to their postprocessors
     *
     * Each buffer can have one postprocessor that is applied after the main
     * processing sequence to finalize the data.
     */
    std::unordered_map<std::shared_ptr<Buffer>, std::shared_ptr<BufferProcessor>> m_postprocessors;

    /**
     * @brief Map of buffers to their final processors
     *
     * Each buffer can have one final processor that is applied after the main
     * processing sequence with guaranteed execution, regardless of optimization strategies.
     */
    std::unordered_map<std::shared_ptr<Buffer>, std::shared_ptr<BufferProcessor>> m_final_processors;

    /**
     * @brief Preferred processing token for this chain
     *
     * This token represents the preferred processing domain that influences how processors
     * are executed, including backend selection and execution strategy. It can be used to
     * optimize the entire pipeline based on the expected data type, processing requirements,
     * and available hardware resources.
     */
    mutable ProcessingToken m_token_filter_mask { ProcessingToken::AUDIO_BACKEND };

    /**
     * @brief Token enforcement strategy for this chain
     *
     * This strategy determines how the processing token is enforced across the pipeline,
     * including whether to filter processors based on their compatibility with the token.
     * The default strategy is FILTERED, which applies the token only to compatible processors.
     */
    TokenEnforcementStrategy m_enforcement_strategy { TokenEnforcementStrategy::FILTERED };

    static constexpr size_t MAX_PENDING_PROCESSORS = 32;

    /**
     * @brief Describes a queued processor add/remove operation
     */
    struct PendingProcessorOp {
        std::atomic<bool> active { false };
        std::shared_ptr<BufferProcessor> processor;
        std::shared_ptr<Buffer> buffer;
        bool is_addition { true }; // true = add, false = remove
    };

    std::atomic<bool> m_is_processing { false };

    PendingProcessorOp m_pending_ops[MAX_PENDING_PROCESSORS];

    std::atomic<uint32_t> m_pending_count { 0 };
};

} // namespace MayaFlux::Buffers