MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
OperationChain.hpp
Go to the documentation of this file.
1#pragma once
2
4
5namespace MayaFlux::Yantra {
6
7/**
8 * @class FluentExecutor
9 * @brief Fluent interface for chaining operations on any executor
10 *
11 * Provides a composable, type-safe way to chain operations together.
12 * This class is executor-agnostic and can work with any executor that
13 * exposes execute<OpClass, InputType, OutputType>(Datum<InputType>) and
14 * execute_named<OpClass, InputType, OutputType>(name, Datum<InputType>).
15 *
16 * Internal storage is always Datum<DataType>, preserving container
17 * references and structural metadata across every link in the chain.
18 * Entry points (constructors, make_fluent) accept raw DataType as a
19 * convenience and wrap it immediately.
20 *
21 * Key Features:
22 * - Type-safe operation chaining with compile-time verification
23 * - Support for both type-based and named operations
24 * - Container and metadata preservation through the full chain
25 * - Custom function application within the chain
26 * - Multiple terminal operations for different use cases
27 * - Error accumulation and reporting
28 *
29 * Usage:
30 * ```cpp
31 * auto result = matrix->with(input_datum)
32 * .then<SegmentOp>("segment")
33 * .then<AttributeOp>("attribute")
34 * .then<SortOp>("sort")
35 * .get();
36 * ```
37 */
38template <typename Executor, ComputeData DataType>
39class MAYAFLUX_API FluentExecutor {
40public:
41 using executor_type = Executor;
42 using data_type = DataType;
44
45 // ------------------------------------------------------------------
46 // Constructors — Datum overloads are primary; raw overloads wrap
47 // ------------------------------------------------------------------
48
49 /**
50 * @brief Construct with executor and Datum input
51 * @param executor Shared pointer to the executor instance
52 * @param input Datum carrying data, container, and structural metadata
53 */
54 FluentExecutor(std::shared_ptr<Executor> executor, const Datum<DataType>& input)
55 : m_executor(std::move(executor))
56 , m_data(input)
57 , m_successful(true)
58 {
59 if (!m_executor) {
60 error<std::invalid_argument>(
61 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
62 std::source_location::current(),
63 "FluentExecutor requires non-null executor");
64 }
65 }
66
67 /**
68 * @brief Construct with executor and Datum input (move)
69 * @param executor Shared pointer to the executor instance
70 * @param input Datum to move into the chain
71 */
72 FluentExecutor(std::shared_ptr<Executor> executor, Datum<DataType>&& input)
73 : m_executor(std::move(executor))
74 , m_data(std::move(input))
75 , m_successful(true)
76 {
77 if (!m_executor) {
78 error<std::invalid_argument>(
79 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
80 std::source_location::current(),
81 "FluentExecutor requires non-null executor");
82 }
83 }
84
85 /**
86 * @brief Construct with executor and raw data (convenience entry point)
87 *
88 * Wraps data in a Datum immediately. No container is attached; use the
89 * Datum overload when a container must be preserved through the chain.
90 *
91 * @param executor Shared pointer to the executor instance
92 * @param input Raw data to wrap and process
93 */
94 FluentExecutor(std::shared_ptr<Executor> executor, const DataType& input)
95 : FluentExecutor(std::move(executor), Datum<DataType>(input))
96 {
97 }
98
99 /**
100 * @brief Construct with executor and raw data (move, convenience entry point)
101 *
102 * Wraps data in a Datum immediately. No container is attached; use the
103 * Datum overload when a container must be preserved through the chain.
104 *
105 * @param executor Shared pointer to the executor instance
106 * @param input Raw data to move and wrap
107 */
108 FluentExecutor(std::shared_ptr<Executor> executor, DataType&& input)
109 : FluentExecutor(std::move(executor), Datum<DataType>(std::move(input)))
110 {
111 }
112
113 // ------------------------------------------------------------------
114 // Chain operations
115 // ------------------------------------------------------------------
116
117 /**
118 * @brief Chain an anonymous operation instance by type
119 *
120 * Constructs a new instance of OpClass and executes it on the current
121 * Datum. The full Datum (data + container + metadata) is passed through.
122 *
123 * @tparam OpClass Operation class to instantiate and execute
124 * @tparam OutputType Expected output type (defaults to current DataType)
125 * @return New FluentExecutor carrying the result Datum
126 */
127 template <typename OpClass, ComputeData OutputType = DataType>
129 {
130 if (!m_successful) {
131 error<std::runtime_error>(
132 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
133 std::source_location::current(),
134 "Cannot continue chain after failed operation");
135 }
136
137 try {
138 auto result = m_executor->template execute<OpClass, DataType, OutputType>(m_data);
139 if (!result) {
140 m_successful = false;
141 record_error("Operation " + std::string(typeid(OpClass).name()) + " failed");
142 error<std::runtime_error>(
143 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
144 std::source_location::current(),
145 "Operation failed in fluent chain: {}", std::string(typeid(OpClass).name()));
146 }
147
148 auto next = FluentExecutor<Executor, OutputType>(m_executor, std::move(*result));
149 next.m_operation_history = m_operation_history;
150 next.m_operation_history.push_back(typeid(OpClass).name());
151 return next;
152 } catch (const std::exception& e) {
153 m_successful = false;
154 record_error(e.what());
155 error_rethrow(
156 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
157 std::source_location::current(),
158 "Exception in fluent chain: " + std::string(e.what()));
159 }
160 }
161
162 /**
163 * @brief Chain a named operation fetched from the executor's pool
164 *
165 * Retrieves the pre-configured operation instance registered under @p name
166 * and executes it on the current Datum. The full Datum is passed through,
167 * preserving container and metadata.
168 *
169 * @tparam OpClass Operation class type
170 * @tparam OutputType Expected output type (defaults to current DataType)
171 * @param name Name of the operation in the pool
172 * @return New FluentExecutor carrying the result Datum
173 */
174 template <typename OpClass, ComputeData OutputType = DataType>
176 {
177 if (!m_successful) {
178 error<std::runtime_error>(
179 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
180 std::source_location::current(),
181 "Cannot continue chain after failed operation: {}", name);
182 }
183
184 try {
185 auto result = m_executor->template execute_named<OpClass, DataType, OutputType>(name, m_data);
186 if (!result) {
187 m_successful = false;
188 record_error("Named operation '" + name + "' failed");
189 error<std::runtime_error>(
190 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
191 std::source_location::current(),
192 "Named operation failed in fluent chain: {}", name);
193 }
194
195 auto next = FluentExecutor<Executor, OutputType>(m_executor, std::move(*result));
196 next.m_operation_history = m_operation_history;
197 next.m_operation_history.push_back(name);
198 return next;
199 } catch (const std::exception& e) {
200 m_successful = false;
201 record_error(e.what());
202 error_rethrow(
203 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
204 std::source_location::current(),
205 "Exception in named operation '{}': {}", name, e.what());
206 }
207 }
208
209 /**
210 * @brief Apply a custom transformation function within the chain
211 *
212 * @p func receives the raw @c DataType (not the Datum) and returns a
213 * value of any ComputeData type. The result is immediately wrapped in a
214 * fresh Datum. Container from the current Datum is not carried forward
215 * since the function may change the data type entirely; attach a container
216 * explicitly via tap() before apply() if it must survive.
217 *
218 * @tparam Func Callable type: (const DataType&) -> U
219 * @param func Transformation to apply
220 * @return New FluentExecutor whose DataType is the return type of @p func
221 */
222 template <typename Func>
223 requires std::invocable<Func, const DataType&>
224 auto apply(Func&& func)
225 {
226 if (!m_successful) {
227 error<std::runtime_error>(
228 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
229 std::source_location::current(),
230 "Cannot continue chain after failed operation in apply");
231 }
232
233 using ResultType = std::invoke_result_t<Func, const DataType&>;
234
235 try {
236 auto result = std::forward<Func>(func)(m_data.data);
238 m_executor, Datum<ResultType>(std::move(result)));
239 next.m_operation_history = m_operation_history;
240 next.m_operation_history.push_back("custom_function");
241 return next;
242 } catch (const std::exception& e) {
243 m_successful = false;
244 record_error(std::string("Custom function failed: ") + e.what());
245 error_rethrow(
246 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
247 std::source_location::current(),
248 "Exception in custom function: " + std::string(e.what()));
249 }
250 }
251
252 /**
253 * @brief Apply a side-effect function without changing data or type
254 *
255 * @p func receives a mutable reference to the raw @c DataType. Useful
256 * for logging, validation, or in-place mutation that does not change type.
257 * Does not affect the Datum wrapper or container.
258 *
259 * @tparam Func Callable type: (DataType&) -> void
260 * @param func Function to apply for side effects
261 * @return Reference to this executor for continued chaining
262 */
263 template <typename Func>
264 requires std::invocable<Func, DataType&>
265 FluentExecutor& tap(Func&& func)
266 {
267 if (!m_successful) {
268 error<std::runtime_error>(
269 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
270 std::source_location::current(),
271 "Cannot continue chain after failed operation in tap");
272 }
273
274 try {
275 std::forward<Func>(func)(m_data.data);
276 m_operation_history.emplace_back("tap");
277 return *this;
278 } catch (const std::exception& e) {
279 m_successful = false;
280 record_error(std::string("Tap function failed: ") + e.what());
281 error_rethrow(
282 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
283 std::source_location::current(),
284 "Exception in tap function: " + std::string(e.what()));
285 }
286 }
287
288 // ------------------------------------------------------------------
289 // Conditional execution
290 // ------------------------------------------------------------------
291
292 /**
293 * @brief Execute an operation conditionally on a boolean flag
294 * @tparam OpClass Operation to execute if @p condition is true
295 * @param condition If true, executes the operation; otherwise passes through
296 * @return Reference to this executor for continued chaining
297 */
298 template <typename OpClass>
299 FluentExecutor& when(bool condition)
300 {
301 if (condition && m_successful)
302 return then<OpClass>();
303 return *this;
304 }
305
306 /**
307 * @brief Execute an operation conditionally on a predicate over the raw data
308 * @tparam OpClass Operation to execute if @p predicate returns true
309 * @tparam Pred Callable type: (const DataType&) -> bool
310 * @param predicate Function evaluated against the current raw data
311 * @return Reference to this executor for continued chaining
312 */
313 template <typename OpClass, typename Pred>
314 requires std::predicate<Pred, const DataType&>
315 FluentExecutor& when(Pred&& predicate)
316 {
317 if (m_successful && std::forward<Pred>(predicate)(m_data.data))
318 return then<OpClass>();
319 return *this;
320 }
321
322 // ------------------------------------------------------------------
323 // Parallel fork
324 // ------------------------------------------------------------------
325
326 /**
327 * @brief Fork execution into multiple independent parallel paths
328 *
329 * Executes each operation in @p OpClasses against the current Datum
330 * concurrently and returns a tuple of results. Each result is an
331 * @c std::optional<Datum<DataType>>.
332 *
333 * @tparam OpClasses Operation classes to execute in parallel
334 * @return std::tuple of optional Datum results, one per OpClass
335 */
336 template <typename... OpClasses>
337 auto fork()
338 {
339 if (!m_successful) {
340 error<std::runtime_error>(
341 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
342 std::source_location::current(),
343 "Cannot fork after failed operation");
344 }
345
346 return std::make_tuple(
347 m_executor->template execute<OpClasses, DataType>(m_data)...);
348 }
349
350 // ------------------------------------------------------------------
351 // Terminal accessors
352 // ------------------------------------------------------------------
353
354 /**
355 * @brief Get the final raw result by const reference
356 * @return Const reference to the processed DataType
357 */
358 [[nodiscard]] const DataType& get() const
359 {
360 if (!m_successful) {
361 error<std::runtime_error>(
362 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
363 std::source_location::current(),
364 "Cannot get result from failed chain");
365 }
366 return m_data.data;
367 }
368
369 /**
370 * @brief Get a mutable reference to the final raw result
371 * @return Mutable reference to the processed DataType
372 */
373 DataType& get_mutable()
374 {
375 if (!m_successful) {
376 error<std::runtime_error>(
377 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
378 std::source_location::current(),
379 "Cannot get mutable result from failed chain");
380 }
381 return m_data.data;
382 }
383
384 /**
385 * @brief Move the final raw result out of the chain
386 * @return Moved DataType
387 */
388 DataType consume() &&
389 {
390 if (!m_successful) {
391 error<std::runtime_error>(
392 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
393 std::source_location::current(),
394 "Cannot consume result from failed chain");
395 }
396 return std::move(m_data.data);
397 }
398
399 /**
400 * @brief Get the full result Datum by const reference
401 *
402 * Prefer this over get() when the container or metadata must be
403 * inspected or passed to a subsequent non-chain call.
404 *
405 * @return Const reference to the result Datum
406 */
407 [[nodiscard]] const Datum<DataType>& get_datum() const
408 {
409 if (!m_successful) {
410 error<std::runtime_error>(
411 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
412 std::source_location::current(),
413 "Cannot get datum from failed chain");
414 }
415 return m_data;
416 }
417
418 /**
419 * @brief Move the full result Datum out of the chain
420 * @return Moved Datum including container and metadata
421 */
423 {
424 if (!m_successful) {
425 error<std::runtime_error>(
426 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
427 std::source_location::current(),
428 "Cannot consume datum from failed chain");
429 }
430 return std::move(m_data);
431 }
432
433 /**
434 * @brief Return the result Datum with execution history appended to metadata
435 *
436 * Equivalent to get_datum() but stamps execution_history, successful,
437 * and errors into the Datum metadata before returning.
438 *
439 * @return Datum copy with execution metadata attached
440 */
441 [[nodiscard]] Datum<DataType> to_io() const
442 {
443 Datum<DataType> result = m_data;
444 result.metadata["execution_history"] = m_operation_history;
445 result.metadata["successful"] = m_successful;
446 if (!m_errors.empty())
447 result.metadata["errors"] = m_errors;
448 return result;
449 }
450
451 /**
452 * @brief Return the raw result or a fallback value if the chain failed
453 * @param default_value Value returned when the chain is in a failed state
454 * @return Processed DataType or @p default_value
455 */
456 [[nodiscard]] DataType get_or(const DataType& default_value) const
457 {
458 return m_successful ? m_data.data : default_value;
459 }
460
461 /**
462 * @brief Return the raw result or invoke a generator if the chain failed
463 * @tparam Generator Callable type: () -> DataType
464 * @param generator Invoked to produce a fallback when the chain failed
465 * @return Processed DataType or result of @p generator
466 */
467 template <typename Generator>
468 requires std::invocable<Generator>
469 [[nodiscard]] DataType get_or_else(Generator&& generator) const
470 {
471 return m_successful ? m_data.data : std::forward<Generator>(generator)();
472 }
473
474 // ------------------------------------------------------------------
475 // Reset
476 // ------------------------------------------------------------------
477
478 /**
479 * @brief Reset the chain with a new Datum, clearing history and errors
480 *
481 * Primary reset overload. Replaces internal state with @p new_datum,
482 * restoring the chain to a successful state ready for reuse.
483 *
484 * @param new_datum Datum to restart the chain from
485 * @return Reference to this executor for continued chaining
486 */
488 {
489 m_data = new_datum;
490 m_successful = true;
491 m_operation_history.clear();
492 m_errors.clear();
493 return *this;
494 }
495
496 /**
497 * @brief Reset the chain with raw data (convenience overload)
498 *
499 * Wraps @p new_data in a Datum and delegates to the primary reset.
500 * No container is attached; use the Datum overload if a container
501 * must be present from the start of the next chain.
502 *
503 * @param new_data Raw data to wrap and restart the chain from
504 * @return Reference to this executor for continued chaining
505 */
506 FluentExecutor& reset(const DataType& new_data)
507 {
508 return reset(Datum<DataType>(new_data));
509 }
510
511 // ------------------------------------------------------------------
512 // Accessors
513 // ------------------------------------------------------------------
514
515 /**
516 * @brief Check whether all operations in the chain have succeeded
517 * @return true if no operation has failed
518 */
519 [[nodiscard]] bool is_successful() const { return m_successful; }
520
521 /**
522 * @brief Get the ordered list of operation names executed so far
523 * @return Const reference to the operation history vector
524 */
525 [[nodiscard]] const std::vector<std::string>& get_history() const { return m_operation_history; }
526
527 /**
528 * @brief Get all errors accumulated during the chain
529 * @return Const reference to the error message vector
530 */
531 [[nodiscard]] const std::vector<std::string>& get_errors() const { return m_errors; }
532
533 /**
534 * @brief Get the underlying executor
535 * @return Shared pointer to the executor instance
536 */
537 [[nodiscard]] std::shared_ptr<Executor> get_executor() const { return m_executor; }
538
539private:
540 std::shared_ptr<Executor> m_executor;
543 std::vector<std::string> m_operation_history;
544 std::vector<std::string> m_errors;
545
546 template <typename E, ComputeData U>
547 friend class FluentExecutor;
548
549 void record_error(const std::string& err) { m_errors.push_back(err); }
550};
551
552// ------------------------------------------------------------------
553// make_fluent — free function entry points
554// ------------------------------------------------------------------
555
556/**
557 * @brief Construct a FluentExecutor from a Datum with type deduction
558 * @tparam Executor Executor type
559 * @tparam DataType Inner data type (deduced from Datum)
560 * @param executor Shared pointer to the executor
561 * @param datum Datum to start the chain from
562 * @return FluentExecutor<Executor, DataType>
563 */
564template <typename Executor, ComputeData DataType>
565auto make_fluent(std::shared_ptr<Executor> executor, Datum<DataType>&& datum)
566{
568 std::move(executor),
569 std::forward<Datum<DataType>>(datum));
570}
571
572/**
573 * @brief Construct a FluentExecutor from raw data with type deduction (convenience entry point)
574 *
575 * Wraps @p data in a Datum immediately. No container is attached. Use the
576 * Datum overload when a container must survive through the chain.
577 *
578 * @tparam Executor Executor type
579 * @tparam DataType Data type (deduced)
580 * @param executor Shared pointer to the executor
581 * @param data Raw data to wrap and start the chain from
582 * @return FluentExecutor<Executor, decay_t<DataType>>
583 */
584template <typename Executor, ComputeData DataType>
585auto make_fluent(std::shared_ptr<Executor> executor, DataType&& data)
586{
588 std::move(executor),
589 Datum<std::decay_t<DataType>>(std::forward<DataType>(data)));
590}
591
592} // namespace MayaFlux::Yantra
auto fork()
Fork execution into multiple independent parallel paths.
FluentExecutor(std::shared_ptr< Executor > executor, DataType &&input)
Construct with executor and raw data (move, convenience entry point)
FluentExecutor & when(Pred &&predicate)
Execute an operation conditionally on a predicate over the raw data.
FluentExecutor & reset(const DataType &new_data)
Reset the chain with raw data (convenience overload)
const std::vector< std::string > & get_history() const
Get the ordered list of operation names executed so far.
Datum< DataType > consume_datum() &&
Move the full result Datum out of the chain.
void record_error(const std::string &err)
FluentExecutor & tap(Func &&func)
Apply a side-effect function without changing data or type.
std::shared_ptr< Executor > get_executor() const
Get the underlying executor.
DataType get_or(const DataType &default_value) const
Return the raw result or a fallback value if the chain failed.
auto apply(Func &&func)
Apply a custom transformation function within the chain.
DataType consume() &&
Move the final raw result out of the chain.
FluentExecutor(std::shared_ptr< Executor > executor, const Datum< DataType > &input)
Construct with executor and Datum input.
FluentExecutor & when(bool condition)
Execute an operation conditionally on a boolean flag.
FluentExecutor(std::shared_ptr< Executor > executor, Datum< DataType > &&input)
Construct with executor and Datum input (move)
std::shared_ptr< Executor > m_executor
const std::vector< std::string > & get_errors() const
Get all errors accumulated during the chain.
bool is_successful() const
Check whether all operations in the chain have succeeded.
DataType get_or_else(Generator &&generator) const
Return the raw result or invoke a generator if the chain failed.
const Datum< DataType > & get_datum() const
Get the full result Datum by const reference.
FluentExecutor< Executor, OutputType > then(const std::string &name)
Chain a named operation fetched from the executor's pool.
std::vector< std::string > m_errors
FluentExecutor(std::shared_ptr< Executor > executor, const DataType &input)
Construct with executor and raw data (convenience entry point)
FluentExecutor & reset(const Datum< DataType > &new_datum)
Reset the chain with a new Datum, clearing history and errors.
DataType & get_mutable()
Get a mutable reference to the final raw result.
FluentExecutor< Executor, OutputType > then()
Chain an anonymous operation instance by type.
const DataType & get() const
Get the final raw result by const reference.
Datum< DataType > to_io() const
Return the result Datum with execution history appended to metadata.
std::vector< std::string > m_operation_history
Fluent interface for chaining operations on any executor.
auto make_fluent(std::shared_ptr< Executor > executor, Datum< DataType > &&datum)
Construct a FluentExecutor from a Datum with type deduction.
std::unordered_map< std::string, std::any > metadata
Associated metadata.
Definition DataIO.hpp:28