MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
ComputeOperation.hpp
Go to the documentation of this file.
1#pragma once
2
5
6#include "Data/DataIO.hpp"
7
8#include <future>
9
10namespace MayaFlux::Yantra {
11
12class ComputeMatrix;
13
14/**
15 * @class ComputeOperation
16 * @brief Base interface for all computational operations in the processing pipeline
17 *
18 * Defines the core contract for operations that transform data from one type to another.
19 * Operations can be parameterized, validated, and composed into complex processing networks.
20 *
21 * @tparam InputType The data type accepted by this operation
22 * @tparam OutputType The data type produced by this operation, defaults to InputType
23 */
24template <ComputeData InputType = std::vector<Kakshya::DataVariant>, ComputeData OutputType = InputType>
25class MAYAFLUX_API ComputeOperation {
26public:
29
30 /**
31 * @brief Constructor with data type validation warnings
32 */
34 {
35 validate_operation_data_types();
36 }
37
38 /**
39 * @brief Virtual destructor for proper cleanup of derived classes
40 */
41 virtual ~ComputeOperation() = default;
42
43 /**
44 * @brief Public synchronous execution interface
45 */
47 {
48 return apply_operation_internal(input, m_last_execution_context);
49 }
50
51 /**
52 * @brief Applies the operation with dependencies resolved
53 * @param input Input data to process
54 * @return Processed output data
55 *
56 * This method ensures that all dependencies are executed before applying the operation.
57 * It is intended for use in scenarios where the operation is part of a larger processing graph.
58 *
59 * @note: This method is for usage outside of ComputeMatrix, and will not work recursively.
60 For ComputeMatrix, simply use a chain of operations.
61 */
63 {
64 m_last_execution_context.mode = ExecutionMode::DEPENDENCY;
65
66 for (auto& dep : m_dependencies) {
67 if (dep->validate_input(input)) {
68 dep->apply_operation_internal(input, m_last_execution_context);
69 }
70 }
71
72 m_last_execution_context.mode = ExecutionMode::SYNC;
73 return apply_operation_internal(input, m_last_execution_context);
74 }
75
76 /**
77 * @brief Convenience overload for direct data processing (backward compatibility)
78 * @param data Raw data to be processed
79 * @return Transformed output as IO context
80 */
81 output_type operator()(const InputType& data)
82 {
83 return apply_operation(input_type { data });
84 }
85
86 /**
87 * @brief Convenience overload that extracts just the data from result
88 * @param data Raw data to be processed
89 * @return Just the transformed data (no metadata/recursion)
90 */
91 OutputType apply_to_data(const InputType& data)
92 {
93 return apply_operation(input_type { data }).data;
94 }
95
96 /**
97 * @brief Sets a named parameter that configures the operation's behavior
98 * @param name Parameter identifier
99 * @param value Parameter value stored as std::any
100 */
101 virtual void set_parameter(const std::string& name, std::any value) = 0;
102
103 /**
104 * @brief Retrieves a parameter's current value
105 * @param name Parameter identifier
106 * @return Current parameter value as std::any
107 */
108 [[nodiscard]] virtual std::any get_parameter(const std::string& name) const = 0;
109
110 /**
111 * @brief Retrieves all parameters and their values
112 * @return Map of parameter names to their values
113 */
114 [[nodiscard]] virtual std::map<std::string, std::any> get_all_parameters() const { return {}; }
115
116 /**
117 * @brief Validates if the input data meets the operation's requirements
118 * @param input Data to validate
119 * @return True if input is valid, false otherwise
120 */
121 virtual bool validate_input(const input_type&) const { return true; }
122
123 /**
124 * @brief Get operation name for debugging/introspection
125 */
126 [[nodiscard]] virtual std::string get_name() const { return "ComputeOperation"; }
127
128 /**
129 * @brief OpUnit interface - operations can act as units in dependency graphs
130 */
132 {
133 m_last_execution_context.mode = ExecutionMode::DEPENDENCY;
134 return apply_operation_internal(input, m_last_execution_context);
135 }
136
137 void add_dependency(std::shared_ptr<ComputeOperation> dep)
138 {
139 m_dependencies.push_back(std::move(dep));
140 }
141
142 const auto& get_dependencies() const { return m_dependencies; }
143
144 virtual void set_container_for_regions(const std::shared_ptr<Kakshya::SignalSourceContainer>& container)
145 {
146 m_container = container;
147 }
148
149 [[nodiscard]] virtual const std::shared_ptr<Kakshya::SignalSourceContainer>& get_container_for_regions() const
150 {
151 return m_container;
152 }
153
155 {
156 m_last_execution_context = ctx;
157 }
158
159 [[nodiscard]] const ExecutionContext& get_last_execution_context() const
160 {
161 return m_last_execution_context;
162 }
163
165 {
166 m_last_execution_context.pre_execution_hook = hook;
167 }
168
170 {
171 m_last_execution_context.post_execution_hook = hook;
172 }
173
175 {
176 m_last_execution_context.reconstruction_callback = callback;
177 }
178
179protected:
180 /**
181 * @brief Executes the computational transformation on the input data
182 * @param input Data to be processed
183 * @return Transformed output data
184 */
185 virtual output_type operation_function(const input_type& input) = 0;
186
187 /**
188 * @brief Internal execution method - ComputeMatrix can access this
189 * @param input Input data wrapped in IO
190 * @param context Execution context with mode, threading, etc.
191 * @return Processed output
192 */
194 {
195 switch (context.mode) {
196 case ExecutionMode::ASYNC:
197 // Return the result of the future (this might need different handling)
198 return apply_operation_async(input).get();
199
200 case ExecutionMode::PARALLEL:
201 return apply_operation_parallel(input, context);
202
203 case ExecutionMode::CHAINED:
204 return apply_operation_chained(input, context);
205
206 case ExecutionMode::DEPENDENCY:
207 case ExecutionMode::SYNC:
208 default:
209 return apply_hooks(input, context);
210 }
211 }
212
213 /**
214 * @brief Optional async implementation - default delegates to operation_function
215 */
216 virtual std::future<output_type> apply_operation_async(const input_type& input)
217 {
218 return std::async(std::launch::async, [this, input]() {
219 return apply_hooks(input, m_last_execution_context);
220 });
221 }
222
223 /**
224 * @brief Optional parallel-aware implementation - default delegates to operation_function
225 */
227 {
228 return apply_hooks(input, ctx);
229 }
230
231 /**
232 * @brief Optional chain-aware implementation - default delegates to operation_function
233 */
235 {
236 return apply_hooks(input, ctx);
237 }
238
239 /**
240 * @brief Convert processed double data back to OutputType using metadata and optional callback
241 */
242 output_type convert_result(std::vector<std::vector<double>>& result_data, DataStructureInfo& metadata)
243 {
244 std::any any_data = metadata;
245 if (m_last_execution_context.reconstruction_callback) {
246 auto reconstructed = m_last_execution_context.reconstruction_callback(result_data, any_data);
247 try {
248 return std::any_cast<output_type>(reconstructed);
249 } catch (const std::bad_any_cast&) {
250 std::cerr << "Reconstruction callback did not return the correct output type\n";
251 return OperationHelper::reconstruct_from_double<output_type>(result_data, metadata);
252 }
253 }
254 return OperationHelper::reconstruct_from_double<output_type>(result_data, metadata);
255 }
256
257 std::shared_ptr<Kakshya::SignalSourceContainer> m_container;
258
260
261private:
262 std::vector<std::shared_ptr<ComputeOperation>> m_dependencies;
263
264 /**
265 * @brief Validate input/output types and warn about marker types
266 */
268 {
269 if constexpr (std::is_same_v<InputType, Kakshya::Region>) {
270 std::cerr << "OPERATION WARNING: InputType 'Region' is an expressive marker, not a data holder.\n"
271 << "Operations will process coordinate data rather than signal data.\n"
272 << "Consider using DataVariant or SignalSourceContainer for signal processing.\n";
273 } else if constexpr (std::is_same_v<InputType, Kakshya::RegionGroup>) {
274 std::cerr << "OPERATION WARNING: InputType 'RegionGroup' is an expressive marker, not a data holder.\n"
275 << "Operations will process coordinate data rather than signal data.\n"
276 << "Consider using DataVariant or SignalSourceContainer for signal processing.\n";
277 } else if constexpr (std::is_same_v<InputType, std::vector<Kakshya::RegionSegment>>) {
278 std::cerr << "OPERATION WARNING: InputType 'RegionSegments' are expressive markers, not primary data holders.\n"
279 << "Operations will attempt to extract data from segment metadata.\n"
280 << "Consider using DataVariant or SignalSourceContainer for direct signal processing.\n";
281 }
282
283 if constexpr (std::is_same_v<OutputType, Kakshya::Region>) {
284 std::cerr << "OPERATION INFO: OutputType 'Region' will create spatial/temporal markers with results as metadata.\n";
285 } else if constexpr (std::is_same_v<OutputType, Kakshya::RegionGroup>) {
286 std::cerr << "OPERATION INFO: OutputType 'RegionGroup' will organize results into spatial/temporal groups.\n";
287 } else if constexpr (std::is_same_v<OutputType, std::vector<Kakshya::RegionSegment>>) {
288 std::cerr << "OPERATION INFO: OutputType 'RegionSegments' will create segments with results in metadata.\n";
289 }
290 }
291
293 {
294 if (context.pre_execution_hook) {
295 std::any input_any = const_cast<input_type&>(input);
296 context.pre_execution_hook(input_any);
297 }
298
299 auto result = operation_function(input);
300
301 if (context.post_execution_hook) {
302 std::any result_any = &result;
303 context.post_execution_hook(result_any);
304 }
305 return result;
306 }
307
308 friend class ComputeMatrix;
309};
310
311// Type aliases for common operation patterns
317
318}
Local execution orchestrator for computational operations.
void set_last_execution_context(const ExecutionContext &ctx)
virtual void set_parameter(const std::string &name, std::any value)=0
Sets a named parameter that configures the operation's behavior.
const ExecutionContext & get_last_execution_context() const
virtual ~ComputeOperation()=default
Virtual destructor for proper cleanup of derived classes.
void set_reconstruction_callback(const ReconstructionCallback &callback)
virtual void set_container_for_regions(const std::shared_ptr< Kakshya::SignalSourceContainer > &container)
output_type apply_hooks(const input_type &input, const ExecutionContext &context)
virtual std::any get_parameter(const std::string &name) const =0
Retrieves a parameter's current value.
void set_pre_execution_hook(const OpererationHookCallback &hook)
output_type apply_operation(const input_type &input)
Public synchronous execution interface.
void validate_operation_data_types() const
Validate input/output types and warn about marker types.
OutputType apply_to_data(const InputType &data)
Convenience overload that extracts just the data from result.
virtual output_type apply_operation_internal(const input_type &input, const ExecutionContext &context)
Internal execution method - ComputeMatrix can access this.
output_type operator()(const InputType &data)
Convenience overload for direct data processing (backward compatibility)
virtual output_type operation_function(const input_type &input)=0
Executes the computational transformation on the input data.
virtual bool validate_input(const input_type &) const
Validates if the input data meets the operation's requirements.
output_type apply_operation_with_dependencies(const input_type &input)
Applies the operation with dependencies resolved.
virtual output_type apply_operation_chained(const input_type &input, const ExecutionContext &ctx)
Optional chain-aware implementation - default delegates to operation_function.
std::shared_ptr< Kakshya::SignalSourceContainer > m_container
virtual const std::shared_ptr< Kakshya::SignalSourceContainer > & get_container_for_regions() const
std::vector< std::shared_ptr< ComputeOperation > > m_dependencies
virtual std::string get_name() const
Get operation name for debugging/introspection.
void set_post_execution_hook(const OpererationHookCallback &hook)
output_type convert_result(std::vector< std::vector< double > > &result_data, DataStructureInfo &metadata)
Convert processed double data back to OutputType using metadata and optional callback.
ComputeOperation()
Constructor with data type validation warnings.
virtual std::future< output_type > apply_operation_async(const input_type &input)
Optional async implementation - default delegates to operation_function.
virtual std::map< std::string, std::any > get_all_parameters() const
Retrieves all parameters and their values.
void add_dependency(std::shared_ptr< ComputeOperation > dep)
output_type execute(const input_type &input)
OpUnit interface - operations can act as units in dependency graphs.
virtual output_type apply_operation_parallel(const input_type &input, const ExecutionContext &ctx)
Optional parallel-aware implementation - default delegates to operation_function.
Base interface for all computational operations in the processing pipeline.
@ ComputeMatrix
Compute operations (Yantra - algorithms, matrices, DSP)
std::function< void(std::any &)> OpererationHookCallback
Callback type for pre/post operation hooks.
std::function< std::any(std::vector< std::vector< double > > &, std::any &)> ReconstructionCallback
Callback type for custom reconstruction logic.
Metadata about data structure for reconstruction.
OpererationHookCallback post_execution_hook
OpererationHookCallback pre_execution_hook
Context information for operation execution.
Input/Output container for computation pipeline data flow with structure preservation.
Definition DataIO.hpp:24