MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
BufferProcessingChain.hpp
1#pragma once
2
3#include "BufferProcessor.hpp"
4
5namespace MayaFlux::Buffers {
6
7/**
8 * @class BufferProcessingChain
9 * @brief Advanced pipeline manager for multi-stage buffer transformations with backend optimization
10 *
11 * BufferProcessingChain organizes multiple BufferProcessor objects into sophisticated transformation
12 * pipelines for one or more buffers. This system enables complex multi-stage computational processes
13 * to be applied to data in a controlled, deterministic order while leveraging the expanded capabilities
14 * of modern BufferProcessors for optimal performance and backend utilization.
15 *
16 * The chain maintains a separate, ordered processor sequence for each buffer (a map from buffer
17 * to processor vector) while enabling advanced features:
18 *
19 * **Backend-Aware Processing:**
20 * - Automatic backend optimization based on processor recommendations
21 * - Intelligent batching of compatible processors for parallel execution
22 * - Dynamic backend switching to optimize processing pipelines
23 * - Resource-aware scheduling to prevent backend conflicts
24 *
25 * **Multi-Modal Data Support:**
26 * - Seamless processing of different data types (audio, video, texture) within unified chains
27 * - Type-safe processor assignment and validation
28 * - Cross-domain transformations between different buffer types
29 * - Unified interface for heterogeneous data processing
30 *
31 * **Performance Optimization:**
32 * - Processor compatibility validation and automatic optimization
33 * - Complexity-based scheduling for optimal resource utilization
34 * - Parallel execution of independent processing stages
35 * - Memory layout optimization for improved cache performance
36 *
37 * Key features:
38 * - Enables construction of complex computational pipelines with backend optimization
39 * - Supports both parallel and sequential transformation patterns with automatic selection
40 * - Preserves transformation order while optimizing execution strategy
41 * - Provides special "final" processors for guaranteed post-processing operations
42 * - Allows dynamic reconfiguration of transformation pipelines at runtime
43 * - Leverages processor agency for optimal backend selection and resource utilization
44 */
45class MAYAFLUX_API BufferProcessingChain {
46public:
47 friend class BufferProcessor;
48
49 /**
50 * @brief Adds a processor to the transformation pipeline for a specific buffer
51 * @param processor Processor to add
52 * @param buffer Buffer to associate with this processor
53 * @param rejection_reason Optional reason for rejection if the processor cannot be added
54 * @return true if the processor was successfully added, false if it was rejected
55 *
56 * The processor is added to the end of the transformation sequence for the specified buffer.
57 * If this is the first processor for the buffer, a new sequence is created. The chain
58 * performs intelligent validation and optimization:
59 *
60 * - **Compatibility Validation**: Ensures the processor can handle the buffer's data type
61 * - **Backend Analysis**: Analyzes processor backend preferences for optimization opportunities
62 * - **Pipeline Optimization**: May reorder or batch processors for improved performance
63 * - **Resource Planning**: Allocates necessary resources for the processor's execution
64 */
65 bool add_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason = nullptr);
66
67 /**
68 * @brief Removes a processor from the pipeline for a specific buffer
69 * @param processor Processor to remove
70 * @param buffer Buffer to remove the processor from
71 *
72 * If the processor is found in the buffer's transformation sequence, it is removed
73 * and its on_detach method is called. The chain also performs cleanup optimization:
74 *
75 * - **Resource Cleanup**: Ensures all processor resources are properly released
76 * - **Pipeline Reoptimization**: Rebuilds optimization plans without the removed processor
77 * - **Backend Restoration**: Restores default backends if the processor was overriding them
78 */
79 void remove_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
80
81 /**
82 * @brief Applies the transformation pipeline to a buffer with intelligent execution
83 * @param buffer Buffer to transform
84 *
85 * Applies each processor in the buffer's sequence using an optimized execution strategy.
86 * The chain leverages processor capabilities for maximum performance:
87 *
88 * - **Backend Optimization**: Uses processor-recommended backends when beneficial
89 * - **Parallel Execution**: Executes compatible processors in parallel when possible
90 * - **Resource Management**: Optimally allocates CPU, GPU, and memory resources
91 * - **Error Handling**: Provides robust error recovery and fallback mechanisms
92 *
93 * This does not include the final processor, which must be applied separately with
94 * process_final() to ensure proper pipeline completion.
95 */
96 void process(const std::shared_ptr<Buffer>& buffer);
97
98 /**
99 * @brief Sets a special processor to be applied after the main pipeline
100 * @param processor Final processor to add
101 * @param buffer Buffer to associate with this final processor
102 *
103 * The final processor is applied after all regular processors when process_final() is called.
104 * This is essential for operations like normalization, boundary enforcement, format conversion,
105 * or validation that must be applied as the last step in a transformation pipeline, regardless
106 * of the optimization strategies used for the main processing sequence.
107 */
108 void add_final_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
109
110 /**
111 * @brief Checks if a buffer has any processors in its pipeline
112 * @param buffer Buffer to check
113 * @return true if the buffer has processors, false otherwise
114 *
115 * This method enables dynamic pipeline management and optimization decisions
116 * based on the presence of processing stages for specific buffers.
117 */
118 bool has_processors(const std::shared_ptr<Buffer>& buffer) const;
119
120 /**
121 * @brief Gets all processors in a buffer's transformation pipeline
122 * @param buffer Buffer to get processors for
123 * @return Constant reference to the vector of processors
124 *
125 * Returns an empty vector if the buffer has no processors. This provides
126 * access to the processor sequence for analysis, optimization, or debugging
127 * purposes while maintaining the integrity of the processing pipeline.
128 */
129 const std::vector<std::shared_ptr<BufferProcessor>>& get_processors(const std::shared_ptr<Buffer>& buffer) const;
130
131 /**
132 * @brief Gets the entire transformation pipeline structure
133 * @return Map of buffers to their processor sequences
134 *
135 * Returns a copy of the internal map from each buffer to its sequence of transformation
136 * processors, so changes to the returned map do not affect the chain. Useful for pipeline
137 * analysis, optimization planning, and system introspection.
138 */
139 inline std::unordered_map<std::shared_ptr<Buffer>, std::vector<std::shared_ptr<BufferProcessor>>> get_chain() const { return m_buffer_processors; }
140
141 /**
142 * @brief Combines another processing pipeline into this one with optimization
143 * @param other Chain to merge into this one
144 *
145 * Adds all processors from the other chain to this one, preserving their buffer
146 * associations and order while performing intelligent optimization. This enables
147 * the composition of complex transformation pipelines from simpler, reusable components:
148 *
149 * - **Compatibility Analysis**: Validates that merged processors are compatible
150 * - **Optimization Opportunities**: Identifies potential performance improvements in the combined pipeline
151 * - **Resource Consolidation**: Optimizes resource usage across the merged processors
152 * - **Backend Harmonization**: Resolves any backend conflicts between the chains
153 */
154 void merge_chain(const std::shared_ptr<BufferProcessingChain>& other);
155
156 /**
157 * @brief Applies the final processor to a buffer with guaranteed execution
158 * @param buffer Buffer to process
159 *
160 * If the buffer has a final processor, it is applied with guaranteed execution regardless
161 * of any optimization strategies or backend considerations. This is typically called after
162 * process() to apply final-stage transformations like normalization, boundary enforcement,
163 * or format validation that must complete successfully for pipeline integrity.
164 */
165 void process_final(const std::shared_ptr<Buffer>& buffer);
166
167 /**
168 * @brief Sets the preferred processing token for this chain
169 * @param token Preferred processing token to set
170 *
171 * This method allows the chain to specify a preferred processing domain that influences
172 * how processors are executed, including backend selection and execution strategy.
173 * The token can be used to optimize the entire pipeline based on the expected data type,
174 * processing requirements, and available hardware resources.
175 */
176 inline void set_preferred_token(ProcessingToken token) { m_token_filter_mask = token; }
177
178 /** @brief Gets the preferred processing token for this chain
179 * @return Current preferred processing token
180 *
181 * Returns the currently set preferred processing token, which can be used by processors
182 * to optimize their execution strategies and backend selections. This token represents the
183 * overall processing domain that the chain aims to optimize for.
184 */
185 inline ProcessingToken get_preferred_token() const
186 {
187 return m_token_filter_mask;
188 }
189
190 /**
191 * @brief Sets the token enforcement strategy for this chain
192 * @param strategy Token enforcement strategy to set
193 *
194 * This method allows the chain to specify how the processing token is enforced across the pipeline,
195 * including whether to filter processors based on their compatibility with the token. The default
196 * strategy is FILTERED, which applies the token only to compatible processors.
197 */
198 inline void set_enforcement_strategy(TokenEnforcementStrategy strategy)
199 {
200 m_enforcement_strategy = strategy;
201 }
202
203 /**
204 * @brief Gets the current token enforcement strategy for this chain
205 * @return Current token enforcement strategy
206 *
207 * Returns the currently set token enforcement strategy, which determines how the processing token
208 * is applied to processors in the pipeline. This can influence whether processors are filtered
209 * based on their compatibility with the preferred processing token.
210 */
211 inline TokenEnforcementStrategy get_enforcement_strategy() const
212 {
213 return m_enforcement_strategy;
214 }
215
216 /**
217 * @brief Optimizes the processing pipeline for improved performance
218 * @param buffer Buffer to optimize the pipeline for
219 *
220 * Analyzes the current processor sequence and applies various optimization strategies:
221 *
222 * - **Backend Consolidation**: Groups processors by preferred backend for batched execution
223 * - **Parallel Execution Planning**: Identifies processors that can run concurrently
224 * - **Memory Layout Optimization**: Optimizes data access patterns for cache efficiency
225 * - **Resource Balancing**: Balances processor load across available hardware resources
226 */
227 void optimize_for_tokens(const std::shared_ptr<Buffer>& buffer);
228
229 /**
230 * @brief Analyzes token compatibility across all processors in the chain
231 * @return Vector of TokenCompatibilityReport objects summarizing processor compatibility
232 *
233 * This method generates a detailed report on how each processor in the chain
234 * aligns with the preferred processing token, including compatibility status,
235 * enforcement strategy, and any processors that will be skipped or pending removal.
236 * Useful for debugging, optimization planning, and ensuring pipeline integrity.
237 */
238 std::vector<TokenCompatibilityReport> analyze_token_compatibility() const;
239
240 /** @brief Validates all processors in the chain against the preferred processing token
241 * @param incompatibility_reasons Optional vector to store reasons for any incompatibilities
242 * @return true if all processors are compatible with the preferred token, false otherwise
243 *
244 * This method checks each processor in the chain against the preferred processing token,
245 * ensuring that all processors can execute under the current backend and execution strategy.
246 * If any incompatibilities are found, they can be reported through the provided vector.
247 */
248 bool validate_all_processors(std::vector<std::string>* incompatibility_reasons = nullptr) const;
249
250 /**
251 * @brief Enforces the chain's preferred processing token on all processors
252 *
253 * This method ensures that all processors in the chain are compatible with the
254 * preferred processing token, applying any necessary optimizations or removals
255 * of incompatible processors. It is typically called after setting a new preferred
256 * token or changing the enforcement strategy to ensure the pipeline remains valid.
257 */
258 void enforce_chain_token_on_processors();
259
260 inline bool has_pending_operations() const // true while queued add/remove operations are still pending
261 {
262 return m_pending_count.load(std::memory_order_relaxed) > 0;
263 }
264
265 /**
266 * @brief Gets a processor of a specific type from the buffer's processing pipeline
267 * @tparam T Type of the processor to retrieve
268 * @param buffer Buffer to get the processor from
269 * @return Shared pointer to the processor if found, nullptr otherwise
270 *
271 * This method searches for a processor of the specified type in the buffer's
272 * transformation sequence. If found, it returns a shared pointer to the processor,
273 * allowing type-safe access to specialized functionality.
274 */
275 template <typename T>
276 std::shared_ptr<T> get_processor(const std::shared_ptr<Buffer>& buffer) const
277 {
278 const auto& processors = get_processors(buffer); // bind by reference to avoid copying the vector
279
280 for (const auto& processor : processors) {
281 if (auto t_processor = std::dynamic_pointer_cast<T>(processor)) {
282 return t_processor;
283 }
284 }
285 return nullptr;
286 }
287
288protected:
289 bool add_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason = nullptr);
290 void remove_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer);
291
292private:
293 /**
294 * @brief Internal processing method for non-owning buffer contexts
295 * @param buffer Buffer to process
296 *
297 * This method is used internally by BufferProcessingChain to process buffers
298 * that are not owned by the chain itself. It ensures that the processor's
299 * processing function is called in a thread-safe manner, managing the
300 * active processing state to prevent concurrent access issues.
301 */
302 void process_non_owning(const std::shared_ptr<Buffer>& buffer);
303
304 /**
305 * @brief Cleans up processors for the buffer that were rejected when validated against the chain's preferred token
306 */
307 void cleanup_rejected_processors(const std::shared_ptr<Buffer>& buffer);
308
309 /**
310 * @brief Process pending processor operations
311 */
312 void process_pending_processor_operations();
313
314 bool queue_pending_processor_op(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, bool is_addition, std::string* rejection_reason = nullptr);
315
316 /**
317 * @brief Map of buffers to their processor sequences
318 *
319 * Each buffer has its own vector of processors that are applied in order when
320 * the buffer is processed. The sequence may be optimized for performance while
321 * maintaining the logical order of transformations.
322 */
323 std::unordered_map<std::shared_ptr<Buffer>, std::vector<std::shared_ptr<BufferProcessor>>> m_buffer_processors;
324
325 /**
326 * @brief Map of buffers to processors that are conditionally applied
327 */
328 std::unordered_map<std::shared_ptr<Buffer>, std::unordered_set<std::shared_ptr<BufferProcessor>>> m_conditional_processors;
329
330 /**
331 * @brief Map of buffers to processors pending removal
332 */
333 std::unordered_map<std::shared_ptr<Buffer>, std::unordered_set<std::shared_ptr<BufferProcessor>>> m_pending_removal;
334
335 /**
336 * @brief Map of buffers to their final processors
337 *
338 * Each buffer can have one final processor that is applied after the main
339 * processing sequence with guaranteed execution, regardless of optimization strategies.
340 */
341 std::unordered_map<std::shared_ptr<Buffer>, std::shared_ptr<BufferProcessor>> m_final_processors;
342
343 /**
344 * @brief Preferred processing token for this chain
345 *
346 * This token represents the preferred processing domain that influences how processors
347 * are executed, including backend selection and execution strategy. It can be used to
348 * optimize the entire pipeline based on the expected data type, processing requirements,
349 * and available hardware resources.
350 */
351 mutable ProcessingToken m_token_filter_mask { ProcessingToken::AUDIO_BACKEND };
352
353 /**
354 * @brief Token enforcement strategy for this chain
355 *
356 * This strategy determines how the processing token is enforced across the pipeline,
357 * including whether to filter processors based on their compatibility with the token.
358 * The default strategy is FILTERED, which applies the token only to compatible processors.
359 */
360 TokenEnforcementStrategy m_enforcement_strategy { TokenEnforcementStrategy::FILTERED };
361
362 static constexpr size_t MAX_PENDING_PROCESSORS = 32;
363
364 struct PendingProcessorOp {
365 std::atomic<bool> active { false };
366 std::shared_ptr<BufferProcessor> processor;
367 std::shared_ptr<Buffer> buffer;
368 bool is_addition { true }; // true = add, false = remove
369 };
370
371 std::atomic<bool> m_is_processing { false };
372
373 PendingProcessorOp m_pending_ops[MAX_PENDING_PROCESSORS];
374
375 std::atomic<uint32_t> m_pending_count { 0 };
376
378};
379}
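
The two-phase contract documented above (process() runs the per-buffer sequence, process_final() runs the single final processor afterwards) can be illustrated with a minimal stand-alone sketch. Everything in it is hypothetical: SketchBuffer, SketchProcessor, SketchChain and run_sketch_chain are simplified stand-ins invented for illustration, not the MayaFlux types, and the sketch omits token enforcement, pending operations and any backend optimization.

```cpp
#include <algorithm>
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a buffer: just a block of samples.
struct SketchBuffer {
    std::vector<double> samples;
};

// Hypothetical stand-in for a processor: any callable that mutates a buffer.
using SketchProcessor = std::function<void(SketchBuffer&)>;

// Simplified chain: one ordered sequence plus at most one final processor per buffer.
class SketchChain {
public:
    void add_processor(SketchProcessor p, const std::shared_ptr<SketchBuffer>& b)
    {
        m_processors[b].push_back(std::move(p)); // creates the sequence on first use
    }

    void add_final_processor(SketchProcessor p, const std::shared_ptr<SketchBuffer>& b)
    {
        m_final[b] = std::move(p); // replaces any earlier final processor
    }

    // Runs the regular sequence in insertion order; the final processor is NOT run here.
    void process(const std::shared_ptr<SketchBuffer>& b)
    {
        auto it = m_processors.find(b);
        if (it == m_processors.end()) {
            return;
        }
        for (auto& p : it->second) {
            p(*b);
        }
    }

    // Runs only the final processor, if one was registered for this buffer.
    void process_final(const std::shared_ptr<SketchBuffer>& b)
    {
        auto it = m_final.find(b);
        if (it != m_final.end()) {
            it->second(*b);
        }
    }

private:
    std::unordered_map<std::shared_ptr<SketchBuffer>, std::vector<SketchProcessor>> m_processors;
    std::unordered_map<std::shared_ptr<SketchBuffer>, SketchProcessor> m_final;
};

// Demo: a gain stage followed by a final clamp to [-1, 1].
inline std::vector<double> run_sketch_chain()
{
    auto buffer = std::make_shared<SketchBuffer>();
    buffer->samples = { 0.125, 0.5, -0.4 };

    SketchChain chain;
    chain.add_processor([](SketchBuffer& b) {
        for (auto& s : b.samples) { s *= 4.0; }
    }, buffer);
    chain.add_final_processor([](SketchBuffer& b) {
        for (auto& s : b.samples) { s = std::clamp(s, -1.0, 1.0); }
    }, buffer);

    chain.process(buffer);       // gain only
    chain.process_final(buffer); // clamp as the last step
    return buffer->samples;
}
```

Keeping the final processor in its own map, as m_final_processors does in the header, means a limiter or normalizer stays last regardless of how many regular processors are added later.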
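
The typed lookup performed by get_processor<T>() relies on std::dynamic_pointer_cast returning a null pointer when the dynamic type does not match. The following stand-alone sketch shows that pattern in isolation. The types SketchProcessorBase, SketchGain and SketchLimiter and the function find_processor are hypothetical names introduced here for illustration; they are not part of MayaFlux.

```cpp
#include <memory>
#include <vector>

// Hypothetical polymorphic base; a virtual destructor makes dynamic casts possible.
struct SketchProcessorBase {
    virtual ~SketchProcessorBase() = default;
};

// Two hypothetical concrete processor types with type-specific state.
struct SketchGain : SketchProcessorBase {
    double gain { 1.0 };
};

struct SketchLimiter : SketchProcessorBase {
    double ceiling { 1.0 };
};

// Returns the first processor whose dynamic type is T (or derived from T), else nullptr.
template <typename T>
std::shared_ptr<T> find_processor(const std::vector<std::shared_ptr<SketchProcessorBase>>& processors)
{
    for (const auto& processor : processors) {
        if (auto typed = std::dynamic_pointer_cast<T>(processor)) {
            return typed; // shares ownership with the entry in the sequence
        }
    }
    return nullptr;
}
```

A caller holding a sequence that contains a SketchGain could write `find_processor<SketchGain>(sequence)->gain = 2.0;` after checking the result for null. Only the first match is returned, so a sequence with two processors of the same type needs a different lookup.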