MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
ComputeMatrix.hpp
Go to the documentation of this file.
#pragma once

#include <any>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <tuple>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <vector>

// NOTE(review): the project-local includes (FluentExecutor, ExecutionContext,
// Datum/ComputeData, the named-operation pool, MayaFlux::Parallel) were lost
// when this listing was extracted — restore them from the original header.
9namespace MayaFlux::Yantra {
10
/**
 * @enum ExecutionPolicy
 * @brief Policy for execution strategy selection
 *
 * Selects how ComputeMatrix configures execution contexts:
 * CONSERVATIVE and BALANCED map to synchronous execution, while
 * AGGRESSIVE requests parallel execution (see configure_execution_context).
 */
enum class ExecutionPolicy : uint8_t {
    CONSERVATIVE, ///< Prefer safety and predictability
    BALANCED,     ///< Balance between performance and safety
    AGGRESSIVE    ///< Maximize performance
};
20
21/**
22 * @class ComputeMatrix
23 * @brief Local execution orchestrator for computational operations
24 *
25 * ComputeMatrix provides a self-contained execution environment for operations.
26 * It maintains its own operation instances and execution strategies without
27 * relying on any global registries. Each matrix instance is independent.
28 *
29 * Key Design Principles:
30 * - Instance-local operation management
31 * - Focus on execution patterns and strategies
32 * - Clean separation from registration concerns
33 *
34 * Core Responsibilities:
35 * - Execute operations with various strategies (sync, async, parallel, chain)
36 * - Manage instance-local named operations
37 * - Provide fluent interface through FluentExecutor
38 * - Configure execution contexts for optimization
39 */
40class MAYAFLUX_API ComputeMatrix : public std::enable_shared_from_this<ComputeMatrix> {
41public:
42 /**
43 * @brief Create a new ComputeMatrix instance
44 */
45 static std::shared_ptr<ComputeMatrix> create()
46 {
47 return std::make_shared<ComputeMatrix>();
48 }
49
50 /**
51 * @brief Add a pre-configured operation instance to this matrix
52 * @tparam OpClass Operation class type
53 * @param name Unique name within this matrix
54 * @param operation Shared pointer to the operation
55 * @return true if added successfully
56 */
57 template <typename OpClass>
58 bool add_operation(const std::string& name, std::shared_ptr<OpClass> operation)
59 {
60 if (!operation)
61 return false;
62 return m_operations.add(name, operation);
63 }
64
65 /**
66 * @brief Create and add an operation to this matrix
67 * @tparam OpClass Operation class type
68 * @tparam Args Constructor argument types
69 * @param name Unique name within this matrix
70 * @param args Constructor arguments
71 * @return Shared pointer to the created operation
72 */
73 template <typename OpClass, typename... Args>
74 std::shared_ptr<OpClass> create_operation(const std::string& name, Args&&... args)
75 {
76 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
77 if (m_operations.add(name, operation)) {
78 return operation;
79 }
80 return nullptr;
81 }
82
83 /**
84 * @brief Get a named operation from this matrix
85 * @tparam OpClass Expected operation type
86 * @param name Operation name
87 * @return Shared pointer to operation or nullptr
88 */
89 template <typename OpClass>
90 std::shared_ptr<OpClass> get_operation(const std::string& name)
91 {
92 return m_operations.get<OpClass>(name);
93 }
94
95 /**
96 * @brief Remove a named operation from this matrix
97 * @param name Operation name to remove
98 * @return true if removed successfully
99 */
100 bool remove_operation(const std::string& name)
101 {
102 return m_operations.remove(name);
103 }
104
105 /**
106 * @brief List all operation names in this matrix
107 * @return Vector of operation names
108 */
109 std::vector<std::string> list_operations() const
110 {
111 return m_operations.list_names();
112 }
113
114 /**
115 * @brief Clear all operations from this matrix
116 */
118 {
119 m_operations.clear();
120 }
121
122 /**
123 * @brief Execute an operation by creating a new instance
124 * @tparam OpClass Operation class to instantiate and execute
125 * @tparam Datum InputType Input data type
126 * @tparam Datum OutputType Output data type
127 * @param input Input data
128 * @param args Constructor arguments for the operation
129 * @return Optional containing result or nullopt on failure
130 */
131 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
132 std::optional<Datum<OutputType>> execute(const Datum<InputType>& input, Args&&... args)
133 {
134 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
135 return execute_operation<OpClass, InputType, OutputType>(operation, input);
136 }
137
138 /**
139 * @brief Execute a named operation from the pool
140 * @tparam OpClass Operation class type
141 * @tparam Datum InputType Input data type
142 * @tparam Datum OutputType Output data type
143 * @param name Name of the operation
144 * @param input Input data
145 * @return Optional containing result or nullopt on failure
146 */
147 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
148 std::optional<Datum<OutputType>> execute_named(const std::string& name, const Datum<InputType>& input)
149 {
150 auto operation = m_operations.get<OpClass>(name);
151 if (!operation)
152 return std::nullopt;
153 return execute_operation<OpClass, InputType, OutputType>(operation, input);
154 }
155
156 /**
157 * @brief Execute with provided operation instance
158 * @tparam OpClass Operation class type
159 * @tparam Datum InputType Input data type
160 * @tparam Datum OutputType Output data type
161 * @param operation Operation instance to execute
162 * @param input Input data
163 * @return Optional containing result or nullopt on failure
164 */
165 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
166 std::optional<Datum<OutputType>> execute_with(std::shared_ptr<OpClass> operation, const Datum<InputType>& input)
167 {
168 return execute_operation<OpClass, InputType, OutputType>(operation, input);
169 }
170
171 /**
172 * @brief Execute operation asynchronously
173 * @tparam OpClass Operation class type
174 * @tparam InputType Input data type
175 * @tparam OutputType Output data type
176 * @param input Input data
177 * @param args Constructor arguments
178 * @return Future containing the result
179 */
180 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
181 std::future<std::optional<Datum<OutputType>>> execute_async(const Datum<InputType>& input, Args&&... args)
182 {
183 return std::async(std::launch::async, [this, input, args...]() {
184 return execute<OpClass, InputType, OutputType>(input, args...);
185 });
186 }
187
188 /**
189 * @brief Execute named operation asynchronously
190 */
191 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
192 std::future<std::optional<Datum<OutputType>>> execute_named_async(const std::string& name, const Datum<InputType>& input)
193 {
194 return std::async(std::launch::async, [this, name, input]() {
195 return execute_named<OpClass, InputType, OutputType>(name, input);
196 });
197 }
198
199 /**
200 * @brief Execute multiple operations in parallel
201 * @tparam InputType Input data type
202 * @tparam OpClasses Operation classes to execute
203 * @param input Input data
204 * @return Tuple of results from each operation
205 */
206 template <ComputeData InputType, typename... OpClasses>
208 {
209 return std::make_tuple(
210 execute_async<OpClasses, InputType>(input).get()...);
211 }
212
213 /**
214 * @brief Execute multiple named operations in parallel
215 * @tparam OpClass Base operation class
216 * @tparam InputType Input data type
217 * @tparam OutputType Output data type
218 * @param names Vector of operation names
219 * @param input Input data
220 * @return Vector of results
221 */
222 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
223 std::vector<std::optional<Datum<OutputType>>> execute_parallel_named(
224 const std::vector<std::string>& names,
225 const Datum<InputType>& input)
226 {
227 std::vector<std::future<std::optional<Datum<OutputType>>>> futures;
228 futures.reserve(names.size());
229
230 for (const auto& name : names) {
231 futures.push_back(execute_named_async<OpClass, InputType, OutputType>(name, input));
232 }
233
234 std::vector<std::optional<Datum<OutputType>>> results;
235 results.reserve(futures.size());
236
237 for (auto& future : futures) {
238 results.push_back(future.get());
239 }
240
241 return results;
242 }
243
244 /**
245 * @brief Execute operations in sequence (type-safe chain)
246 * @tparam FirstOp First operation type
247 * @tparam SecondOp Second operation type
248 * @tparam InputType Initial input type
249 * @tparam IntermediateType Type between operations
250 * @tparam OutputType Final output type
251 * @param input Initial input
252 * @return Optional containing final result
253 */
254 template <typename FirstOp, typename SecondOp,
255 ComputeData InputType,
256 ComputeData IntermediateType,
257 ComputeData OutputType>
258 std::optional<Datum<OutputType>> execute_chain(const Datum<InputType>& input)
259 {
260 auto first_result = execute<FirstOp, InputType, IntermediateType>(input);
261 if (!first_result)
262 return std::nullopt;
263
264 return execute<SecondOp, IntermediateType, OutputType>(*first_result);
265 }
266
267 /**
268 * @brief Execute named operations in sequence
269 */
270 template <typename FirstOp, typename SecondOp,
271 ComputeData InputType,
272 ComputeData IntermediateType,
273 ComputeData OutputType>
274 std::optional<Datum<OutputType>> execute_chain_named(
275 const std::string& first_name,
276 const std::string& second_name,
277 const Datum<InputType>& input)
278 {
279 auto first_result = execute_named<FirstOp, InputType, IntermediateType>(first_name, input);
280 if (!first_result)
281 return std::nullopt;
282
283 return execute_named<SecondOp, IntermediateType, OutputType>(second_name, *first_result);
284 }
285
286 /**
287 * @brief Execute operation on multiple inputs
288 * @tparam OpClass Operation class
289 * @tparam InputType Input data type
290 * @tparam OutputType Output data type
291 * @param inputs Vector of inputs
292 * @param args Constructor arguments for operation
293 * @return Vector of results
294 */
295 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
296 std::vector<std::optional<Datum<OutputType>>> execute_batch(
297 const std::vector<Datum<InputType>>& inputs,
298 Args&&... args)
299 {
300 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
301
302 std::vector<std::optional<Datum<OutputType>>> results;
303 results.reserve(inputs.size());
304
305 for (const auto& input : inputs) {
306 results.push_back(execute_operation<OpClass, InputType, OutputType>(operation, input));
307 }
308
309 return results;
310 }
311
312 /**
313 * @brief Execute operation on multiple inputs in parallel
314 */
315 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
316 std::vector<std::optional<Datum<OutputType>>> execute_batch_parallel(
317 const std::vector<Datum<InputType>>& inputs,
318 Args&&... args)
319 {
320 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
321
322 std::vector<std::optional<Datum<OutputType>>> results(inputs.size());
323
324 MayaFlux::Parallel::transform(MayaFlux::Parallel::par_unseq,
325 inputs.begin(), inputs.end(),
326 results.begin(),
327 [this, operation](const Datum<InputType>& input) {
328 return execute_operation<OpClass, InputType, OutputType>(operation, input);
329 });
330
331 return results;
332 }
333
334 /**
335 * @brief Create a fluent executor for chaining operations
336 * @tparam StartType Initial data type
337 * @param input Initial input data
338 * @return FluentExecutor for this matrix
339 */
340 template <ComputeData StartType>
342 {
343 return FluentExecutor<ComputeMatrix, StartType>(shared_from_this(), input);
344 }
345
346 template <ComputeData StartType>
348 {
349 return with(Datum<StartType>(input));
350 }
351
352 /**
353 * @brief Create a fluent executor with move semantics
354 */
355
356 template <ComputeData StartType>
358 {
359 return FluentExecutor<ComputeMatrix, StartType>(shared_from_this(), std::forward<Datum<StartType>>(input));
360 }
361
362 template <ComputeData StartType>
364 {
365 return with(Datum<StartType>(std::forward<StartType>(input)));
366 }
367
368 /**
369 * @brief Execute an operation chain asynchronously.
370 *
371 * @p chain receives a FluentExecutor seeded with @p input and must return
372 * the terminal Datum. The entire sequence runs on a background thread.
373 * @p on_complete is invoked with the result on that same thread.
374 *
375 * The future is owned by this matrix instance. drain_async() and the
376 * destructor guarantee all in-flight chains finish before the matrix
377 * is destroyed. No thread management is required by the caller.
378 *
379 * @code
380 * matrix->with_async(make_granular_input(container),
381 * [](auto chain) {
382 * return chain.then<SegmentOp>("segment")
383 * .then<AttributeOp>("attribute")
384 * .then<SortOp>("sort")
385 * .to_io();
386 * },
387 * [ex](Datum<Kakshya::RegionGroup> result) {
388 * ex->grains = std::move(result);
389 * ex->ready = true;
390 * });
391 * @endcode
392 *
393 * @tparam StartType Input data type.
394 * @tparam ChainFunc Callable: (FluentExecutor<ComputeMatrix, StartType>) -> Datum<ResultType>.
395 * @tparam CompleteFn Callable: (Datum<R>) -> void, where R is deduced from ChainFunc.
396 * @param input Seed datum for the chain.
397 * @param chain Lambda describing the full operation sequence.
398 * @param on_complete Called with the final Datum on completion.
399 */
400 template <ComputeData StartType, typename ChainFunc, typename CompleteFn>
401 void with_async(Datum<StartType> input, ChainFunc&& chain, CompleteFn&& on_complete)
402 {
403 auto self = shared_from_this();
404 register_async(std::async(std::launch::async,
405 [self,
406 input = std::move(input),
407 chain = std::forward<ChainFunc>(chain),
408 on_complete = std::forward<CompleteFn>(on_complete)]() mutable {
409 on_complete(chain(
410 FluentExecutor<ComputeMatrix, StartType>(self, std::move(input))));
411 }));
412 }
413
414 template <ComputeData StartType, typename ChainFunc, typename CompleteFn>
415 void with_async(StartType input, ChainFunc&& chain, CompleteFn&& on_complete)
416 {
417 with_async(Datum<StartType>(std::move(input)),
418 std::forward<ChainFunc>(chain),
419 std::forward<CompleteFn>(on_complete));
420 }
421
422 /**
423 * @brief Set execution policy for this matrix
424 * @param policy Execution policy to use
425 */
427 {
428 m_execution_policy = policy;
429 }
430
431 /**
432 * @brief Get current execution policy
433 */
435 {
436 return m_execution_policy;
437 }
438
439 /**
440 * @brief Enable/disable execution profiling
441 */
442 void set_profiling(bool enabled)
443 {
444 m_profiling_enabled = enabled;
445 }
446
447 /**
448 * @brief Get execution statistics
449 */
450 std::unordered_map<std::string, std::any> get_statistics() const
451 {
452 auto stats = m_operations.get_statistics();
453 stats["total_executions"] = m_total_executions.load();
454 stats["failed_executions"] = m_failed_executions.load();
455 if (m_profiling_enabled) {
456 stats["average_execution_time_ms"] = m_average_execution_time.load();
457 }
458 return stats;
459 }
460
461 /**
462 * @brief Set custom context configurator
463 * @param configurator Function to configure execution contexts
464 */
466 std::function<void(ExecutionContext&, const std::type_index&)> configurator)
467 {
468 m_context_configurator = std::move(configurator);
469 }
470
471 /**
472 * @brief Set default execution timeout
473 * @param timeout Timeout duration
474 */
475 void set_default_timeout(std::chrono::milliseconds timeout)
476 {
477 m_default_timeout = timeout;
478 }
479
480 ComputeMatrix() = default;
481
482private:
483 /**
484 * @brief Core execution implementation
485 */
486 template <typename OpClass, ComputeData InputType, ComputeData OutputType>
487 std::optional<Datum<OutputType>> execute_operation(
488 std::shared_ptr<OpClass> operation,
489 const Datum<InputType>& input)
490 {
491 if (!operation)
492 return std::nullopt;
493
494 m_total_executions++;
495
496 try {
498 configure_execution_context(ctx, std::type_index(typeid(OpClass)));
499
500 auto start = std::chrono::steady_clock::now();
501
502 auto result = operation->apply_operation_internal(input, ctx);
503
504 if (m_profiling_enabled) {
505 auto end = std::chrono::steady_clock::now();
506 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
507 update_execution_time(duration.count());
508 }
509
510 return result;
511
512 } catch (const std::exception& e) {
513 m_failed_executions++;
514 handle_execution_error(e, std::type_index(typeid(OpClass)));
515 return std::nullopt;
516 }
517 }
518
519 /**
520 * @brief Configure execution context based on operation type and policy
521 */
522 void configure_execution_context(ExecutionContext& ctx, const std::type_index& op_type)
523 {
524 switch (m_execution_policy) {
525 case ExecutionPolicy::CONSERVATIVE:
526 case ExecutionPolicy::BALANCED:
527 ctx.mode = ExecutionMode::SYNC;
528 break;
529 case ExecutionPolicy::AGGRESSIVE:
530 ctx.mode = ExecutionMode::PARALLEL;
531 break;
532 }
533
534 ctx.timeout = m_default_timeout;
535
536 if (m_context_configurator) {
537 m_context_configurator(ctx, op_type);
538 }
539
540 // TODO: Add thread pool assignment, GPU context, etc.
541 }
542
543 /**
544 * @brief Handle execution errors
545 */
546 void handle_execution_error(const std::exception& e, const std::type_index& op_type)
547 {
548 m_last_error = e.what();
549 m_last_error_type = op_type;
550
551 if (m_error_callback) {
552 m_error_callback(e, op_type);
553 }
554 }
555
556 /**
557 * @brief Update execution time statistics
558 */
559 void update_execution_time(double ms)
560 {
561 double current_avg = m_average_execution_time.load();
562 double total_execs = m_total_executions.load();
563 double new_avg = (current_avg * (total_execs - 1) + ms) / total_execs;
564 m_average_execution_time.store(new_avg);
565 }
566
567 void register_async(std::future<void> f)
568 {
569 std::lock_guard lk(m_async_mtx);
570
571 std::erase_if(m_async_futures, [](std::future<void>& f) {
572 return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
573 });
574 m_async_futures.push_back(std::move(f));
575 }
576
578
579 ExecutionPolicy m_execution_policy = ExecutionPolicy::BALANCED;
580 std::chrono::milliseconds m_default_timeout { 0 };
581 std::function<void(ExecutionContext&, const std::type_index&)> m_context_configurator;
582
583 std::atomic<size_t> m_total_executions { 0 };
584 std::atomic<size_t> m_failed_executions { 0 };
585 std::atomic<double> m_average_execution_time { 0.0 };
586 bool m_profiling_enabled = false;
587
588 std::mutex m_async_mtx;
589 std::vector<std::future<void>> m_async_futures;
590
591 std::string m_last_error;
592 std::type_index m_last_error_type { typeid(void) };
593 std::function<void(const std::exception&, const std::type_index&)> m_error_callback;
594
595public:
596 /**
597 * @brief Set error callback
598 */
600 std::function<void(const std::exception&, const std::type_index&)> callback)
601 {
602 m_error_callback = std::move(callback);
603 }
604
605 /**
606 * @brief Get last error message
607 */
608 std::string get_last_error() const
609 {
610 return m_last_error;
611 }
612
613 /**
614 * @brief Block until all in-flight async chains complete.
615 *
616 * Called automatically by the destructor. May also be called explicitly
617 * before teardown of any resource a chain callback might write into.
618 */
620 {
621 std::lock_guard lk(m_async_mtx);
622 for (auto& f : m_async_futures) {
623 f.wait();
624 }
625
626 m_async_futures.clear();
627 }
628
630 {
631 drain_async();
632 }
633};
634
635} // namespace MayaFlux::Yantra
std::optional< double > duration
std::unordered_map< std::string, std::any > get_statistics() const
Get execution statistics.
std::vector< std::optional< Datum< OutputType > > > execute_batch(const std::vector< Datum< InputType > > &inputs, Args &&... args)
Execute operation on multiple inputs.
std::vector< std::optional< Datum< OutputType > > > execute_parallel_named(const std::vector< std::string > &names, const Datum< InputType > &input)
Execute multiple named operations in parallel.
ExecutionPolicy get_execution_policy() const
Get current execution policy.
void update_execution_time(double ms)
Update execution time statistics.
std::future< std::optional< Datum< OutputType > > > execute_named_async(const std::string &name, const Datum< InputType > &input)
Execute named operation asynchronously.
FluentExecutor< ComputeMatrix, StartType > with(StartType &&input)
std::optional< Datum< OutputType > > execute_named(const std::string &name, const Datum< InputType > &input)
Execute a named operation from the pool.
std::optional< Datum< OutputType > > execute_chain(const Datum< InputType > &input)
Execute operations in sequence (type-safe chain)
void set_context_configurator(std::function< void(ExecutionContext &, const std::type_index &)> configurator)
Set custom context configurator.
std::shared_ptr< OpClass > get_operation(const std::string &name)
Get a named operation from this matrix.
void with_async(StartType input, ChainFunc &&chain, CompleteFn &&on_complete)
std::future< std::optional< Datum< OutputType > > > execute_async(const Datum< InputType > &input, Args &&... args)
Execute operation asynchronously.
void set_error_callback(std::function< void(const std::exception &, const std::type_index &)> callback)
Set error callback.
std::string get_last_error() const
Get last error message.
void configure_execution_context(ExecutionContext &ctx, const std::type_index &op_type)
Configure execution context based on operation type and policy.
std::vector< std::optional< Datum< OutputType > > > execute_batch_parallel(const std::vector< Datum< InputType > > &inputs, Args &&... args)
Execute operation on multiple inputs in parallel.
std::function< void(const std::exception &, const std::type_index &)> m_error_callback
FluentExecutor< ComputeMatrix, StartType > with(const Datum< StartType > &input)
Create a fluent executor for chaining operations.
std::vector< std::string > list_operations() const
List all operation names in this matrix.
void set_profiling(bool enabled)
Enable/disable execution profiling.
std::vector< std::future< void > > m_async_futures
std::shared_ptr< OpClass > create_operation(const std::string &name, Args &&... args)
Create and add an operation to this matrix.
std::optional< Datum< OutputType > > execute_with(std::shared_ptr< OpClass > operation, const Datum< InputType > &input)
Execute with provided operation instance.
void handle_execution_error(const std::exception &e, const std::type_index &op_type)
Handle execution errors.
void set_default_timeout(std::chrono::milliseconds timeout)
Set default execution timeout.
void set_execution_policy(ExecutionPolicy policy)
Set execution policy for this matrix.
std::optional< Datum< OutputType > > execute(const Datum< InputType > &input, Args &&... args)
Execute an operation by creating a new instance.
void register_async(std::future< void > f)
std::optional< Datum< OutputType > > execute_chain_named(const std::string &first_name, const std::string &second_name, const Datum< InputType > &input)
Execute named operations in sequence.
bool add_operation(const std::string &name, std::shared_ptr< OpClass > operation)
Add a pre-configured operation instance to this matrix.
void clear_operations()
Clear all operations from this matrix.
FluentExecutor< ComputeMatrix, StartType > with(Datum< StartType > &&input)
Create a fluent executor with move semantics.
void with_async(Datum< StartType > input, ChainFunc &&chain, CompleteFn &&on_complete)
Execute an operation chain asynchronously.
std::function< void(ExecutionContext &, const std::type_index &)> m_context_configurator
auto execute_parallel(const Datum< InputType > &input)
Execute multiple operations in parallel.
FluentExecutor< ComputeMatrix, StartType > with(const StartType &input)
std::optional< Datum< OutputType > > execute_operation(std::shared_ptr< OpClass > operation, const Datum< InputType > &input)
Core execution implementation.
void drain_async()
Block until all in-flight async chains complete.
bool remove_operation(const std::string &name)
Remove a named operation from this matrix.
static std::shared_ptr< ComputeMatrix > create()
Create a new ComputeMatrix instance.
Local execution orchestrator for computational operations.
Fluent interface for chaining operations on any executor.
Thread-safe pool for managing named operation instances.
Universal concept for types that can be used as data in compute operations.
Definition DataSpec.hpp:37
ExecutionPolicy
Policy for execution strategy selection.
Input/Output container for computation pipeline data flow with structure preservation.
Definition DataIO.hpp:24
ExecutionMode mode
Execution mode controlling scheduling behavior.
std::chrono::milliseconds timeout
Optional timeout for operation execution.
Context information controlling how a compute operation executes.