class MAYAFLUX_API ComputeMatrix : public std::enable_shared_from_this<ComputeMatrix> {

    static std::shared_ptr<ComputeMatrix> create()
    {
        return std::make_shared<ComputeMatrix>();
    }

    template <typename OpClass>
    bool add_operation(const std::string& name, std::shared_ptr<OpClass> operation)
    {
        return m_operations.add(name, operation);
    }

    template <typename OpClass, typename... Args>
    std::shared_ptr<OpClass> create_operation(const std::string& name, Args&&... args)
    {
        auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
        if (m_operations.add(name, operation)) {
            // ...
        }
        // ...
    }

    template <typename OpClass>
    std::shared_ptr<OpClass> get_operation(const std::string& name)
    {
        return m_operations.get<OpClass>(name);
    }

    bool remove_operation(const std::string& name)
    {
        return m_operations.remove(name);
    }

    std::vector<std::string> list_operations() const
    {
        return m_operations.list_names();
    }

    void clear_operations()
    {
        m_operations.clear();
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
    std::optional<IO<OutputType>> execute(const InputType& input, Args&&... args)
    {
        auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
        return execute_operation<OpClass, InputType, OutputType>(operation, input);
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
    std::optional<IO<OutputType>> execute_named(const std::string& name, const InputType& input)
    {
        auto operation = m_operations.get<OpClass>(name);
        // ...
        return execute_operation<OpClass, InputType, OutputType>(operation, input);
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
    std::optional<IO<OutputType>> execute_with(std::shared_ptr<OpClass> operation, const InputType& input)
    {
        return execute_operation<OpClass, InputType, OutputType>(operation, input);
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
    std::future<std::optional<IO<OutputType>>> execute_async(const InputType& input, Args&&... args)
    {
        return std::async(std::launch::async, [this, input, args...]() {
            return execute<OpClass, InputType, OutputType>(input, args...);
        });
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
    std::future<std::optional<IO<OutputType>>> execute_named_async(const std::string& name, const InputType& input)
    {
        return std::async(std::launch::async, [this, name, input]() {
            return execute_named<OpClass, InputType, OutputType>(name, input);
        });
    }

    template <ComputeData InputType, typename... OpClasses>
    auto execute_parallel(const InputType& input)
    {
        return std::make_tuple(
            execute_async<OpClasses, InputType>(input).get()...);
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
    std::vector<std::optional<IO<OutputType>>> execute_parallel_named(
        const std::vector<std::string>& names,
        const InputType& input)
    {
        std::vector<std::future<std::optional<IO<OutputType>>>> futures;
        futures.reserve(names.size());

        for (const auto& name : names) {
            futures.push_back(execute_named_async<OpClass, InputType, OutputType>(name, input));
        }

        std::vector<std::optional<IO<OutputType>>> results;
        results.reserve(futures.size());

        for (auto& future : futures) {
            results.push_back(future.get());
        }

        return results;
    }

    template <typename FirstOp, typename SecondOp, /* ... */>
    std::optional<IO<OutputType>> execute_chain(const InputType& input)
    {
        auto first_result = execute<FirstOp, InputType, IntermediateType>(input);
        // ...
        return execute<SecondOp, IntermediateType, OutputType>(first_result->data);
    }

    template <typename FirstOp, typename SecondOp, /* ... */>
    std::optional<IO<OutputType>> execute_chain_named(
        const std::string& first_name,
        const std::string& second_name,
        const InputType& input)
    {
        auto first_result = execute_named<FirstOp, InputType, IntermediateType>(first_name, input);
        // ...
        return execute_named<SecondOp, IntermediateType, OutputType>(second_name, first_result->data);
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
    std::vector<std::optional<IO<OutputType>>> execute_batch(
        const std::vector<InputType>& inputs,
        Args&&... args)
    {
        auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);

        std::vector<std::optional<IO<OutputType>>> results;
        results.reserve(inputs.size());

        for (const auto& input : inputs) {
            results.push_back(execute_operation<OpClass, InputType, OutputType>(operation, input));
        }

        return results;
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
    std::vector<std::optional<IO<OutputType>>> execute_batch_parallel(
        const std::vector<InputType>& inputs,
        Args&&... args)
    {
        auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);

        std::vector<std::optional<IO<OutputType>>> results(inputs.size());

        MayaFlux::Parallel::transform(MayaFlux::Parallel::par_unseq,
            inputs.begin(), inputs.end(),
            results.begin(),
            [this, operation](const InputType& input) {
                return execute_operation<OpClass, InputType, OutputType>(operation, input);
            });

        return results;
    }

    template <ComputeData StartType>
    FluentExecutor<ComputeMatrix, StartType> with(const StartType& input) { /* ... */ }

    template <ComputeData StartType>
    FluentExecutor<ComputeMatrix, StartType> with(StartType&& input) { /* ... */ }

    void set_execution_policy(ExecutionPolicy policy)
    {
        m_execution_policy = policy;
    }

    ExecutionPolicy get_execution_policy() const
    {
        return m_execution_policy;
    }

    void set_profiling(bool enabled)
    {
        m_profiling_enabled = enabled;
    }

    std::unordered_map<std::string, std::any> get_statistics() const
    {
        auto stats = m_operations.get_statistics();
        stats["total_executions"] = m_total_executions.load();
        stats["failed_executions"] = m_failed_executions.load();
        if (m_profiling_enabled) {
            stats["average_execution_time_ms"] = m_average_execution_time.load();
        }
        return stats;
    }

    void set_context_configurator(
        std::function<void(ExecutionContext&, const std::type_index&)> configurator)
    {
        m_context_configurator = std::move(configurator);
    }

    void set_default_timeout(std::chrono::milliseconds timeout)
    {
        m_default_timeout = timeout;
    }

    template <typename OpClass, ComputeData InputType, ComputeData OutputType>
    std::optional<IO<OutputType>> execute_operation(
        std::shared_ptr<OpClass> operation,
        const InputType& input)
    {
        // ...
        m_total_executions++;

        try {
            // ... (builds input_wrapper and an ExecutionContext ctx)
            configure_execution_context(ctx, std::type_index(typeid(OpClass)));

            auto start = std::chrono::steady_clock::now();
            auto result = operation->apply_operation_internal(input_wrapper, ctx);

            if (m_profiling_enabled) {
                auto end = std::chrono::steady_clock::now();
                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
                update_execution_time(duration.count());
            }
            // ...
        } catch (const std::exception& e) {
            m_failed_executions++;
            handle_execution_error(e, std::type_index(typeid(OpClass)));
            // ...
        }
    }

    void configure_execution_context(ExecutionContext& ctx, const std::type_index& op_type)
    {
        switch (m_execution_policy) {
        case ExecutionPolicy::CONSERVATIVE:
        case ExecutionPolicy::BALANCED:
            ctx.mode = ExecutionMode::SYNC;
            break;
        case ExecutionPolicy::AGGRESSIVE:
            ctx.mode = ExecutionMode::PARALLEL;
            break;
        }

        ctx.timeout = m_default_timeout;

        if (m_context_configurator) {
            m_context_configurator(ctx, op_type);
        }
    }

    void handle_execution_error(const std::exception& e, const std::type_index& op_type)
    {
        m_last_error = e.what();
        m_last_error_type = op_type;

        if (m_error_callback) {
            m_error_callback(e, op_type);
        }
    }

    void update_execution_time(double ms)
    {
        double current_avg = m_average_execution_time.load();
        double total_execs = m_total_executions.load();
        double new_avg = (current_avg * (total_execs - 1) + ms) / total_execs;
        m_average_execution_time.store(new_avg);
    }

    // ...
    std::chrono::milliseconds m_default_timeout { 0 };
    // ...
    std::atomic<size_t> m_total_executions { 0 };
    std::atomic<size_t> m_failed_executions { 0 };
    std::atomic<double> m_average_execution_time { 0.0 };
    bool m_profiling_enabled = false;
    // ...
    std::type_index m_last_error_type { typeid(void) };
    // ...

    void set_error_callback(
        std::function<void(const std::exception&, const std::type_index&)> callback)
    {
        m_error_callback = std::move(callback);
    }
    // ...
};
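The listing supports two entry points: registering named operations in the pool and executing them by name, or constructing throwaway instances per call. A minimal sketch of the pool-based path, assuming a hypothetical GainOp operation class and a hypothetical Buffer type that satisfies ComputeData (neither is part of the listing):

    auto matrix = ComputeMatrix::create();
    matrix->create_operation<GainOp>("gain", 0.5f);               // 0.5f is a hypothetical GainOp constructor argument

    Buffer input = /* ... */;
    auto result = matrix->execute_named<GainOp>("gain", input);   // std::optional<IO<Buffer>>
    if (result) {
        // result->data holds the transformed Buffer
    }

The later sketches in this member list reuse matrix, input, GainOp, and Buffer from this snippet.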
std::unordered_map< std::string, std::any > get_statistics() const
Get execution statistics.
ExecutionPolicy get_execution_policy() const
Get current execution policy.
std::future< std::optional< IO< OutputType > > > execute_async(const InputType &input, Args &&... args)
Execute operation asynchronously.
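A sketch of the asynchronous variant, reusing the hypothetical GainOp and Buffer from the earlier snippet; the trailing arguments are forwarded to the operation's constructor and the future is harvested with get():

    auto future = matrix->execute_async<GainOp>(input, 0.5f);
    // ... other work ...
    auto result = future.get();                                   // std::optional<IO<Buffer>>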
void update_execution_time(double ms)
Update execution time statistics.
FluentExecutor< ComputeMatrix, StartType > with(StartType &&input)
Create a fluent executor with move semantics.
std::vector< std::optional< IO< OutputType > > > execute_batch(const std::vector< InputType > &inputs, Args &&... args)
Execute operation on multiple inputs.
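A sketch of serial batch execution, which builds one operation instance and applies it to every input in order (hypothetical GainOp and Buffer as before):

    std::vector<Buffer> inputs = /* ... */;
    auto results = matrix->execute_batch<GainOp>(inputs, 0.5f);   // one GainOp(0.5f) shared across all inputs
    for (const auto& r : results) {
        if (r) { /* use r->data */ }
    }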
std::optional< IO< OutputType > > execute_operation(std::shared_ptr< OpClass > operation, const InputType &input)
Core execution implementation.
std::optional< IO< OutputType > > execute(const InputType &input, Args &&... args)
Execute an operation by creating a new instance.
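A sketch of the ad-hoc path, which constructs a fresh operation for a single call instead of consulting the pool (hypothetical GainOp and Buffer):

    auto result = matrix->execute<GainOp>(input, 0.5f);   // builds GainOp(0.5f), runs it once on input
    if (!result) {
        // empty result; see get_last_error()
    }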
void set_context_configurator(std::function< void(ExecutionContext &, const std::type_index &)> configurator)
Set custom context configurator.
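A configurator sketch; the callback runs after the matrix applies its policy-based defaults, so it can adjust ctx.mode and ctx.timeout per operation type (GainOp hypothetical):

    matrix->set_context_configurator([](ExecutionContext& ctx, const std::type_index& op_type) {
        if (op_type == std::type_index(typeid(GainOp))) {
            ctx.mode = ExecutionMode::SYNC;                // keep this operation type synchronous
            ctx.timeout = std::chrono::milliseconds(50);
        }
    });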
std::shared_ptr< OpClass > get_operation(const std::string &name)
Get a named operation from this matrix.
OperationPool m_operations
void set_error_callback(std::function< void(const std::exception &, const std::type_index &)> callback)
Set error callback.
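A sketch of an error callback; per the listing, handle_execution_error invokes it with the thrown exception and the failing operation's type:

    matrix->set_error_callback([](const std::exception& e, const std::type_index& op_type) {
        std::cerr << "operation " << op_type.name() << " failed: " << e.what() << '\n';
    });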
std::string get_last_error() const
Get last error message.
void configure_execution_context(ExecutionContext &ctx, const std::type_index &op_type)
Configure execution context based on operation type and policy.
std::function< void(const std::exception &, const std::type_index &)> m_error_callback
std::vector< std::string > list_operations() const
List all operation names in this matrix.
void set_profiling(bool enabled)
Enable/disable execution profiling.
std::shared_ptr< OpClass > create_operation(const std::string &name, Args &&... args)
Create and add an operation to this matrix.
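A sketch of create_operation, which forwards its trailing arguments to the operation's constructor, registers the instance in the pool under the given name, and hands it back for direct use (hypothetical GainOp):

    auto gain = matrix->create_operation<GainOp>("gain", 0.5f);   // std::shared_ptr<GainOp>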
void handle_execution_error(const std::exception &e, const std::type_index &op_type)
Handle execution errors.
void set_default_timeout(std::chrono::milliseconds timeout)
Set default execution timeout.
std::vector< std::optional< IO< OutputType > > > execute_parallel_named(const std::vector< std::string > &names, const InputType &input)
Execute multiple named operations in parallel.
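A sketch of fanning one input out to several pooled instances of the same operation class; the names are assumed to have been registered beforehand, and the results come back in the same order as the names (hypothetical GainOp and Buffer):

    std::vector<std::string> names { "gain_low", "gain_high" };
    auto results = matrix->execute_parallel_named<GainOp>(names, input);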
void set_execution_policy(ExecutionPolicy policy)
Set execution policy for this matrix.
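The policy, profiling, and timeout setters combine naturally with get_statistics(). A configuration sketch; the std::any_cast types follow the listing, where the counters are stored as size_t and the running average as double:

    matrix->set_execution_policy(ExecutionPolicy::AGGRESSIVE);    // contexts default to ExecutionMode::PARALLEL
    matrix->set_profiling(true);
    matrix->set_default_timeout(std::chrono::milliseconds(100));

    // ... after some executions ...
    auto stats  = matrix->get_statistics();
    auto total  = std::any_cast<size_t>(stats["total_executions"]);
    auto avg_ms = std::any_cast<double>(stats["average_execution_time_ms"]);   // present only while profiling is on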
std::future< std::optional< IO< OutputType > > > execute_named_async(const std::string &name, const InputType &input)
Execute named operation asynchronously.
std::optional< IO< OutputType > > execute_with(std::shared_ptr< OpClass > operation, const InputType &input)
Execute with provided operation instance.
bool add_operation(const std::string &name, std::shared_ptr< OpClass > operation)
Add a pre-configured operation instance to this matrix.
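A sketch of registering a pre-built instance rather than letting the matrix construct one; add_operation returns false when the insertion is rejected, for example because the name is already taken (hypothetical GainOp):

    auto op = std::make_shared<GainOp>(0.5f);
    if (!matrix->add_operation("gain", op)) {
        // the pool refused the name
    }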
void clear_operations()
Clear all operations from this matrix.
std::optional< IO< OutputType > > execute_chain_named(const std::string &first_name, const std::string &second_name, const InputType &input)
Execute named operations in sequence.
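A sketch of chaining two pooled operations by name, assuming hypothetical NormalizeOp and AnalyzeOp classes registered as "normalize" and "analyze", and assuming the intermediate and output types default to the input type as in the other execute overloads:

    auto result = matrix->execute_chain_named<NormalizeOp, AnalyzeOp>("normalize", "analyze", input);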
std::vector< std::optional< IO< OutputType > > > execute_batch_parallel(const std::vector< InputType > &inputs, Args &&... args)
Execute operation on multiple inputs in parallel.
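A sketch of the parallel batch variant, which maps the shared operation instance over the inputs via MayaFlux::Parallel::transform instead of a serial loop; the call site looks the same as execute_batch (hypothetical GainOp, inputs as above):

    auto results = matrix->execute_batch_parallel<GainOp>(inputs, 0.5f);   // results[i] corresponds to inputs[i]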
std::function< void(ExecutionContext &, const std::type_index &)> m_context_configurator
FluentExecutor< ComputeMatrix, StartType > with(const StartType &input)
Create a fluent executor for chaining operations.
std::optional< IO< OutputType > > execute_chain(const InputType &input)
Execute operations in sequence (type-safe chain)
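A sketch of the type-safe chain with ad-hoc instances, again assuming hypothetical NormalizeOp and AnalyzeOp and defaulted intermediate/output types; the first result's data member feeds the second operation:

    auto result = matrix->execute_chain<NormalizeOp, AnalyzeOp>(input);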
auto execute_parallel(const InputType &input)
Execute multiple operations in parallel.
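A sketch of running one input through several different operation classes concurrently; per the listing, the input type is the first template argument and the result is a std::tuple with one std::optional<IO<...>> per class, in the order listed (hypothetical types):

    auto [gain_result, analyze_result] = matrix->execute_parallel<Buffer, GainOp, AnalyzeOp>(input);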
std::optional< IO< OutputType > > execute_named(const std::string &name, const InputType &input)
Execute a named operation from the pool.
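A sketch contrasting the named path with execute: no constructor arguments are passed here, because the pooled instance registered under the name is reused as configured (hypothetical GainOp and Buffer):

    auto result = matrix->execute_named<GainOp>("gain", input);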
bool remove_operation(const std::string &name)
Remove a named operation from this matrix.
static std::shared_ptr< ComputeMatrix > create()
Create a new ComputeMatrix instance.
ComputeMatrix
Local execution orchestrator for computational operations.
FluentExecutor
Fluent interface for chaining operations on any executor.
OperationPool
Thread-safe pool for managing named operation instances.
ComputeData
Universal concept for types that can be used as data in compute operations.
ExecutionPolicy
Policy for execution strategy selection.
ExecutionContext
Context information for operation execution.
IO
Input/Output container for computation pipeline data flow with structure preservation.