MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
ComputeMatrix.hpp
Go to the documentation of this file.
1#pragma once
2
6
8
9namespace MayaFlux::Yantra {
10
/**
 * @enum ExecutionPolicy
 * @brief Policy for execution strategy selection
 *
 * Chooses how configure_execution_context() maps work onto an
 * ExecutionMode: CONSERVATIVE and BALANCED run synchronously,
 * AGGRESSIVE requests parallel execution.
 */
enum class ExecutionPolicy : uint8_t {
    CONSERVATIVE, ///< Prefer safety and predictability
    BALANCED, ///< Balance between performance and safety
    AGGRESSIVE ///< Maximize performance
};
20
21/**
22 * @class ComputeMatrix
23 * @brief Local execution orchestrator for computational operations
24 *
25 * ComputeMatrix provides a self-contained execution environment for operations.
26 * It maintains its own operation instances and execution strategies without
27 * relying on any global registries. Each matrix instance is independent.
28 *
29 * Key Design Principles:
30 * - Instance-local operation management
31 * - Focus on execution patterns and strategies
32 * - Clean separation from registration concerns
33 *
34 * Core Responsibilities:
35 * - Execute operations with various strategies (sync, async, parallel, chain)
36 * - Manage instance-local named operations
37 * - Provide fluent interface through FluentExecutor
38 * - Configure execution contexts for optimization
39 */
40class MAYAFLUX_API ComputeMatrix : public std::enable_shared_from_this<ComputeMatrix> {
41public:
42 /**
43 * @brief Create a new ComputeMatrix instance
44 */
45 static std::shared_ptr<ComputeMatrix> create()
46 {
47 return std::make_shared<ComputeMatrix>();
48 }
49
50 /**
51 * @brief Add a pre-configured operation instance to this matrix
52 * @tparam OpClass Operation class type
53 * @param name Unique name within this matrix
54 * @param operation Shared pointer to the operation
55 * @return true if added successfully
56 */
57 template <typename OpClass>
58 bool add_operation(const std::string& name, std::shared_ptr<OpClass> operation)
59 {
60 if (!operation)
61 return false;
62 return m_operations.add(name, operation);
63 }
64
65 /**
66 * @brief Create and add an operation to this matrix
67 * @tparam OpClass Operation class type
68 * @tparam Args Constructor argument types
69 * @param name Unique name within this matrix
70 * @param args Constructor arguments
71 * @return Shared pointer to the created operation
72 */
73 template <typename OpClass, typename... Args>
74 std::shared_ptr<OpClass> create_operation(const std::string& name, Args&&... args)
75 {
76 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
77 if (m_operations.add(name, operation)) {
78 return operation;
79 }
80 return nullptr;
81 }
82
83 /**
84 * @brief Get a named operation from this matrix
85 * @tparam OpClass Expected operation type
86 * @param name Operation name
87 * @return Shared pointer to operation or nullptr
88 */
89 template <typename OpClass>
90 std::shared_ptr<OpClass> get_operation(const std::string& name)
91 {
92 return m_operations.get<OpClass>(name);
93 }
94
95 /**
96 * @brief Remove a named operation from this matrix
97 * @param name Operation name to remove
98 * @return true if removed successfully
99 */
100 bool remove_operation(const std::string& name)
101 {
102 return m_operations.remove(name);
103 }
104
105 /**
106 * @brief List all operation names in this matrix
107 * @return Vector of operation names
108 */
109 std::vector<std::string> list_operations() const
110 {
111 return m_operations.list_names();
112 }
113
114 /**
115 * @brief Clear all operations from this matrix
116 */
118 {
119 m_operations.clear();
120 }
121
122 /**
123 * @brief Execute an operation by creating a new instance
124 * @tparam OpClass Operation class to instantiate and execute
125 * @tparam InputType Input data type
126 * @tparam OutputType Output data type
127 * @param input Input data
128 * @param args Constructor arguments for the operation
129 * @return Optional containing result or nullopt on failure
130 */
131 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
132 std::optional<IO<OutputType>> execute(const InputType& input, Args&&... args)
133 {
134 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
135 return execute_operation<OpClass, InputType, OutputType>(operation, input);
136 }
137
138 /**
139 * @brief Execute a named operation from the pool
140 * @tparam OpClass Operation class type
141 * @tparam InputType Input data type
142 * @tparam OutputType Output data type
143 * @param name Name of the operation
144 * @param input Input data
145 * @return Optional containing result or nullopt on failure
146 */
147 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
148 std::optional<IO<OutputType>> execute_named(const std::string& name, const InputType& input)
149 {
150 auto operation = m_operations.get<OpClass>(name);
151 if (!operation)
152 return std::nullopt;
153 return execute_operation<OpClass, InputType, OutputType>(operation, input);
154 }
155
156 /**
157 * @brief Execute with provided operation instance
158 * @tparam OpClass Operation class type
159 * @tparam InputType Input data type
160 * @tparam OutputType Output data type
161 * @param operation Operation instance to execute
162 * @param input Input data
163 * @return Optional containing result or nullopt on failure
164 */
165 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
166 std::optional<IO<OutputType>> execute_with(std::shared_ptr<OpClass> operation, const InputType& input)
167 {
168 return execute_operation<OpClass, InputType, OutputType>(operation, input);
169 }
170
171 /**
172 * @brief Execute operation asynchronously
173 * @tparam OpClass Operation class type
174 * @tparam InputType Input data type
175 * @tparam OutputType Output data type
176 * @param input Input data
177 * @param args Constructor arguments
178 * @return Future containing the result
179 */
180 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
181 std::future<std::optional<IO<OutputType>>> execute_async(const InputType& input, Args&&... args)
182 {
183 return std::async(std::launch::async, [this, input, args...]() {
184 return execute<OpClass, InputType, OutputType>(input, args...);
185 });
186 }
187
188 /**
189 * @brief Execute named operation asynchronously
190 */
191 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
192 std::future<std::optional<IO<OutputType>>> execute_named_async(const std::string& name, const InputType& input)
193 {
194 return std::async(std::launch::async, [this, name, input]() {
195 return execute_named<OpClass, InputType, OutputType>(name, input);
196 });
197 }
198
199 /**
200 * @brief Execute multiple operations in parallel
201 * @tparam InputType Input data type
202 * @tparam OpClasses Operation classes to execute
203 * @param input Input data
204 * @return Tuple of results from each operation
205 */
206 template <ComputeData InputType, typename... OpClasses>
207 auto execute_parallel(const InputType& input)
208 {
209 return std::make_tuple(
210 execute_async<OpClasses, InputType>(input).get()...);
211 }
212
213 /**
214 * @brief Execute multiple named operations in parallel
215 * @tparam OpClass Base operation class
216 * @tparam InputType Input data type
217 * @tparam OutputType Output data type
218 * @param names Vector of operation names
219 * @param input Input data
220 * @return Vector of results
221 */
222 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType>
223 std::vector<std::optional<IO<OutputType>>> execute_parallel_named(
224 const std::vector<std::string>& names,
225 const InputType& input)
226 {
227
228 std::vector<std::future<std::optional<IO<OutputType>>>> futures;
229 futures.reserve(names.size());
230
231 for (const auto& name : names) {
232 futures.push_back(execute_named_async<OpClass, InputType, OutputType>(name, input));
233 }
234
235 std::vector<std::optional<IO<OutputType>>> results;
236 results.reserve(futures.size());
237
238 for (auto& future : futures) {
239 results.push_back(future.get());
240 }
241
242 return results;
243 }
244
245 /**
246 * @brief Execute operations in sequence (type-safe chain)
247 * @tparam FirstOp First operation type
248 * @tparam SecondOp Second operation type
249 * @tparam InputType Initial input type
250 * @tparam IntermediateType Type between operations
251 * @tparam OutputType Final output type
252 * @param input Initial input
253 * @return Optional containing final result
254 */
255 template <typename FirstOp, typename SecondOp,
256 ComputeData InputType,
257 ComputeData IntermediateType,
258 ComputeData OutputType>
259 std::optional<IO<OutputType>> execute_chain(const InputType& input)
260 {
261 auto first_result = execute<FirstOp, InputType, IntermediateType>(input);
262 if (!first_result)
263 return std::nullopt;
264
265 return execute<SecondOp, IntermediateType, OutputType>(first_result->data);
266 }
267
268 /**
269 * @brief Execute named operations in sequence
270 */
271 template <typename FirstOp, typename SecondOp,
272 ComputeData InputType,
273 ComputeData IntermediateType,
274 ComputeData OutputType>
275 std::optional<IO<OutputType>> execute_chain_named(
276 const std::string& first_name,
277 const std::string& second_name,
278 const InputType& input)
279 {
280
281 auto first_result = execute_named<FirstOp, InputType, IntermediateType>(first_name, input);
282 if (!first_result)
283 return std::nullopt;
284
285 return execute_named<SecondOp, IntermediateType, OutputType>(second_name, first_result->data);
286 }
287
288 /**
289 * @brief Execute operation on multiple inputs
290 * @tparam OpClass Operation class
291 * @tparam InputType Input data type
292 * @tparam OutputType Output data type
293 * @param inputs Vector of inputs
294 * @param args Constructor arguments for operation
295 * @return Vector of results
296 */
297 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
298 std::vector<std::optional<IO<OutputType>>> execute_batch(
299 const std::vector<InputType>& inputs,
300 Args&&... args)
301 {
302
303 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
304
305 std::vector<std::optional<IO<OutputType>>> results;
306 results.reserve(inputs.size());
307
308 for (const auto& input : inputs) {
309 results.push_back(execute_operation<OpClass, InputType, OutputType>(operation, input));
310 }
311
312 return results;
313 }
314
315 /**
316 * @brief Execute operation on multiple inputs in parallel
317 */
318 template <typename OpClass, ComputeData InputType, ComputeData OutputType = InputType, typename... Args>
319 std::vector<std::optional<IO<OutputType>>> execute_batch_parallel(
320 const std::vector<InputType>& inputs,
321 Args&&... args)
322 {
323
324 auto operation = std::make_shared<OpClass>(std::forward<Args>(args)...);
325
326 std::vector<std::optional<IO<OutputType>>> results(inputs.size());
327
328 MayaFlux::Parallel::transform(MayaFlux::Parallel::par_unseq,
329 inputs.begin(), inputs.end(),
330 results.begin(),
331 [this, operation](const InputType& input) {
332 return execute_operation<OpClass, InputType, OutputType>(operation, input);
333 });
334
335 return results;
336 }
337
338 /**
339 * @brief Create a fluent executor for chaining operations
340 * @tparam StartType Initial data type
341 * @param input Initial input data
342 * @return FluentExecutor for this matrix
343 */
344 template <ComputeData StartType>
346 {
347 return FluentExecutor<ComputeMatrix, StartType>(shared_from_this(), input);
348 }
349
350 /**
351 * @brief Create a fluent executor with move semantics
352 */
353 template <ComputeData StartType>
355 {
356 return FluentExecutor<ComputeMatrix, StartType>(shared_from_this(), std::forward<StartType>(input));
357 }
358
359 /**
360 * @brief Set execution policy for this matrix
361 * @param policy Execution policy to use
362 */
364 {
365 m_execution_policy = policy;
366 }
367
368 /**
369 * @brief Get current execution policy
370 */
372 {
373 return m_execution_policy;
374 }
375
376 /**
377 * @brief Enable/disable execution profiling
378 */
379 void set_profiling(bool enabled)
380 {
381 m_profiling_enabled = enabled;
382 }
383
384 /**
385 * @brief Get execution statistics
386 */
387 std::unordered_map<std::string, std::any> get_statistics() const
388 {
389 auto stats = m_operations.get_statistics();
390 stats["total_executions"] = m_total_executions.load();
391 stats["failed_executions"] = m_failed_executions.load();
392 if (m_profiling_enabled) {
393 stats["average_execution_time_ms"] = m_average_execution_time.load();
394 }
395 return stats;
396 }
397
398 /**
399 * @brief Set custom context configurator
400 * @param configurator Function to configure execution contexts
401 */
403 std::function<void(ExecutionContext&, const std::type_index&)> configurator)
404 {
405 m_context_configurator = std::move(configurator);
406 }
407
408 /**
409 * @brief Set default execution timeout
410 * @param timeout Timeout duration
411 */
412 void set_default_timeout(std::chrono::milliseconds timeout)
413 {
414 m_default_timeout = timeout;
415 }
416
417 ComputeMatrix() = default;
418
419private:
420 /**
421 * @brief Core execution implementation
422 */
423 template <typename OpClass, ComputeData InputType, ComputeData OutputType>
424 std::optional<IO<OutputType>> execute_operation(
425 std::shared_ptr<OpClass> operation,
426 const InputType& input)
427 {
428 if (!operation)
429 return std::nullopt;
430
431 m_total_executions++;
432
433 try {
434 IO<InputType> input_wrapper(input);
435
437 configure_execution_context(ctx, std::type_index(typeid(OpClass)));
438
439 auto start = std::chrono::steady_clock::now();
440
441 auto result = operation->apply_operation_internal(input_wrapper, ctx);
442
443 if (m_profiling_enabled) {
444 auto end = std::chrono::steady_clock::now();
445 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
446 update_execution_time(duration.count());
447 }
448
449 return result;
450
451 } catch (const std::exception& e) {
452 m_failed_executions++;
453 handle_execution_error(e, std::type_index(typeid(OpClass)));
454 return std::nullopt;
455 }
456 }
457
458 /**
459 * @brief Configure execution context based on operation type and policy
460 */
461 void configure_execution_context(ExecutionContext& ctx, const std::type_index& op_type)
462 {
463 switch (m_execution_policy) {
464 case ExecutionPolicy::CONSERVATIVE:
465 case ExecutionPolicy::BALANCED:
466 ctx.mode = ExecutionMode::SYNC;
467 break;
468 case ExecutionPolicy::AGGRESSIVE:
469 ctx.mode = ExecutionMode::PARALLEL;
470 break;
471 }
472
473 ctx.timeout = m_default_timeout;
474
475 if (m_context_configurator) {
476 m_context_configurator(ctx, op_type);
477 }
478
479 // TODO: Add thread pool assignment, GPU context, etc.
480 }
481
482 /**
483 * @brief Handle execution errors
484 */
485 void handle_execution_error(const std::exception& e, const std::type_index& op_type)
486 {
487 m_last_error = e.what();
488 m_last_error_type = op_type;
489
490 if (m_error_callback) {
491 m_error_callback(e, op_type);
492 }
493 }
494
495 /**
496 * @brief Update execution time statistics
497 */
498 void update_execution_time(double ms)
499 {
500 double current_avg = m_average_execution_time.load();
501 double total_execs = m_total_executions.load();
502 double new_avg = (current_avg * (total_execs - 1) + ms) / total_execs;
503 m_average_execution_time.store(new_avg);
504 }
505
507
508 ExecutionPolicy m_execution_policy = ExecutionPolicy::BALANCED;
509 std::chrono::milliseconds m_default_timeout { 0 };
510 std::function<void(ExecutionContext&, const std::type_index&)> m_context_configurator;
511
512 std::atomic<size_t> m_total_executions { 0 };
513 std::atomic<size_t> m_failed_executions { 0 };
514 std::atomic<double> m_average_execution_time { 0.0 };
515 bool m_profiling_enabled = false;
516
517 std::string m_last_error;
518 std::type_index m_last_error_type { typeid(void) };
519 std::function<void(const std::exception&, const std::type_index&)> m_error_callback;
520
521public:
522 /**
523 * @brief Set error callback
524 */
526 std::function<void(const std::exception&, const std::type_index&)> callback)
527 {
528 m_error_callback = std::move(callback);
529 }
530
531 /**
532 * @brief Get last error message
533 */
534 std::string get_last_error() const
535 {
536 return m_last_error;
537 }
538};
539
540} // namespace MayaFlux::Yantra
std::unordered_map< std::string, std::any > get_statistics() const
Get execution statistics.
ExecutionPolicy get_execution_policy() const
Get current execution policy.
std::future< std::optional< IO< OutputType > > > execute_async(const InputType &input, Args &&... args)
Execute operation asynchronously.
void update_execution_time(double ms)
Update execution time statistics.
FluentExecutor< ComputeMatrix, StartType > with(StartType &&input)
Create a fluent executor with move semantics.
std::vector< std::optional< IO< OutputType > > > execute_batch(const std::vector< InputType > &inputs, Args &&... args)
Execute operation on multiple inputs.
std::optional< IO< OutputType > > execute_operation(std::shared_ptr< OpClass > operation, const InputType &input)
Core execution implementation.
std::optional< IO< OutputType > > execute(const InputType &input, Args &&... args)
Execute an operation by creating a new instance.
void set_context_configurator(std::function< void(ExecutionContext &, const std::type_index &)> configurator)
Set custom context configurator.
std::shared_ptr< OpClass > get_operation(const std::string &name)
Get a named operation from this matrix.
void set_error_callback(std::function< void(const std::exception &, const std::type_index &)> callback)
Set error callback.
std::string get_last_error() const
Get last error message.
void configure_execution_context(ExecutionContext &ctx, const std::type_index &op_type)
Configure execution context based on operation type and policy.
std::function< void(const std::exception &, const std::type_index &)> m_error_callback
std::vector< std::string > list_operations() const
List all operation names in this matrix.
void set_profiling(bool enabled)
Enable/disable execution profiling.
std::shared_ptr< OpClass > create_operation(const std::string &name, Args &&... args)
Create and add an operation to this matrix.
void handle_execution_error(const std::exception &e, const std::type_index &op_type)
Handle execution errors.
void set_default_timeout(std::chrono::milliseconds timeout)
Set default execution timeout.
std::vector< std::optional< IO< OutputType > > > execute_parallel_named(const std::vector< std::string > &names, const InputType &input)
Execute multiple named operations in parallel.
void set_execution_policy(ExecutionPolicy policy)
Set execution policy for this matrix.
std::future< std::optional< IO< OutputType > > > execute_named_async(const std::string &name, const InputType &input)
Execute named operation asynchronously.
std::optional< IO< OutputType > > execute_with(std::shared_ptr< OpClass > operation, const InputType &input)
Execute with provided operation instance.
bool add_operation(const std::string &name, std::shared_ptr< OpClass > operation)
Add a pre-configured operation instance to this matrix.
void clear_operations()
Clear all operations from this matrix.
std::optional< IO< OutputType > > execute_chain_named(const std::string &first_name, const std::string &second_name, const InputType &input)
Execute named operations in sequence.
std::vector< std::optional< IO< OutputType > > > execute_batch_parallel(const std::vector< InputType > &inputs, Args &&... args)
Execute operation on multiple inputs in parallel.
std::function< void(ExecutionContext &, const std::type_index &)> m_context_configurator
FluentExecutor< ComputeMatrix, StartType > with(const StartType &input)
Create a fluent executor for chaining operations.
std::optional< IO< OutputType > > execute_chain(const InputType &input)
Execute operations in sequence (type-safe chain)
auto execute_parallel(const InputType &input)
Execute multiple operations in parallel.
std::optional< IO< OutputType > > execute_named(const std::string &name, const InputType &input)
Execute a named operation from the pool.
bool remove_operation(const std::string &name)
Remove a named operation from this matrix.
static std::shared_ptr< ComputeMatrix > create()
Create a new ComputeMatrix instance.
Local execution orchestrator for computational operations.
Fluent interface for chaining operations on any executor.
Thread-safe pool for managing named operation instances.
Universal concept for types that can be used as data in compute operations.
Definition DataSpec.hpp:37
ExecutionPolicy
Policy for execution strategy selection.
std::chrono::milliseconds timeout
Context information for operation execution.
Input/Output container for computation pipeline data flow with structure preservation.
Definition DataIO.hpp:24