MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferProcessingChain.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "BufferProcessor.hpp"
4
5namespace MayaFlux::Buffers {
6
7/**
8 * @class BufferProcessingChain
9 * @brief Advanced pipeline manager for multi-stage buffer transformations with backend optimization
10 *
11 * BufferProcessingChain organizes multiple BufferProcessor objects into sophisticated transformation
12 * pipelines for one or more buffers. This system enables complex multi-stage computational processes
13 * to be applied to data in a controlled, deterministic order while leveraging the expanded capabilities
14 * of modern BufferProcessors for optimal performance and backend utilization.
15 *
16 * The chain implements an intelligent directed acyclic graph (DAG) of transformations, maintaining
17 * separate processor sequences for each buffer while enabling advanced features:
18 *
19 * **Backend-Aware Processing:**
20 * - Automatic backend optimization based on processor recommendations
21 * - Intelligent batching of compatible processors for parallel execution
22 * - Dynamic backend switching to optimize processing pipelines
23 * - Resource-aware scheduling to prevent backend conflicts
24 *
25 * **Multi-Modal Data Support:**
26 * - Seamless processing of different data types (audio, video, texture) within unified chains
27 * - Type-safe processor assignment and validation
28 * - Cross-domain transformations between different buffer types
29 * - Unified interface for heterogeneous data processing
30 *
31 * **Performance Optimization:**
32 * - Processor compatibility validation and automatic optimization
33 * - Complexity-based scheduling for optimal resource utilization
34 * - Parallel execution of independent processing stages
35 * - Memory layout optimization for improved cache performance
36 *
37 * Key features:
38 * - Enables construction of complex computational pipelines with backend optimization
39 * - Supports both parallel and sequential transformation patterns with automatic selection
40 * - Preserves transformation order while optimizing execution strategy
41 * - Provides special "final" processors for guaranteed post-processing operations
42 * - Allows dynamic reconfiguration of transformation pipelines at runtime
43 * - Leverages processor agency for optimal backend selection and resource utilization
44 */
45class MAYAFLUX_API BufferProcessingChain {
46public:
47 friend class BufferProcessor;
48
49 /**
50 * @brief Adds a processor to the transformation pipeline for a specific buffer
51 * @param processor Processor to add
52 * @param buffer Buffer to associate with this processor
53 * @param rejection_reason Optional reason for rejection if the processor cannot be added
54 * @return true if the processor was successfully added, false if it was rejected
55 *
56 * The processor is added to the end of the transformation sequence for the specified buffer.
57 * If this is the first processor for the buffer, a new sequence is created. The chain
58 * performs intelligent validation and optimization:
59 *
60 * - **Compatibility Validation**: Ensures the processor can handle the buffer's data type
61 * - **Backend Analysis**: Analyzes processor backend preferences for optimization opportunities
62 * - **Pipeline Optimization**: May reorder or batch processors for improved performance
63 * - **Resource Planning**: Allocates necessary resources for the processor's execution
64 */
65 bool add_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason = nullptr);
66
67 /**
68 * @brief Removes a processor from the pipeline for a specific buffer
69 * @param processor Processor to remove
70 * @param buffer Buffer to remove the processor from
71 *
72 * If the processor is found in the buffer's transformation sequence, it is removed
73 * and its on_detach method is called. The chain also performs cleanup optimization:
74 *
75 * - **Resource Cleanup**: Ensures all processor resources are properly released
76 * - **Pipeline Reoptimization**: Rebuilds optimization plans without the removed processor
77 * - **Backend Restoration**: Restores default backends if the processor was overriding them
78 */
79 void remove_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
80
81 /**
82 * @brief Applies the transformation pipeline to a buffer with intelligent execution
83 * @param buffer Buffer to transform
84 *
85 * Applies each processor in the buffer's sequence using an optimized execution strategy.
86 * The chain leverages processor capabilities for maximum performance:
87 *
88 * - **Backend Optimization**: Uses processor-recommended backends when beneficial
89 * - **Parallel Execution**: Executes compatible processors in parallel when possible
90 * - **Resource Management**: Optimally allocates CPU, GPU, and memory resources
91 * - **Error Handling**: Provides robust error recovery and fallback mechanisms
92 *
93 * This does not include the final processor, which must be applied separately with
94 * process_final() to ensure proper pipeline completion.
95 */
96 void process(const std::shared_ptr<Buffer>& buffer);
97
98 /**
99 * @brief Applies preprocessors, processing chain, post processors and final processors sequentially to a buffer
100 * @param buffer Buffer to transform
101 *
102 * Use this when explicit control of order is not needed, and you want to ensure that all stages
103 * of the processing pipeline are applied in a strict sequence. This method guarantees that
104 * preprocessors, main processors, postprocessors, and final processors are executed one after
105 * the other, maintaining the exact order of operations as defined in the chain.
106 */
107 void process_complete(const std::shared_ptr<Buffer>& buffer);
108
109 /**
110 * @brief Sets a preprocessor to be applied before the main pipeline
111 * @param processor Preprocessor to add
112 * @param buffer Buffer to associate with this preprocessor
113 *
114 * The preprocessor is applied before all regular processors when process() is called.
115 * This is useful for initial data preparation steps that must occur prior to the
116 * main transformation sequence, such as format conversion, normalization, or validation.
117 * NOTE: This runs after the buffer's own default processor. If you wish this to be the
118 * preprocessor, remove the default processor first.
119 * This is done to allow buffers to configure their own default processing behavior.
120 * i.e NodeBuffer WILL acquire node data using its default processor before any processing chain preprocessor.
121 */
122 void add_preprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
123
124 /**
125 * @brief Sets a postprocessor to be applied after the main pipeline
126 * @param processor Postprocessor to add
127 * @param buffer Buffer to associate with this postprocessor
128 *
129 * The postprocessor is applied after all regular processors when process() is called.
130 * This is useful for final data adjustments that must occur immediately after the
131 * main transformation sequence, such as clamping values, applying effects, or cleanup.
132 * NOTE: This is different from the final processor, and runs before it.
133 */
134 void add_postprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
135
136 /**
137 * @brief Sets a special processor to be applied after the main pipeline
138 * @param processor Final processor to add
139 * @param buffer Buffer to associate with this final processor
140 *
141 * The final processor is applied after all regular processors when process_final() is called.
142 * This is essential for operations like normalization, boundary enforcement, format conversion,
143 * or validation that must be applied as the last step in a transformation pipeline, regardless
144 * of the optimization strategies used for the main processing sequence.
145 */
146 void add_final_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
147
148 /**
149 * @brief Checks if a buffer has any processors in its pipeline
150 * @param buffer Buffer to check
151 * @return true if the buffer has processors, false otherwise
152 *
153 * This method enables dynamic pipeline management and optimization decisions
154 * based on the presence of processing stages for specific buffers.
155 */
156 bool has_processors(const std::shared_ptr<Buffer>& buffer) const;
157
158 /**
159 * @brief Gets all processors in a buffer's transformation pipeline
160 * @param buffer Buffer to get processors for
161 * @return Constant reference to the vector of processors
162 *
163 * Returns an empty vector if the buffer has no processors. This provides
164 * access to the processor sequence for analysis, optimization, or debugging
165 * purposes while maintaining the integrity of the processing pipeline.
166 */
167 const std::vector<std::shared_ptr<BufferProcessor>>& get_processors(const std::shared_ptr<Buffer>& buffer) const;
168
169 /**
170 * @brief Returns the preprocessor for a buffer, or nullptr if none is set.
171 * @param buffer Buffer to query
172 */
173 [[nodiscard]] std::shared_ptr<BufferProcessor>
174 get_preprocessor(const std::shared_ptr<Buffer>& buffer) const;
175
176 /**
177 * @brief Returns the postprocessor for a buffer, or nullptr if none is set.
178 * @param buffer Buffer to query
179 */
180 [[nodiscard]] std::shared_ptr<BufferProcessor>
181 get_postprocessor(const std::shared_ptr<Buffer>& buffer) const;
182
183 /**
184 * @brief Returns the final processor for a buffer, or nullptr if none is set.
185 * @param buffer Buffer to query
186 */
187 [[nodiscard]] std::shared_ptr<BufferProcessor>
188 get_final_processor(const std::shared_ptr<Buffer>& buffer) const;
189
190 /**
191 * @brief Gets the entire transformation pipeline structure
192 * @return Map of buffers to their processor sequences
193 *
194 * This provides access to the internal structure of the pipeline, mapping each
195 * buffer to its sequence of transformation processors. Essential for pipeline
196 * analysis, optimization planning, and system introspection capabilities.
197 */
198 inline std::unordered_map<std::shared_ptr<Buffer>, std::vector<std::shared_ptr<BufferProcessor>>> get_chain() const { return m_buffer_processors; }
199
200 /**
201 * @brief Combines another processing pipeline into this one with optimization
202 * @param other Chain to merge into this one
203 *
204 * Adds all processors from the other chain to this one, preserving their buffer
205 * associations and order while performing intelligent optimization. This enables
206 * the composition of complex transformation pipelines from simpler, reusable components:
207 *
208 * - **Compatibility Analysis**: Validates that merged processors are compatible
209 * - **Optimization Opportunities**: Identifies potential performance improvements in the combined pipeline
210 * - **Resource Consolidation**: Optimizes resource usage across the merged processors
211 * - **Backend Harmonization**: Resolves any backend conflicts between the chains
212 */
213 void merge_chain(const std::shared_ptr<BufferProcessingChain>& other);
214
215 /**
216 * @brief Applies the preprocessor to a buffer
217 * @param buffer Buffer to preprocess
218 *
219 * If the buffer has a preprocessor, it is applied before the main processing sequence.
220 * This is useful for initial data preparation steps that must occur prior to the
221 * main transformation sequence, such as format conversion, normalization, or validation.
222 */
223 void preprocess(const std::shared_ptr<Buffer>& buffer);
224
225 /**
226 * @brief Applies the postprocessor to a buffer
227 * @param buffer Buffer to postprocess
228 *
229 * If the buffer has a postprocessor, it is applied after the main processing sequence.
230 * This is useful for final data adjustments that must occur immediately after the
231 * main transformation sequence, such as clamping values, applying effects, or cleanup.
232 */
233 void postprocess(const std::shared_ptr<Buffer>& buffer);
234
235 /**
236 * @brief Applies the final processor to a buffer with guaranteed execution
237 * @param buffer Buffer to process
238 *
239 * If the buffer has a final processor, it is applied with guaranteed execution regardless
240 * of any optimization strategies or backend considerations. This is typically called after
241 * process() to apply final-stage transformations like normalization, boundary enforcement,
242 * or format validation that must complete successfully for pipeline integrity.
243 */
244 void process_final(const std::shared_ptr<Buffer>& buffer);
245
246 /**
247 * @brief Sets the preferred processing token for this chain
248 * @param token Preferred processing token to set
249 *
250 * This method allows the chain to specify a preferred processing domain that influences
251 * how processors are executed, including backend selection and execution strategy.
252 * The token can be used to optimize the entire pipeline based on the expected data type,
253 * processing requirements, and available hardware resources.
254 */
255 inline void set_preferred_token(ProcessingToken token) { m_token_filter_mask = token; }
256
257 /** * @brief Gets the preferred processing token for this chain
258 * @return Current preferred processing token
259 *
260 * Returns the currently set preferred processing token, which can be used by processors
261 * to optimize their execution strategies and backend selections. This token represents the
262 * overall processing domain that the chain aims to optimize for.
263 */
265 {
266 return m_token_filter_mask;
267 }
268
269 /**
270 * @brief Sets the token enforcement strategy for this chain
271 * @param strategy Token enforcement strategy to set
272 *
273 * This method allows the chain to specify how the processing token is enforced across the pipeline,
274 * including whether to filter processors based on their compatibility with the token. The default
275 * strategy is FILTERED, which applies the token only to compatible processors.
276 */
278 {
279 m_enforcement_strategy = strategy;
280 }
281
282 /**
283 * @brief Gets the current token enforcement strategy for this chain
284 * @return Current token enforcement strategy
285 *
286 * Returns the currently set token enforcement strategy, which determines how the processing token
287 * is applied to processors in the pipeline. This can influence whether processors are filtered
288 * based on their compatibility with the preferred processing token.
289 */
291 {
292 return m_enforcement_strategy;
293 }
294
295 /**
296 * @brief Optimizes the processing pipeline for improved performance
297 * @param buffer Buffer to optimize the pipeline for
298 *
299 * Analyzes the current processor sequence and applies various optimization strategies:
300 *
301 * - **Backend Consolidation**: Groups processors by preferred backend for batched execution
302 * - **Parallel Execution Planning**: Identifies processors that can run concurrently
303 * - **Memory Layout Optimization**: Optimizes data access patterns for cache efficiency
304 * - **Resource Balancing**: Balances processor load across available hardware resources
305 */
306 void optimize_for_tokens(const std::shared_ptr<Buffer>& buffer);
307
308 /**
309 * @brief Analyzes token compatibility across all processors in the chain
310 * @return Vector of TokenCompatibilityReport objects summarizing processor compatibility
311 *
312 * This method generates a detailed report on how each processor in the chain
313 * aligns with the preferred processing token, including compatibility status,
314 * enforcement strategy, and any processors that will be skipped or pending removal.
315 * Useful for debugging, optimization planning, and ensuring pipeline integrity.
316 */
317 std::vector<TokenCompatibilityReport> analyze_token_compatibility() const;
318
319 /** * @brief Validates all processors in the chain against the preferred processing token
320 * @param incompatibility_reasons Optional vector to store reasons for any incompatibilities
321 * @return true if all processors are compatible with the preferred token, false otherwise
322 *
323 * This method checks each processor in the chain against the preferred processing token,
324 * ensuring that all processors can execute under the current backend and execution strategy.
325 * If any incompatibilities are found, they can be reported through the provided vector.
326 */
327 bool validate_all_processors(std::vector<std::string>* incompatibility_reasons = nullptr) const;
328
329 /**
330 * @brief Enforces the chain's preferred processing token on all processors
331 *
332 * This method ensures that all processors in the chain are compatible with the
333 * preferred processing token, applying any necessary optimizations or removals
334 * of incompatible processors. It is typically called after setting a new preferred
335 * token or changing the enforcement strategy to ensure the pipeline remains valid.
336 */
337 void enforce_chain_token_on_processors();
338
339 inline bool has_pending_operations() const
340 {
341 return m_pending_count.load(std::memory_order_relaxed) > 0;
342 }
343
344 /**
345 * @brief Gets a processor of a specific type from the buffer's processing pipeline
346 * @tparam T Type of the processor to retrieve
347 * @param buffer Buffer to get the processor from
348 * @return Shared pointer to the processor if found, nullptr otherwise
349 *
350 * This method searches for a processor of the specified type in the buffer's
351 * transformation sequence. If found, it returns a shared pointer to the processor,
352 * allowing type-safe access to specialized functionality.
353 */
354 template <typename T>
355 std::shared_ptr<T> get_processor(const std::shared_ptr<Buffer>& buffer) const
356 {
357 auto processors = get_processors(buffer);
358
359 for (auto& processor : processors) {
360 if (auto t_processor = std::dynamic_pointer_cast<T>(processor)) {
361 return t_processor;
362 }
363 }
364 return nullptr;
365 }
366
367protected:
368 bool add_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason = nullptr);
369 void remove_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
370
371private:
372 /**
373 * @brief Internal processing method for non-owning buffer contexts
374 * @param buffer Buffer to process
375 *
376 * This method is used internally by BufferProcessingChain to process buffers
377 * that are not owned by the chain itself. It ensures that the processor's
378 * processing function is called in a thread-safe manner, managing the
379 * active processing state to prevent concurrent access issues.
380 */
381 void process_non_owning(const std::shared_ptr<Buffer>& buffer);
382
383 /**
384 * @brief Validates the processing token against the chain's preferred token
385 */
386 void cleanup_rejected_processors(const std::shared_ptr<Buffer>& buffer);
387
388 /**
389 * @brief Process pending processor operations
390 */
391 void process_pending_processor_operations();
392
393 bool queue_pending_processor_op(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, bool is_addition, std::string* rejection_reason = nullptr);
394
395 /**
396 * @brief Map of buffers to their processor sequences
397 *
398 * Each buffer has its own vector of processors that are applied in order when
399 * the buffer is processed. The sequence may be optimized for performance while
400 * maintaining the logical order of transformations.
401 */
402 std::unordered_map<std::shared_ptr<Buffer>, std::vector<std::shared_ptr<BufferProcessor>>> m_buffer_processors;
403
404 /**
405 * @brief Map of buffers to processors that are conditionally applied
406 */
407 std::unordered_map<std::shared_ptr<Buffer>, std::unordered_set<std::shared_ptr<BufferProcessor>>> m_conditional_processors;
408
409 /**
410 * @brief Map of buffers to processors pending removal
411 */
412 std::unordered_map<std::shared_ptr<Buffer>, std::unordered_set<std::shared_ptr<BufferProcessor>>> m_pending_removal;
413
414 /**
415 * @brief Map of buffers to their preprocessors
416 *
417 * Each buffer can have one preprocessor that is applied before the main
418 * processing sequence to prepare the data.
419 */
420 std::unordered_map<std::shared_ptr<Buffer>, std::shared_ptr<BufferProcessor>> m_preprocessors;
421
422 /**
423 * @brief Map of buffers to their postprocessors
424 *
425 * Each buffer can have one postprocessor that is applied after the main
426 * processing sequence to finalize the data.
427 */
428 std::unordered_map<std::shared_ptr<Buffer>, std::shared_ptr<BufferProcessor>> m_postprocessors;
429
430 /**
431 * @brief Map of buffers to their final processors
432 *
433 * Each buffer can have one final processor that is applied after the main
434 * processing sequence with guaranteed execution, regardless of optimization strategies.
435 */
436 std::unordered_map<std::shared_ptr<Buffer>, std::shared_ptr<BufferProcessor>> m_final_processors;
437
438 /**
439 * @brief Preferred processing token for this chain
440 *
441 * This token represents the preferred processing domain that influences how processors
442 * are executed, including backend selection and execution strategy. It can be used to
443 * optimize the entire pipeline based on the expected data type, processing requirements,
444 * and available hardware resources.
445 */
446 mutable ProcessingToken m_token_filter_mask { ProcessingToken::AUDIO_BACKEND };
447
448 /**
449 * @brief Token enforcement strategy for this chain
450 *
451 * This strategy determines how the processing token is enforced across the pipeline,
452 * including whether to filter processors based on their compatibility with the token.
453 * The default strategy is FILTERED, which applies the token only to compatible processors.
454 */
455 TokenEnforcementStrategy m_enforcement_strategy { TokenEnforcementStrategy::FILTERED };
456
457 static constexpr size_t MAX_PENDING_PROCESSORS = 32;
458
460 std::atomic<bool> active { false };
461 std::shared_ptr<BufferProcessor> processor;
462 std::shared_ptr<Buffer> buffer;
463 bool is_addition { true }; // true = add, false = remove
464 };
465
466 std::atomic<bool> m_is_processing;
467
468 PendingProcessorOp m_pending_ops[MAX_PENDING_PROCESSORS];
469
470 std::atomic<uint32_t> m_pending_count { 0 };
471};
472}
void set_preferred_token(ProcessingToken token)
Sets the preferred processing token for this chain.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_final_processors
Map of buffers to their final processors.
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_pending_removal
Map of buffers to processors pending removal.
std::unordered_map< std::shared_ptr< Buffer >, std::vector< std::shared_ptr< BufferProcessor > > > m_buffer_processors
Map of buffers to their processor sequences.
std::unordered_map< std::shared_ptr< Buffer >, std::vector< std::shared_ptr< BufferProcessor > > > get_chain() const
Gets the entire transformation pipeline structure.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_preprocessors
Map of buffers to their preprocessors.
std::shared_ptr< T > get_processor(const std::shared_ptr< Buffer > &buffer) const
Gets a processor of a specific type from the buffer's processing pipeline.
ProcessingToken get_preferred_token() const
Gets the preferred processing token for this chain.
TokenEnforcementStrategy get_enforcement_strategy() const
Gets the current token enforcement strategy for this chain.
void set_enforcement_strategy(TokenEnforcementStrategy strategy)
Sets the token enforcement strategy for this chain.
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_conditional_processors
Map of buffers to processors that are conditionally applied.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_postprocessors
Map of buffers to their postprocessors.
Advanced pipeline manager for multi-stage buffer transformations with backend optimization.
Central computational transformation interface for continuous buffer processing.
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
TokenEnforcementStrategy
Defines how strictly processing token requirements are enforced in buffer processing chains.
void remove_processor(const std::shared_ptr< Buffers::BufferProcessor > &processor, const std::shared_ptr< Buffers::Buffer > &buffer)
Removes a processor from a specific buffer.
Definition Graph.cpp:152
void add_processor(const std::shared_ptr< Buffers::BufferProcessor > &processor, const std::shared_ptr< Buffers::Buffer > &buffer, Buffers::ProcessingToken token)
Adds a processor to a specific buffer.
Definition Graph.cpp:137