40class MAYAFLUX_API
ComputeMatrix :
public std::enable_shared_from_this<ComputeMatrix> {
45 static std::shared_ptr<ComputeMatrix>
create()
47 return std::make_shared<ComputeMatrix>();
57 template <
typename OpClass>
58 bool add_operation(
const std::string& name, std::shared_ptr<OpClass> operation)
62 return m_operations.add(name, operation);
73 template <
typename OpClass,
typename... Args>
76 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
77 if (m_operations.add(name, operation)) {
89 template <
typename OpClass>
92 return m_operations.get<OpClass>(name);
102 return m_operations.remove(name);
111 return m_operations.list_names();
119 m_operations.clear();
134 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
135 return execute_operation<OpClass, InputType, OutputType>(operation, input);
147 template <
typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
150 auto operation = m_operations.get<OpClass>(name);
153 return execute_operation<OpClass, InputType, OutputType>(operation, input);
165 template <
typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
168 return execute_operation<OpClass, InputType, OutputType>(operation, input);
183 return std::async(std::launch::async, [
this, input, args...]() {
184 return execute<OpClass, InputType, OutputType>(input, args...);
191 template <
typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
194 return std::async(std::launch::async, [
this, name, input]() {
195 return execute_named<OpClass, InputType, OutputType>(name, input);
206 template <
ComputeData InputType,
typename... OpClasses>
209 return std::make_tuple(
210 execute_async<OpClasses, InputType>(input).get()...);
222 template <
typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
224 const std::vector<std::string>& names,
227 std::vector<std::future<std::optional<Datum<OutputType>>>> futures;
228 futures.reserve(names.size());
230 for (
const auto& name : names) {
231 futures.push_back(execute_named_async<OpClass, InputType, OutputType>(name, input));
234 std::vector<std::optional<Datum<OutputType>>> results;
235 results.reserve(futures.size());
237 for (
auto& future : futures) {
238 results.push_back(future.get());
254 template <
typename FirstOp,
typename SecondOp,
260 auto first_result = execute<FirstOp, InputType, IntermediateType>(input);
264 return execute<SecondOp, IntermediateType, OutputType>(*first_result);
270 template <
typename FirstOp,
typename SecondOp,
275 const std::string& first_name,
276 const std::string& second_name,
279 auto first_result = execute_named<FirstOp, InputType, IntermediateType>(first_name, input);
283 return execute_named<SecondOp, IntermediateType, OutputType>(second_name, *first_result);
300 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
302 std::vector<std::optional<Datum<OutputType>>> results;
303 results.reserve(inputs.size());
305 for (
const auto& input : inputs) {
306 results.push_back(execute_operation<OpClass, InputType, OutputType>(operation, input));
320 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
322 std::vector<std::optional<Datum<OutputType>>> results(inputs.size());
324 MayaFlux::Parallel::transform(MayaFlux::Parallel::par_unseq,
325 inputs.begin(), inputs.end(),
328 return execute_operation<OpClass, InputType, OutputType>(operation, input);
340 template <ComputeData StartType>
346 template <ComputeData StartType>
356 template <ComputeData StartType>
362 template <ComputeData StartType>
400 template <ComputeData StartType,
typename ChainFunc,
typename CompleteFn>
403 auto self = shared_from_this();
404 register_async(std::async(std::launch::async,
406 input = std::move(input),
407 chain = std::forward<ChainFunc>(chain),
408 on_complete = std::forward<CompleteFn>(on_complete)]()
mutable {
414 template <ComputeData StartType,
typename ChainFunc,
typename CompleteFn>
415 void with_async(StartType input, ChainFunc&& chain, CompleteFn&& on_complete)
418 std::forward<ChainFunc>(chain),
419 std::forward<CompleteFn>(on_complete));
428 m_execution_policy = policy;
436 return m_execution_policy;
444 m_profiling_enabled = enabled;
452 auto stats = m_operations.get_statistics();
453 stats[
"total_executions"] = m_total_executions.load();
454 stats[
"failed_executions"] = m_failed_executions.load();
455 if (m_profiling_enabled) {
456 stats[
"average_execution_time_ms"] = m_average_execution_time.load();
466 std::function<
void(
ExecutionContext&,
const std::type_index&)> configurator)
468 m_context_configurator = std::move(configurator);
477 m_default_timeout = timeout;
486 template <
typename OpClass, ComputeData InputType, ComputeData OutputType>
488 std::shared_ptr<OpClass> operation,
494 m_total_executions++;
498 configure_execution_context(ctx, std::type_index(
typeid(OpClass)));
500 auto start = std::chrono::steady_clock::now();
502 auto result = operation->apply_operation_internal(input, ctx);
504 if (m_profiling_enabled) {
505 auto end = std::chrono::steady_clock::now();
506 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
507 update_execution_time(
duration.count());
512 }
catch (
const std::exception& e) {
513 m_failed_executions++;
514 handle_execution_error(e, std::type_index(
typeid(OpClass)));
524 switch (m_execution_policy) {
525 case ExecutionPolicy::CONSERVATIVE:
526 case ExecutionPolicy::BALANCED:
527 ctx.
mode = ExecutionMode::SYNC;
529 case ExecutionPolicy::AGGRESSIVE:
530 ctx.
mode = ExecutionMode::PARALLEL;
534 ctx.
timeout = m_default_timeout;
536 if (m_context_configurator) {
537 m_context_configurator(ctx, op_type);
548 m_last_error = e.what();
549 m_last_error_type = op_type;
551 if (m_error_callback) {
552 m_error_callback(e, op_type);
561 double current_avg = m_average_execution_time.load();
562 double total_execs = m_total_executions.load();
563 double new_avg = (current_avg * (total_execs - 1) + ms) / total_execs;
564 m_average_execution_time.store(new_avg);
569 std::lock_guard lk(m_async_mtx);
571 std::erase_if(m_async_futures, [](std::future<void>& f) {
572 return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
574 m_async_futures.push_back(std::move(f));
580 std::chrono::milliseconds m_default_timeout { 0 };
583 std::atomic<size_t> m_total_executions { 0 };
584 std::atomic<size_t> m_failed_executions { 0 };
585 std::atomic<double> m_average_execution_time { 0.0 };
586 bool m_profiling_enabled =
false;
592 std::type_index m_last_error_type {
typeid(void) };
600 std::function<
void(
const std::exception&,
const std::type_index&)> callback)
602 m_error_callback = std::move(callback);
621 std::lock_guard lk(m_async_mtx);
622 for (
auto& f : m_async_futures) {
626 m_async_futures.clear();
std::optional< double > duration
std::unordered_map< std::string, std::any > get_statistics() const
Get execution statistics.
std::vector< std::optional< Datum< OutputType > > > execute_batch(const std::vector< Datum< InputType > > &inputs, Args &&... args)
Execute operation on multiple inputs.
std::vector< std::optional< Datum< OutputType > > > execute_parallel_named(const std::vector< std::string > &names, const Datum< InputType > &input)
Execute multiple named operations in parallel.
ExecutionPolicy get_execution_policy() const
Get current execution policy.
void update_execution_time(double ms)
Update execution time statistics.
std::future< std::optional< Datum< OutputType > > > execute_named_async(const std::string &name, const Datum< InputType > &input)
Execute named operation asynchronously.
FluentExecutor< ComputeMatrix, StartType > with(StartType &&input)
std::optional< Datum< OutputType > > execute_named(const std::string &name, const Datum< InputType > &input)
Execute a named operation from the pool.
std::optional< Datum< OutputType > > execute_chain(const Datum< InputType > &input)
Execute operations in sequence (type-safe chain)
void set_context_configurator(std::function< void(ExecutionContext &, const std::type_index &)> configurator)
Set custom context configurator.
std::shared_ptr< OpClass > get_operation(const std::string &name)
Get a named operation from this matrix.
OperationPool m_operations
void with_async(StartType input, ChainFunc &&chain, CompleteFn &&on_complete)
std::future< std::optional< Datum< OutputType > > > execute_async(const Datum< InputType > &input, Args &&... args)
Execute operation asynchronously.
void set_error_callback(std::function< void(const std::exception &, const std::type_index &)> callback)
Set error callback.
std::string get_last_error() const
Get last error message.
void configure_execution_context(ExecutionContext &ctx, const std::type_index &op_type)
Configure execution context based on operation type and policy.
std::vector< std::optional< Datum< OutputType > > > execute_batch_parallel(const std::vector< Datum< InputType > > &inputs, Args &&... args)
Execute operation on multiple inputs in parallel.
std::function< void(const std::exception &, const std::type_index &)> m_error_callback
FluentExecutor< ComputeMatrix, StartType > with(const Datum< StartType > &input)
Create a fluent executor for chaining operations.
std::vector< std::string > list_operations() const
List all operation names in this matrix.
void set_profiling(bool enabled)
Enable/disable execution profiling.
std::vector< std::future< void > > m_async_futures
std::shared_ptr< OpClass > create_operation(const std::string &name, Args &&... args)
Create and add an operation to this matrix.
std::optional< Datum< OutputType > > execute_with(std::shared_ptr< OpClass > operation, const Datum< InputType > &input)
Execute with provided operation instance.
void handle_execution_error(const std::exception &e, const std::type_index &op_type)
Handle execution errors.
void set_default_timeout(std::chrono::milliseconds timeout)
Set default execution timeout.
void set_execution_policy(ExecutionPolicy policy)
Set execution policy for this matrix.
std::optional< Datum< OutputType > > execute(const Datum< InputType > &input, Args &&... args)
Execute an operation by creating a new instance.
void register_async(std::future< void > f)
std::optional< Datum< OutputType > > execute_chain_named(const std::string &first_name, const std::string &second_name, const Datum< InputType > &input)
Execute named operations in sequence.
bool add_operation(const std::string &name, std::shared_ptr< OpClass > operation)
Add a pre-configured operation instance to this matrix.
void clear_operations()
Clear all operations from this matrix.
FluentExecutor< ComputeMatrix, StartType > with(Datum< StartType > &&input)
Create a fluent executor with move semantics.
void with_async(Datum< StartType > input, ChainFunc &&chain, CompleteFn &&on_complete)
Execute an operation chain asynchronously.
std::function< void(ExecutionContext &, const std::type_index &)> m_context_configurator
auto execute_parallel(const Datum< InputType > &input)
Execute multiple operations in parallel.
FluentExecutor< ComputeMatrix, StartType > with(const StartType &input)
std::optional< Datum< OutputType > > execute_operation(std::shared_ptr< OpClass > operation, const Datum< InputType > &input)
Core execution implementation.
void drain_async()
Block until all in-flight async chains complete.
bool remove_operation(const std::string &name)
Remove a named operation from this matrix.
static std::shared_ptr< ComputeMatrix > create()
Create a new ComputeMatrix instance.
Local execution orchestrator for computational operations.
Fluent interface for chaining operations on any executor.
Thread-safe pool for managing named operation instances.
Universal concept for types that can be used as data in compute operations.
ExecutionPolicy
Policy for execution strategy selection.
Input/Output container for computation pipeline data flow with structure preservation.
ExecutionMode mode
Execution mode controlling scheduling behavior.
std::chrono::milliseconds timeout
Optional timeout for operation execution.
Context information controlling how a compute operation executes.