21template <ComputeData InputType = std::vector<Kakshya::DataVariant>, ComputeData OutputType = InputType>
32 validate_operation_data_types();
45 return apply_operation_internal(input, m_last_execution_context);
61 m_last_execution_context.mode = ExecutionMode::DEPENDENCY;
63 for (
auto& dep : m_dependencies) {
64 if (dep->validate_input(input)) {
65 dep->apply_operation_internal(input, m_last_execution_context);
69 m_last_execution_context.mode = ExecutionMode::SYNC;
70 return apply_operation_internal(input, m_last_execution_context);
90 return apply_operation(
input_type { data }).data;
/**
 * @brief Sets a named parameter that configures the operation's behavior.
 *
 * Pure virtual: this base declares the interface only, so every concrete
 * operation has to supply its own implementation.
 *
 * @param name  Name identifying the parameter to set.
 * @param value New value for the parameter, passed type-erased as std::any.
 *              NOTE(review): which names and value types are accepted is
 *              decided by the implementing class — not visible here.
 */
98 virtual void set_parameter(
const std::string& name, std::any value) = 0;
/**
 * @brief Retrieves a parameter's current value.
 *
 * Pure virtual and const-qualified; the result is marked [[nodiscard]], so
 * callers are expected to use the returned value.
 *
 * @param name Name identifying the parameter to read.
 * @return The parameter value, type-erased as std::any.
 *         NOTE(review): behavior for an unknown name is defined by the
 *         implementing class — confirm against the concrete operation.
 */
105 [[nodiscard]]
virtual std::any
get_parameter(
const std::string& name)
const = 0;
/**
 * @brief Get operation name for debugging/introspection.
 *
 * The base implementation returns the fixed string "ComputeOperation".
 * The method is virtual, so a derived operation can override it to report
 * its own name.
 *
 * @return The operation's name as a std::string.
 */
123 [[nodiscard]]
virtual std::string
get_name()
const {
return "ComputeOperation"; }
135 m_last_execution_context.mode = ExecutionMode::DEPENDENCY;
136 return apply_operation_internal(input, m_last_execution_context);
141 m_dependencies.push_back(std::move(dep));
148 m_container = container;
158 m_last_execution_context = ctx;
163 return m_last_execution_context;
168 m_last_execution_context.pre_execution_hook = hook;
173 m_last_execution_context.post_execution_hook = hook;
178 m_last_execution_context.reconstruction_callback = callback;
193 m_gpu_backend = std::move(backend);
214 if (m_gpu_backend && m_gpu_backend->ensure_gpu_ready()) {
215 return m_gpu_backend->execute(input, context);
218 switch (context.
mode) {
219 case ExecutionMode::ASYNC:
221 return apply_operation_async(input).get();
223 case ExecutionMode::PARALLEL:
224 return apply_operation_parallel(input, context);
226 case ExecutionMode::CHAINED:
227 return apply_operation_chained(input, context);
229 case ExecutionMode::DEPENDENCY:
230 case ExecutionMode::SYNC:
232 return apply_hooks(input, context);
241 return std::async(std::launch::async, [
this, input]() {
242 return apply_hooks(input, m_last_execution_context);
251 return apply_hooks(input, ctx);
259 return apply_hooks(input, ctx);
267 std::any any_data = metadata;
269 if (m_last_execution_context.reconstruction_callback) {
270 auto reconstructed = m_last_execution_context.reconstruction_callback(result_data, any_data);
271 auto result = safe_any_cast<output_type>(reconstructed);
273 return *result.value;
277 Journal::Component::Yantra,
278 Journal::Context::Runtime,
279 "Reconstruction callback type mismatch: {}",
281 return OperationHelper::reconstruct_from_double<output_type>(result_data, metadata);
284 return OperationHelper::reconstruct_from_double<output_type>(result_data, metadata);
294 std::any input_any =
const_cast<input_type&
>(input);
298 auto result = operation_function(input);
301 std::any result_any = &result;
316 if constexpr (std::is_same_v<InputType, Kakshya::Region>) {
318 Journal::Component::Yantra,
319 Journal::Context::Runtime,
320 "InputType 'Region' is an expressive marker, not a data holder. Operations will process coordinate data rather than signal data. Consider using DataVariant or SignalSourceContainer for signal processing.");
321 }
else if constexpr (std::is_same_v<InputType, Kakshya::RegionGroup>) {
323 Journal::Component::Yantra,
324 Journal::Context::Runtime,
325 "InputType 'RegionGroup' is an expressive marker, not a data holder. Operations will process coordinate data rather than signal data. Consider using DataVariant or SignalSourceContainer for signal processing.");
326 }
else if constexpr (std::is_same_v<InputType, std::vector<Kakshya::RegionSegment>>) {
328 Journal::Component::Yantra,
329 Journal::Context::Runtime,
330 "InputType 'RegionSegments' are expressive markers, not primary data holders. Operations will attempt to extract data from segment metadata. Consider using DataVariant or SignalSourceContainer for direct signal processing.");
333 if constexpr (std::is_same_v<OutputType, Kakshya::Region>) {
335 Journal::Component::Yantra,
336 Journal::Context::Runtime,
337 "OutputType 'Region' is an expressive marker, not a data holder. Operations will create spatial/temporal markers with results as metadata.");
338 }
else if constexpr (std::is_same_v<OutputType, Kakshya::RegionGroup>) {
340 Journal::Component::Yantra,
341 Journal::Context::Runtime,
342 "OutputType 'RegionGroup' is an expressive marker, not a data holder. Operations will organize results into spatial/temporal groups.");
343 }
else if constexpr (std::is_same_v<OutputType, std::vector<Kakshya::RegionSegment>>) {
345 Journal::Component::Yantra,
346 Journal::Context::Runtime,
347 "OutputType 'RegionSegments' is an expressive marker, not a data holder. Operations will create segments with results in metadata. Consider using DataVariant or SignalSourceContainer for direct signal processing.");
#define MF_INFO(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
Local execution orchestrator for computational operations.
std::shared_ptr< GpuExecutionContext< InputType, OutputType > > m_gpu_backend
void set_last_execution_context(const ExecutionContext &ctx)
virtual void set_parameter(const std::string &name, std::any value)=0
Sets a named parameter that configures the operation's behavior.
const ExecutionContext & get_last_execution_context() const
virtual ~ComputeOperation()=default
Virtual destructor for proper cleanup of derived classes.
void set_reconstruction_callback(const ReconstructionCallback &callback)
virtual void set_container_for_regions(const std::shared_ptr< Kakshya::SignalSourceContainer > &container)
output_type apply_hooks(const input_type &input, const ExecutionContext &context)
void set_pre_execution_hook(const OperationHookCallback &hook)
virtual std::any get_parameter(const std::string &name) const =0
Retrieves a parameter's current value.
output_type apply_operation(const input_type &input)
Public synchronous execution interface.
void set_gpu_backend(std::shared_ptr< GpuExecutionContext< InputType, OutputType > > backend)
Attach a GPU execution backend.
void validate_operation_data_types() const
Validate input/output types and warn about marker types.
OutputType apply_to_data(const InputType &data)
Convenience overload that extracts just the data from result.
void set_post_execution_hook(const OperationHookCallback &hook)
ExecutionContext m_last_execution_context
virtual OperationType get_operation_type() const =0
Returns the category of this operation for grammar and registry discovery.
virtual output_type apply_operation_internal(const input_type &input, const ExecutionContext &context)
Internal execution method - ComputeMatrix can access this.
output_type operator()(const InputType &data)
Convenience overload for direct data processing (backward compatibility)
virtual output_type operation_function(const input_type &input)=0
Executes the computational transformation on the input data.
virtual bool validate_input(const input_type &) const
Validates if the input data meets the operation's requirements.
output_type apply_operation_with_dependencies(const input_type &input)
Applies the operation with dependencies resolved.
virtual output_type apply_operation_chained(const input_type &input, const ExecutionContext &ctx)
Optional chain-aware implementation - default delegates to apply_hooks rather than calling operation_function directly.
bool has_gpu_backend() const
std::shared_ptr< Kakshya::SignalSourceContainer > m_container
virtual const std::shared_ptr< Kakshya::SignalSourceContainer > & get_container_for_regions() const
std::vector< std::shared_ptr< ComputeOperation > > m_dependencies
virtual std::string get_name() const
Get operation name for debugging/introspection.
const auto & get_dependencies() const
output_type convert_result(std::vector< std::vector< double > > &result_data, DataStructureInfo &metadata)
Convert processed double data back to OutputType using metadata and optional callback.
ComputeOperation()
Constructor with data type validation warnings.
virtual std::future< output_type > apply_operation_async(const input_type &input)
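Optional async implementation - default launches apply_hooks through std::async (std::launch::async) with the last execution context, rather than calling operation_function directly.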
virtual std::map< std::string, std::any > get_all_parameters() const
Retrieves all parameters and their values.
void add_dependency(std::shared_ptr< ComputeOperation > dep)
output_type execute(const input_type &input)
OpUnit interface - operations can act as units in dependency graphs.
virtual output_type apply_operation_parallel(const input_type &input, const ExecutionContext &ctx)
Optional parallel-aware implementation - default delegates to apply_hooks rather than calling operation_function directly.
Base interface for all computational operations in the processing pipeline.
Type-parameterised shell over GpuDispatchCore.
@ ComputeMatrix
Compute operations (Yantra - algorithms, matrices, DSP)
std::function< void(std::any &)> OperationHookCallback
Callback type for pre/post operation hooks.
OperationType
Operation categories for organization and discovery.
std::function< std::any(std::vector< std::vector< double > > &, std::any &)> ReconstructionCallback
Callback type for custom reconstruction logic.
Metadata about data structure for reconstruction.
Input/Output container for computation pipeline data flow with structure preservation.
ExecutionMode mode
Execution mode controlling scheduling behavior.
OperationHookCallback pre_execution_hook
Optional callback invoked before operation execution.
OperationHookCallback post_execution_hook
Optional callback invoked after operation execution.
Context information controlling how a compute operation executes.