MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
ComputeOperation.hpp
Go to the documentation of this file.
1#pragma once
2
4
5#include <future>
6
7namespace MayaFlux::Yantra {
8
9class ComputeMatrix;
10
11/**
12 * @class ComputeOperation
13 * @brief Base interface for all computational operations in the processing pipeline
14 *
15 * Defines the core contract for operations that transform data from one type to another.
16 * Operations can be parameterized, validated, and composed into complex processing networks.
17 *
18 * @tparam InputType The data type accepted by this operation
19 * @tparam OutputType The data type produced by this operation, defaults to InputType
20 */
21template <ComputeData InputType = std::vector<Kakshya::DataVariant>, ComputeData OutputType = InputType>
22class MAYAFLUX_API ComputeOperation {
23public:
26
27 /**
28 * @brief Constructor with data type validation warnings
29 */
31 {
32 validate_operation_data_types();
33 }
34
35 /**
36 * @brief Virtual destructor for proper cleanup of derived classes
37 */
38 virtual ~ComputeOperation() = default;
39
40 /**
41 * @brief Public synchronous execution interface
42 */
44 {
45 return apply_operation_internal(input, m_last_execution_context);
46 }
47
48 /**
49 * @brief Applies the operation with dependencies resolved
50 * @param input Input data to process
51 * @return Processed output data
52 *
53 * This method ensures that all dependencies are executed before applying the operation.
54 * It is intended for use in scenarios where the operation is part of a larger processing graph.
55 *
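56 * @note This method is intended for use outside of ComputeMatrix. Dependencies are resolved one level deep only: each dependency is executed directly, without resolving its own dependencies.
57 * Within ComputeMatrix, simply use a chain of operations instead.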
58 */
60 {
61 m_last_execution_context.mode = ExecutionMode::DEPENDENCY;
62
63 for (auto& dep : m_dependencies) {
64 if (dep->validate_input(input)) {
65 dep->apply_operation_internal(input, m_last_execution_context);
66 }
67 }
68
69 m_last_execution_context.mode = ExecutionMode::SYNC;
70 return apply_operation_internal(input, m_last_execution_context);
71 }
72
73 /**
74 * @brief Convenience overload for direct data processing (backward compatibility)
75 * @param data Raw data to be processed
76 * @return Transformed output as Datum context
77 */
78 output_type operator()(const InputType& data)
79 {
80 return apply_operation(input_type { data });
81 }
82
83 /**
84 * @brief Convenience overload that extracts just the data from result
85 * @param data Raw data to be processed
86 * @return Just the transformed data (no metadata/recursion)
87 */
88 OutputType apply_to_data(const InputType& data)
89 {
90 return apply_operation(input_type { data }).data;
91 }
92
93 /**
94 * @brief Sets a named parameter that configures the operation's behavior
95 * @param name Parameter identifier
96 * @param value Parameter value stored as std::any
97 */
98 virtual void set_parameter(const std::string& name, std::any value) = 0;
99
100 /**
101 * @brief Retrieves a parameter's current value
102 * @param name Parameter identifier
103 * @return Current parameter value as std::any
104 */
105 [[nodiscard]] virtual std::any get_parameter(const std::string& name) const = 0;
106
107 /**
108 * @brief Retrieves all parameters and their values
109 * @return Map of parameter names to their values
110 */
111 [[nodiscard]] virtual std::map<std::string, std::any> get_all_parameters() const { return {}; }
112
113 /**
114 * @brief Validates if the input data meets the operation's requirements
115 * @param input Data to validate
116 * @return True if input is valid, false otherwise
117 */
118 virtual bool validate_input(const input_type&) const { return true; }
119
120 /**
121 * @brief Get operation name for debugging/introspection
122 */
123 [[nodiscard]] virtual std::string get_name() const { return "ComputeOperation"; }
124
125 /**
126 * @brief Returns the category of this operation for grammar and registry discovery.
127 */
128 [[nodiscard]] virtual OperationType get_operation_type() const = 0;
129
130 /**
131 * @brief OpUnit interface - operations can act as units in dependency graphs
132 */
134 {
135 m_last_execution_context.mode = ExecutionMode::DEPENDENCY;
136 return apply_operation_internal(input, m_last_execution_context);
137 }
138
139 void add_dependency(std::shared_ptr<ComputeOperation> dep)
140 {
141 m_dependencies.push_back(std::move(dep));
142 }
143
144 const auto& get_dependencies() const { return m_dependencies; }
145
146 virtual void set_container_for_regions(const std::shared_ptr<Kakshya::SignalSourceContainer>& container)
147 {
148 m_container = container;
149 }
150
151 [[nodiscard]] virtual const std::shared_ptr<Kakshya::SignalSourceContainer>& get_container_for_regions() const
152 {
153 return m_container;
154 }
155
157 {
158 m_last_execution_context = ctx;
159 }
160
161 [[nodiscard]] const ExecutionContext& get_last_execution_context() const
162 {
163 return m_last_execution_context;
164 }
165
167 {
168 m_last_execution_context.pre_execution_hook = hook;
169 }
170
172 {
173 m_last_execution_context.post_execution_hook = hook;
174 }
175
177 {
178 m_last_execution_context.reconstruction_callback = callback;
179 }
180
181 /**
182 * @brief Attach a GPU execution backend.
183 *
184 * When set and ready, apply_operation_internal delegates to the backend
185 * instead of operation_function. The CPU implementation in operation_function
186 * remains the automatic fallback when no backend is attached or GPU
187 * initialisation has not yet succeeded.
188 *
189 * @param backend Configured GpuExecutionContext instance.
190 */
192 {
193 m_gpu_backend = std::move(backend);
194 }
195
196 [[nodiscard]] bool has_gpu_backend() const { return m_gpu_backend != nullptr; }
197
198protected:
199 /**
200 * @brief Executes the computational transformation on the input data
201 * @param input Data to be processed
202 * @return Transformed output data
203 */
204 virtual output_type operation_function(const input_type& input) = 0;
205
206 /**
207 * @brief Internal execution method - ComputeMatrix can access this
208 * @param input Input data wrapped in IO
209 * @param context Execution context with mode, threading, etc.
210 * @return Processed output
211 */
213 {
214 if (m_gpu_backend && m_gpu_backend->ensure_gpu_ready()) {
215 return m_gpu_backend->execute(input, context);
216 }
217
218 switch (context.mode) {
219 case ExecutionMode::ASYNC:
220 // Return the result of the future (this might need different handling)
221 return apply_operation_async(input).get();
222
223 case ExecutionMode::PARALLEL:
224 return apply_operation_parallel(input, context);
225
226 case ExecutionMode::CHAINED:
227 return apply_operation_chained(input, context);
228
229 case ExecutionMode::DEPENDENCY:
230 case ExecutionMode::SYNC:
231 default:
232 return apply_hooks(input, context);
233 }
234 }
235
236 /**
237 * @brief Optional async implementation - default delegates to operation_function
238 */
239 virtual std::future<output_type> apply_operation_async(const input_type& input)
240 {
241 return std::async(std::launch::async, [this, input]() {
242 return apply_hooks(input, m_last_execution_context);
243 });
244 }
245
246 /**
247 * @brief Optional parallel-aware implementation - default delegates to apply_hooks, which runs operation_function with the context's pre/post hooks
248 */
250 {
251 return apply_hooks(input, ctx);
252 }
253
254 /**
255 * @brief Optional chain-aware implementation - default delegates to apply_hooks, which runs operation_function with the context's pre/post hooks
256 */
258 {
259 return apply_hooks(input, ctx);
260 }
261
262 /**
263 * @brief Convert processed double data back to OutputType using metadata and optional callback
264 */
265 output_type convert_result(std::vector<std::vector<double>>& result_data, DataStructureInfo& metadata)
266 {
267 std::any any_data = metadata;
268
269 if (m_last_execution_context.reconstruction_callback) {
270 auto reconstructed = m_last_execution_context.reconstruction_callback(result_data, any_data);
271 auto result = safe_any_cast<output_type>(reconstructed);
272 if (result) {
273 return *result.value;
274 }
275
276 MF_WARN(
277 Journal::Component::Yantra,
278 Journal::Context::Runtime,
279 "Reconstruction callback type mismatch: {}",
280 result.error);
281 return OperationHelper::reconstruct_from_double<output_type>(result_data, metadata);
282 }
283
284 return OperationHelper::reconstruct_from_double<output_type>(result_data, metadata);
285 }
286
287 std::shared_ptr<Kakshya::SignalSourceContainer> m_container;
288
290
292 {
293 if (context.pre_execution_hook) {
294 std::any input_any = const_cast<input_type&>(input);
295 context.pre_execution_hook(input_any);
296 }
297
298 auto result = operation_function(input);
299
300 if (context.post_execution_hook) {
301 std::any result_any = &result;
302 context.post_execution_hook(result_any);
303 }
304 return result;
305 }
306
307private:
308 std::vector<std::shared_ptr<ComputeOperation>> m_dependencies;
309 std::shared_ptr<GpuExecutionContext<InputType, OutputType>> m_gpu_backend;
310
311 /**
312 * @brief Validate input/output types and warn about marker types
313 */
315 {
316 if constexpr (std::is_same_v<InputType, Kakshya::Region>) {
317 MF_INFO(
318 Journal::Component::Yantra,
319 Journal::Context::Runtime,
320 "InputType 'Region' is an expressive marker, not a data holder. Operations will process coordinate data rather than signal data. Consider using DataVariant or SignalSourceContainer for signal processing.");
321 } else if constexpr (std::is_same_v<InputType, Kakshya::RegionGroup>) {
322 MF_INFO(
323 Journal::Component::Yantra,
324 Journal::Context::Runtime,
325 "InputType 'RegionGroup' is an expressive marker, not a data holder. Operations will process coordinate data rather than signal data. Consider using DataVariant or SignalSourceContainer for signal processing.");
326 } else if constexpr (std::is_same_v<InputType, std::vector<Kakshya::RegionSegment>>) {
327 MF_INFO(
328 Journal::Component::Yantra,
329 Journal::Context::Runtime,
330 "InputType 'RegionSegments' are expressive markers, not primary data holders. Operations will attempt to extract data from segment metadata. Consider using DataVariant or SignalSourceContainer for direct signal processing.");
331 }
332
333 if constexpr (std::is_same_v<OutputType, Kakshya::Region>) {
334 MF_INFO(
335 Journal::Component::Yantra,
336 Journal::Context::Runtime,
337 "OutputType 'Region' is an expressive marker, not a data holder. Operations will create spatial/temporal markers with results as metadata.");
338 } else if constexpr (std::is_same_v<OutputType, Kakshya::RegionGroup>) {
339 MF_INFO(
340 Journal::Component::Yantra,
341 Journal::Context::Runtime,
342 "OutputType 'RegionGroup' is an expressive marker, not a data holder. Operations will organize results into spatial/temporal groups.");
343 } else if constexpr (std::is_same_v<OutputType, std::vector<Kakshya::RegionSegment>>) {
344 MF_INFO(
345 Journal::Component::Yantra,
346 Journal::Context::Runtime,
347 "OutputType 'RegionSegments' is an expressive marker, not a data holder. Operations will create segments with results in metadata. Consider using DataVariant or SignalSourceContainer for direct signal processing.");
348 }
349 }
350
351 friend class ComputeMatrix;
352};
353
354// Type aliases for common operation patterns
360
361}
#define MF_INFO(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
Local execution orchestrator for computational operations.
std::shared_ptr< GpuExecutionContext< InputType, OutputType > > m_gpu_backend
void set_last_execution_context(const ExecutionContext &ctx)
virtual void set_parameter(const std::string &name, std::any value)=0
Sets a named parameter that configures the operation's behavior.
const ExecutionContext & get_last_execution_context() const
virtual ~ComputeOperation()=default
Virtual destructor for proper cleanup of derived classes.
void set_reconstruction_callback(const ReconstructionCallback &callback)
virtual void set_container_for_regions(const std::shared_ptr< Kakshya::SignalSourceContainer > &container)
output_type apply_hooks(const input_type &input, const ExecutionContext &context)
void set_pre_execution_hook(const OperationHookCallback &hook)
virtual std::any get_parameter(const std::string &name) const =0
Retrieves a parameter's current value.
output_type apply_operation(const input_type &input)
Public synchronous execution interface.
void set_gpu_backend(std::shared_ptr< GpuExecutionContext< InputType, OutputType > > backend)
Attach a GPU execution backend.
void validate_operation_data_types() const
Validate input/output types and warn about marker types.
OutputType apply_to_data(const InputType &data)
Convenience overload that extracts just the data from result.
void set_post_execution_hook(const OperationHookCallback &hook)
virtual OperationType get_operation_type() const =0
Returns the category of this operation for grammar and registry discovery.
virtual output_type apply_operation_internal(const input_type &input, const ExecutionContext &context)
Internal execution method - ComputeMatrix can access this.
output_type operator()(const InputType &data)
Convenience overload for direct data processing (backward compatibility)
virtual output_type operation_function(const input_type &input)=0
Executes the computational transformation on the input data.
virtual bool validate_input(const input_type &) const
Validates if the input data meets the operation's requirements.
output_type apply_operation_with_dependencies(const input_type &input)
Applies the operation with dependencies resolved.
virtual output_type apply_operation_chained(const input_type &input, const ExecutionContext &ctx)
Optional chain-aware implementation - default delegates to operation_function.
std::shared_ptr< Kakshya::SignalSourceContainer > m_container
virtual const std::shared_ptr< Kakshya::SignalSourceContainer > & get_container_for_regions() const
std::vector< std::shared_ptr< ComputeOperation > > m_dependencies
virtual std::string get_name() const
Get operation name for debugging/introspection.
output_type convert_result(std::vector< std::vector< double > > &result_data, DataStructureInfo &metadata)
Convert processed double data back to OutputType using metadata and optional callback.
ComputeOperation()
Constructor with data type validation warnings.
virtual std::future< output_type > apply_operation_async(const input_type &input)
Optional async implementation - default delegates to operation_function.
virtual std::map< std::string, std::any > get_all_parameters() const
Retrieves all parameters and their values.
void add_dependency(std::shared_ptr< ComputeOperation > dep)
output_type execute(const input_type &input)
OpUnit interface - operations can act as units in dependency graphs.
virtual output_type apply_operation_parallel(const input_type &input, const ExecutionContext &ctx)
Optional parallel-aware implementation - default delegates to operation_function.
Base interface for all computational operations in the processing pipeline.
Type-parameterised shell over GpuDispatchCore.
@ ComputeMatrix
Compute operations (Yantra - algorithms, matrices, DSP)
std::function< void(std::any &)> OperationHookCallback
Callback type for pre/post operation hooks.
OperationType
Operation categories for organization and discovery.
std::function< std::any(std::vector< std::vector< double > > &, std::any &)> ReconstructionCallback
Callback type for custom reconstruction logic.
Metadata about data structure for reconstruction.
Input/Output container for computation pipeline data flow with structure preservation.
Definition DataIO.hpp:24
ExecutionMode mode
Execution mode controlling scheduling behavior.
OperationHookCallback pre_execution_hook
Optional callback invoked before operation execution.
OperationHookCallback post_execution_hook
Optional callback invoked after operation execution.
Context information controlling how a compute operation executes.