MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
ComputePipeline.hpp
1#pragma once
2
3#include "ComputeGrammar.hpp"
4#include "ComputeMatrix.hpp"
5
6namespace MayaFlux::Yantra {
7
8/**
9 * @class ComputationPipeline
10 * @brief Pipeline that uses grammar rules for operation composition
11 * @tparam InputType Input data type (defaults to std::vector<Kakshya::DataVariant>)
12 * @tparam OutputType Output data type (defaults to InputType)
13 *
14 * The ComputationPipeline provides a flexible, grammar-aware system for chaining
15 * computational operations in sequence. Unlike traditional pipelines that execute
16 * operations in a fixed order, this pipeline can dynamically select and apply
17 * operations based on grammar rules that match input data characteristics and
18 * execution context.
19 *
20 * ## Key Features
21 *
22 * **Grammar Integration**: Uses ComputationGrammar to intelligently select and
23 * configure operations based on input data properties and context.
24 *
25 * **Type Safety**: Template-based design ensures type compatibility between
26 * pipeline stages while supporting different input/output types.
27 *
28 * **Dynamic Configuration**: Operations can be added, configured, and removed
29 * at runtime, enabling adaptive processing workflows.
30 *
31 * **Error Handling**: A failing operation is rethrown as std::runtime_error tagged with
32 * the operation name; a grammar rule result of the wrong type is silently ignored.
33 *
34 * ## Usage Patterns
35 *
36 * ### Basic Pipeline Construction
37 * ```cpp
38 * auto pipeline = std::make_shared<ComputationPipeline<std::vector<Kakshya::DataVariant>>>();
39 *
40 * // Add operations with names for later reference
41 * pipeline->create_operation<MathematicalTransformer<>>("gain_stage")
42 * .create_operation<SpectralTransformer<>>("frequency_processing")
43 * .create_operation<TemporalTransformer<>>("time_effects");
44 * ```
45 *
46 * ### Grammar-Driven Processing
47 * ```cpp
48 * // Pipeline automatically applies grammar rules before operation chain
49 * ExecutionContext context;
50 * context.execution_metadata["processing_mode"] = std::string("high_quality");
51 *
52 * auto result = pipeline->process(input_data, context);
53 * // Grammar rules are evaluated first, then operation chain executes
54 * ```
55 *
56 * ### Dynamic Operation Configuration
57 * ```cpp
58 * // Configure specific operations by name
59 * pipeline->configure_operation<MathematicalTransformer<>>("gain_stage",
60 * [](auto op) {
61 * op->set_parameter("operation", "gain");
62 * op->set_parameter("gain_factor", 2.0);
63 * });
64 * ```
65 */
66template <ComputeData InputType = std::vector<Kakshya::DataVariant>, ComputeData OutputType = InputType>
67class MAYAFLUX_API ComputationPipeline {
68public:
71
72 /**
73 * @brief Constructor with optional grammar
74 * @param grammar Shared pointer to ComputationGrammar instance (creates new if nullptr)
75 *
76 * Creates a pipeline with the specified grammar instance. If no grammar is provided,
77 * creates a new empty grammar that can be populated with rules later.
78 */
79 explicit ComputationPipeline(std::shared_ptr<ComputationGrammar> grammar = nullptr)
80 : m_grammar(grammar ? std::move(grammar) : std::make_shared<ComputationGrammar>())
81 {
82 }
83
84 /**
85 * @brief Add a concrete operation instance to the pipeline
86 * @tparam ConcreteOpType The concrete operation type (must derive from ComputeOperation)
87 * @param operation Shared pointer to the operation instance
88 * @param name Optional name for the operation (used for later reference)
89 * @return Reference to this pipeline for method chaining
90 *
91 * Adds an existing operation instance to the pipeline. The operation will be executed
92 * in the order it was added. Names are optional but recommended for later configuration
93 * and debugging.
94 *
95 * @note The operation type must be compatible with the pipeline's input/output types
96 */
97 template <typename ConcreteOpType>
98 ComputationPipeline& add_operation(std::shared_ptr<ConcreteOpType> operation, const std::string& name = "")
99 {
100 static_assert(std::is_base_of_v<ComputeOperation<InputType, OutputType>, ConcreteOpType>,
101 "Operation must derive from ComputeOperation");
102
103 m_operations.emplace_back(std::static_pointer_cast<ComputeOperation<InputType, OutputType>>(operation), name);
104 return *this;
105 }
106
107 /**
108 * @brief Create and add operation by type
109 * @tparam ConcreteOpType The concrete operation type to create
110 * @tparam Args Constructor argument types
111 * @param name Optional name for the operation
112 * @param args Constructor arguments for the operation
113 * @return Reference to this pipeline for method chaining
114 *
115 * Creates a new instance of the specified operation type and adds it to the pipeline.
116 * This is the most convenient way to add operations when you don't need to configure
117 * them before adding to the pipeline.
118 *
119 * Example:
120 * ```cpp
121 * pipeline->create_operation<MathematicalTransformer<>>("gain")
122 * .create_operation<SpectralTransformer<>>("pitch_shift",
123 * SpectralOperation::PITCH_SHIFT);
124 * ```
125 */
126 template <typename ConcreteOpType, typename... Args>
127 ComputationPipeline& create_operation(const std::string& name = "", Args&&... args)
128 {
129 auto operation = std::make_shared<ConcreteOpType>(std::forward<Args>(args)...);
130 return add_operation(operation, name);
131 }
132
133 /**
134 * @brief Execute the pipeline with grammar rule application
135 * @param input Input data to process through the pipeline
136 * @param context Execution context containing parameters and metadata
137 * @return Processed output data
138 *
139 * Executes the complete pipeline processing workflow:
140 *
141 * 1. **Grammar Rule Application**: Searches for grammar rules that match the input
142 * data and execution context. If a matching rule is found, applies it first.
143 *
144 * 2. **Operation Chain Execution**: Executes all operations in the pipeline in
145 * the order they were added, passing output from each stage as input to the next.
146 *
147 * 3. **Output Handling**: Returns the processed data when InputType and OutputType match;
148 * when they differ, no conversion is performed and a default-constructed output is returned.
149 *
150 * An exception thrown by an operation is caught and rethrown as std::runtime_error with a
151 * message that includes the operation name. A rule result that cannot be cast to the input type is discarded.
152 *
153 * @throws std::runtime_error If any operation in the pipeline fails
154 */
155 output_type process(const input_type& input, const ExecutionContext& context = {})
156 {
157 input_type current_data = input;
158 ExecutionContext current_context = context;
159
160 if (auto best_rule = m_grammar->find_best_match(current_data, current_context)) {
161 if (auto rule_result = m_grammar->execute_rule(best_rule->name, current_data, current_context)) {
162 try {
163 current_data = std::any_cast<input_type>(*rule_result);
164 } catch (const std::bad_any_cast&) {
165 // Continue with original data if conversion fails
166 }
167 }
168 }
169
170 for (const auto& [operation, name] : m_operations) {
171 try {
172 auto result = operation->apply_operation(current_data);
173 current_data = result;
174 } catch (const std::exception& e) {
175 throw std::runtime_error("Pipeline operation failed: " + name + " - " + e.what());
176 }
177 }
178
179 if constexpr (std::is_same_v<InputType, OutputType>) {
180 return current_data;
181 } else {
182 output_type result;
183 return result;
184 }
185 }
186
187 /**
188 * @brief Get the grammar instance
189 * @return Shared pointer to the current ComputationGrammar
190 *
191 * Provides access to the pipeline's grammar instance for adding rules,
192 * querying existing rules, or integrating with other grammar-aware components.
193 */
194 [[nodiscard]] std::shared_ptr<ComputationGrammar> get_grammar() const
195 {
196 return m_grammar;
197 }
198
199 /**
200 * @brief Set grammar instance
201 * @param grammar New grammar instance to use
202 *
203 * Replaces the current grammar with a new instance. Useful for switching
204 * between different rule sets or sharing grammars between multiple pipelines.
205 */
206 void set_grammar(std::shared_ptr<ComputationGrammar> grammar)
207 {
208 m_grammar = std::move(grammar);
209 }
210
211 /**
212 * @brief Get operation by name
213 * @tparam ConcreteOpType The expected concrete operation type
214 * @param name Name of the operation to retrieve
215 * @return Shared pointer to the operation, or nullptr if not found or wrong type
216 *
217 * Retrieves a named operation from the pipeline with automatic type casting.
218 * Only the first operation with a matching name is considered; returns nullptr if no
219 * such operation exists or if that operation is not of the expected type.
220 *
221 * Example:
222 * ```cpp
223 * auto gain_op = pipeline->get_operation<MathematicalTransformer<>>("gain_stage");
224 * if (gain_op) {
225 * gain_op->set_parameter("gain_factor", 1.5);
226 * }
227 * ```
228 */
229 template <typename ConcreteOpType>
230 std::shared_ptr<ConcreteOpType> get_operation(const std::string& name) const
231 {
232 for (const auto& [operation, op_name] : m_operations) {
233 if (op_name == name) {
234 return std::dynamic_pointer_cast<ConcreteOpType>(operation);
235 }
236 }
237 return nullptr;
238 }
239
240 /**
241 * @brief Configure operation by name
242 * @tparam ConcreteOpType The expected concrete operation type
243 * @param name Name of the operation to configure
244 * @param configurator Function that configures the operation
245 * @return True if operation was found and configured, false otherwise
246 *
247 * Provides a safe way to configure named operations in the pipeline. The
248 * configurator function is only called if an operation with the given name
249 * exists and is of the expected type.
250 *
251 * Example:
252 * ```cpp
253 * bool configured = pipeline->configure_operation<SpectralTransformer<>>("pitch_shift",
254 * [](auto op) {
255 * op->set_parameter("pitch_ratio", 1.5);
256 * op->set_parameter("window_size", 2048);
257 * });
258 * ```
259 */
260 template <typename ConcreteOpType>
261 bool configure_operation(const std::string& name,
262 const std::function<void(std::shared_ptr<ConcreteOpType>)>& configurator)
263 {
264 for (auto& [operation, op_name] : m_operations) {
265 if (op_name == name) {
266 if (auto concrete_op = std::dynamic_pointer_cast<ConcreteOpType>(operation)) {
267 configurator(concrete_op);
268 return true;
269 }
270 }
271 }
272 return false;
273 }
274
275 /**
276 * @brief Get number of operations in pipeline
277 * @return Number of operations currently in the pipeline
278 */
279 [[nodiscard]] size_t operation_count() const
280 {
281 return m_operations.size();
282 }
283
284 /**
285 * @brief Clear all operations
286 *
287 * Removes all operations from the pipeline, leaving it empty.
288 * The grammar instance is preserved.
289 */
290 void clear_operations()
291 {
292 m_operations.clear();
293 }
294
295 /**
296 * @brief Get all operation names in the pipeline
297 * @return Vector of operation names in execution order
298 *
299 * Returns the names of all operations in the pipeline in the order they
300 * will be executed. Unnamed operations appear as empty strings.
301 */
302 [[nodiscard]] std::vector<std::string> get_operation_names() const
303 {
304 std::vector<std::string> names;
305 names.reserve(m_operations.size());
306 std::ranges::transform(m_operations, std::back_inserter(names),
307 [](const auto& op_pair) { return op_pair.second; });
308 return names;
309 }
310
311 /**
312 * @brief Remove operation by name
313 * @param name Name of the operation to remove
314 * @return True if operation was found and removed, false otherwise
315 *
316 * Removes the first operation with the given name from the pipeline.
317 * If multiple operations have the same name, only the first one is removed.
318 */
319 bool remove_operation(const std::string& name)
320 {
321 auto it = std::ranges::find_if(m_operations,
322 [&name](const auto& op_pair) { return op_pair.second == name; });
323
324 if (it != m_operations.end()) {
325 m_operations.erase(it);
326 return true;
327 }
328 return false;
329 }
330
331private:
332 std::shared_ptr<ComputationGrammar> m_grammar; ///< Grammar instance for rule-based operation selection
333 std::vector<std::pair<std::shared_ptr<ComputeOperation<InputType, OutputType>>, std::string>> m_operations; ///< Operations and their names in execution order
334};
335
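The operation chain inside `process()` (stages run in insertion order, and a failure is reported with the stage name) can be illustrated outside the framework. The following is a minimal sketch under stated assumptions, not the MayaFlux implementation: `Data`, `MiniPipeline`, `add`, and `run_pipeline_demo` are hypothetical stand-ins for `IO<InputType>`, `ComputationPipeline`, `create_operation`, and a caller. It also shows why chained calls use `.`: the builder returns a reference, not a pointer.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using Data = std::vector<double>; // hypothetical stand-in for IO<InputType>

class MiniPipeline {
public:
    using StageFn = std::function<Data(const Data&)>;

    // Returns a reference, so calls chain with '.' rather than '->'.
    MiniPipeline& add(std::string name, StageFn fn)
    {
        m_stages.emplace_back(std::move(fn), std::move(name));
        return *this;
    }

    // Runs stages in insertion order; a failing stage is reported by name.
    Data process(const Data& input) const
    {
        Data current = input;
        for (const auto& [fn, name] : m_stages) {
            try {
                current = fn(current);
            } catch (const std::exception& e) {
                throw std::runtime_error("Pipeline operation failed: " + name + " - " + e.what());
            }
        }
        return current;
    }

private:
    std::vector<std::pair<StageFn, std::string>> m_stages;
};

// Self-check for the sketch; call it from main() to run the example.
inline void run_pipeline_demo()
{
    MiniPipeline pipeline;
    pipeline
        .add("gain", [](const Data& d) {
            Data out = d;
            for (auto& v : out) v *= 2.0;
            return out;
        })
        .add("offset", [](const Data& d) {
            Data out = d;
            for (auto& v : out) v += 1.0;
            return out;
        });
    assert((pipeline.process({ 1.0, 2.0 }) == Data { 3.0, 5.0 }));

    // A throwing stage surfaces as std::runtime_error carrying the stage name.
    pipeline.add("boom", [](const Data&) -> Data { throw std::runtime_error("bad input"); });
    try {
        pipeline.process({ 1.0 });
        assert(false && "expected an exception");
    } catch (const std::runtime_error& e) {
        assert(std::string(e.what()) == "Pipeline operation failed: boom - bad input");
    }
}
```

Wrapping the original exception text keeps the root cause visible while the stage name tells the caller where in the chain to look.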
336/**
337 * @brief Factory functions for common pipeline configurations
338 *
339 * The PipelineFactory namespace provides convenience functions for creating
340 * pre-configured pipelines for common use cases. These factories set up
341 * typical operation chains and grammar rules for specific domains.
342 */
343namespace PipelineFactory {
344
345 /**
346 * @brief Create an audio processing pipeline
347 * @tparam DataType The data type for the pipeline (defaults to std::vector<Kakshya::DataVariant>)
348 * @return Shared pointer to a configured audio processing pipeline
349 *
350 * Creates a pipeline intended for audio processing workflows (gain control, temporal
351 * effects, spectral processing). The default operations are currently commented out,
352 * so the returned pipeline is empty until operations are added.
353 *
354 * Example:
355 * ```cpp
356 * auto audio_pipeline = PipelineFactory::create_audio_pipeline<std::vector<Kakshya::DataVariant>>();
357 * auto result = audio_pipeline->process(audio_data, context);
358 * ```
359 */
360 template <ComputeData DataType = std::vector<Kakshya::DataVariant>>
361 std::shared_ptr<ComputationPipeline<DataType>> create_audio_pipeline()
362 {
363 auto pipeline = std::make_shared<ComputationPipeline<DataType>>();
364
365 // Add common audio operations
366 // pipeline->create_operation<MathematicalTransformer<DataType>>("gain");
367 // pipeline->create_operation<TemporalTransformer<DataType>>("time_effects");
368 // pipeline->create_operation<SpectralTransformer<DataType>>("frequency_effects");
369
370 return pipeline;
371 }
372
373 /**
374 * @brief Create an analysis pipeline
375 * @tparam DataType The data type for the pipeline (defaults to std::vector<Kakshya::DataVariant>)
376 * @return Shared pointer to a configured analysis pipeline
377 *
378 * Creates a pipeline intended for data analysis workflows (feature extraction, statistical
379 * analysis, result processing), such as machine learning preprocessing. The default operations
380 * are currently commented out, so the returned pipeline is empty until operations are added.
381 *
382 * Example:
383 * ```cpp
384 * auto analysis_pipeline = PipelineFactory::create_analysis_pipeline<>();
385 * auto features = analysis_pipeline->process(raw_data, analysis_context);
386 * ```
387 */
388 template <ComputeData DataType = std::vector<Kakshya::DataVariant>>
389 std::shared_ptr<ComputationPipeline<DataType>> create_analysis_pipeline()
390 {
391 auto pipeline = std::make_shared<ComputationPipeline<DataType>>();
392
393 // Add analysis operations (examples)
394 // pipeline->create_operation<FeatureExtractor<DataType>>("feature_extract");
395 // pipeline->create_operation<StandardSorter<DataType>>("sort_results");
396
397 return pipeline;
398 }
399
400} // namespace PipelineFactory
401
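A pipeline returned by these factories would normally be customized through the name-based accessors of `ComputationPipeline`. As the listing reads, `get_operation` and `configure_operation` behave differently when names repeat: the former stops at the first name match and casts it, the latter keeps scanning until both name and type match. The sketch below reproduces only that lookup logic; `Op`, `Gain`, `Delay`, `NamedOps`, and `run_lookup_demo` are hypothetical names, not MayaFlux types.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for ComputeOperation and two concrete operations.
struct Op {
    virtual ~Op() = default;
};
struct Gain : Op {
    double factor = 1.0;
};
struct Delay : Op {
    int samples = 0;
};

class NamedOps {
public:
    void add(std::shared_ptr<Op> op, std::string name)
    {
        m_ops.emplace_back(std::move(op), std::move(name));
    }

    // Mirrors get_operation: the first name match decides the result,
    // so a type mismatch on that entry yields nullptr.
    template <typename T>
    std::shared_ptr<T> get(const std::string& name) const
    {
        for (const auto& [op, op_name] : m_ops) {
            if (op_name == name) {
                return std::dynamic_pointer_cast<T>(op);
            }
        }
        return nullptr;
    }

    // Mirrors configure_operation: keeps scanning until name and type both match.
    template <typename T>
    bool configure(const std::string& name, const std::function<void(std::shared_ptr<T>)>& fn)
    {
        for (auto& [op, op_name] : m_ops) {
            if (op_name == name) {
                if (auto concrete = std::dynamic_pointer_cast<T>(op)) {
                    fn(concrete);
                    return true;
                }
            }
        }
        return false;
    }

private:
    std::vector<std::pair<std::shared_ptr<Op>, std::string>> m_ops;
};

// Self-check for the sketch; call it from main() to run the example.
inline void run_lookup_demo()
{
    NamedOps ops;
    ops.add(std::make_shared<Gain>(), "fx");
    ops.add(std::make_shared<Delay>(), "fx"); // same name, different type

    assert(ops.get<Gain>("fx") != nullptr);
    assert(ops.get<Delay>("fx") == nullptr); // the first "fx" is a Gain
    assert(ops.configure<Delay>("fx", [](auto d) { d->samples = 480; }));
    assert(!ops.configure<Gain>("missing", [](auto) {}));
}
```

Giving each operation a unique name avoids the asymmetry altogether.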
402/**
403 * @class GrammarAwareComputeMatrix
404 * @brief ComputeMatrix extension that integrates grammar-based operation selection
405 *
406 * The GrammarAwareComputeMatrix extends the base ComputeMatrix functionality with
407 * grammar-based rule processing. This allows for intelligent operation selection
408 * and preprocessing based on input data characteristics and execution context.
409 *
410 * Unlike pipelines that execute operations in sequence, the grammar-aware matrix
411 * can dynamically select which operations to apply based on the current data
412 * and context, making it suitable for adaptive and conditional processing workflows.
413 *
414 * ## Usage Patterns
415 *
416 * ### Basic Grammar Integration
417 * ```cpp
418 * auto grammar = std::make_shared<ComputationGrammar>();
419 * auto matrix = std::make_unique<GrammarAwareComputeMatrix>(grammar);
420 *
421 * // Grammar rules are applied before any matrix operations
422 * auto result = matrix->execute_with_grammar(input_data, context);
423 * ```
424 *
425 * ### Dynamic Operation Selection
426 * ```cpp
427 * // Grammar rules can dynamically select operations based on data properties
428 * ExecutionContext context;
429 * context.execution_metadata["processing_quality"] = std::string("high");
430 * context.execution_metadata["data_size"] = input_data.size();
431 *
432 * auto processed_data = matrix->execute_with_grammar(input_data, context);
433 * // Appropriate operations selected based on quality requirements and data size
434 * ```
435 */
436class MAYAFLUX_API GrammarAwareComputeMatrix : public ComputeMatrix {
437private:
438 std::shared_ptr<ComputationGrammar> m_grammar; ///< Grammar instance for rule-based operation selection
439
440public:
441 /**
442 * @brief Constructor with optional grammar
443 * @param grammar Shared pointer to ComputationGrammar instance (creates new if nullptr)
444 *
445 * Creates a grammar-aware compute matrix with the specified grammar instance.
446 * If no grammar is provided, creates a new empty grammar that can be populated
447 * with rules later.
448 */
449 explicit GrammarAwareComputeMatrix(std::shared_ptr<ComputationGrammar> grammar = nullptr)
450 : m_grammar(grammar ? std::move(grammar) : std::make_shared<ComputationGrammar>())
451 {
452 }
453
454 /**
455 * @brief Execute operations with grammar rule pre-processing
456 * @tparam InputType The input data type
457 * @param input Input data to process
458 * @param context Execution context containing parameters and metadata
459 * @return IO<InputType> holding the rule result, or the wrapped original input if no rule applies
460 *
461 * Applies grammar rules to the input data before any matrix operations.
462 * This allows for intelligent preprocessing, operation selection, and
463 * parameter configuration based on the input characteristics and context.
464 *
465 * The process:
466 * 1. Wraps input data in IO structure
467 * 2. Searches for matching grammar rules
468 * 3. Applies the best matching rule if found
469 * 4. Returns processed data or original data if no rules match
470 *
471 * @note This method focuses on grammar rule application. Use the base
472 * ComputeMatrix methods for actual operation execution.
473 */
474 template <ComputeData InputType>
475 auto execute_with_grammar(const InputType& input, const ExecutionContext& context = {})
476 {
477 IO<InputType> input_data { input };
478
479 if (auto best_rule = m_grammar->find_best_match(input_data, context)) {
480 if (auto rule_result = m_grammar->execute_rule(best_rule->name, input_data, context)) {
481 try {
482 input_data = std::any_cast<IO<InputType>>(*rule_result);
483 } catch (const std::bad_any_cast&) {
484 // Continue with original data if conversion fails
485 }
486 }
487 }
488
489 return input_data;
490 }
491
492 /**
493 * @brief Get the grammar instance
494 * @return Shared pointer to the current ComputationGrammar
495 *
496 * Provides access to the grammar instance for adding rules, querying existing
497 * rules, or integrating with other grammar-aware components.
498 */
499 std::shared_ptr<ComputationGrammar> get_grammar() const
500 {
501 return m_grammar;
502 }
503
504 /**
505 * @brief Set grammar instance
506 * @param grammar New grammar instance to use
507 *
508 * Replaces the current grammar with a new instance. Useful for switching
509 * between different rule sets or sharing grammars between multiple components.
510 */
511 void set_grammar(std::shared_ptr<ComputationGrammar> grammar)
512 {
513 m_grammar = std::move(grammar);
514 }
515
516 /**
517 * @brief Add a grammar rule directly to the matrix
518 * @param rule Rule to add to the grammar
519 *
520 * Convenience method to add rules directly to the matrix's grammar without
521 * needing to access the grammar instance separately. Useful for quick
522 * rule addition during matrix configuration.
523 */
524 void add_grammar_rule(ComputationGrammar::Rule rule)
525 {
526 m_grammar->add_rule(std::move(rule));
527 }
528
529 /**
530 * @brief Create a rule builder for this matrix's grammar
531 * @param name Unique name for the rule
532 * @return RuleBuilder instance for method chaining
533 *
534 * Provides direct access to the grammar's rule building interface,
535 * allowing for fluent rule creation without explicit grammar access.
536 *
537 * Example:
538 * ```cpp
539 * matrix->create_grammar_rule("auto_gain")
540 * .with_context(ComputationContext::TEMPORAL)
541 * .matches_type<std::vector<double>>()
542 * .executes([](const auto& input, const auto& ctx) { return input; })
543 * .build();
544 * ```
545 */
546 ComputationGrammar::RuleBuilder create_grammar_rule(const std::string& name)
547 {
548 return m_grammar->create_rule(name);
549 }
550};
551
552} // namespace MayaFlux::Yantra
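The rule pre-pass used by `execute_with_grammar` relies on a type-erased result: the rule output is adopted only if `std::any_cast` succeeds, otherwise the wrapped input is returned unchanged. The sketch below isolates that fallback. It is an illustration under assumptions, not the framework code: `Wrapped`, `RuleFn`, `apply_rule_or_keep`, and `run_rule_demo` are hypothetical, with `Wrapped<T>` standing in for `IO<T>` and a single callable standing in for the `find_best_match` / `execute_rule` pair.

```cpp
#include <any>
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-in for IO<T>: a thin wrapper around the payload.
template <typename T>
struct Wrapped {
    T data;
};

// Hypothetical stand-in for a matched grammar rule: it may decline
// (std::nullopt) or return a type-erased result.
template <typename T>
using RuleFn = std::function<std::optional<std::any>(const Wrapped<T>&)>;

// Applies the rule if one is given and keeps the original input whenever the
// rule declines or returns something that is not a Wrapped<T>.
template <typename T>
Wrapped<T> apply_rule_or_keep(const T& input, const RuleFn<T>& rule)
{
    Wrapped<T> current { input };
    if (rule) {
        if (auto result = rule(current)) {
            try {
                current = std::any_cast<Wrapped<T>>(*result);
            } catch (const std::bad_any_cast&) {
                // Type mismatch: fall through with the untouched input.
            }
        }
    }
    return current;
}

// Self-check for the sketch; call it from main() to run the example.
inline void run_rule_demo()
{
    using Samples = std::vector<double>;
    const Samples input { 1.0, 2.0 };

    // A rule whose result has the expected type replaces the data.
    RuleFn<Samples> halve = [](const Wrapped<Samples>& in) -> std::optional<std::any> {
        Wrapped<Samples> out = in;
        for (auto& v : out.data) v *= 0.5;
        return std::any(out);
    };
    assert((apply_rule_or_keep(input, halve).data == Samples { 0.5, 1.0 }));

    // A rule returning an unrelated type is ignored silently.
    RuleFn<Samples> wrong = [](const Wrapped<Samples>&) -> std::optional<std::any> {
        return std::any(std::string("not samples"));
    };
    assert(apply_rule_or_keep(input, wrong).data == input);
}
```

Because the mismatch is swallowed, a caller cannot tell from the return value alone whether a rule actually ran; comparing the result with the input is the only signal in this sketch.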