38template <
typename Executor, ComputeData DataType>
55 : m_executor(
std::move(executor))
60 error<std::invalid_argument>(
61 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
62 std::source_location::current(),
63 "FluentExecutor requires non-null executor");
73 : m_executor(
std::move(executor))
74 , m_data(
std::move(input))
78 error<std::invalid_argument>(
79 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
80 std::source_location::current(),
81 "FluentExecutor requires non-null executor");
127 template <
typename OpClass, ComputeData OutputType = DataType>
131 error<std::runtime_error>(
132 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
133 std::source_location::current(),
134 "Cannot continue chain after failed operation");
138 auto result = m_executor->template execute<OpClass, DataType, OutputType>(m_data);
140 m_successful =
false;
141 record_error(
"Operation " + std::string(
typeid(OpClass).name()) +
" failed");
142 error<std::runtime_error>(
143 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
144 std::source_location::current(),
145 "Operation failed in fluent chain: {}", std::string(
typeid(OpClass).name()));
149 next.m_operation_history = m_operation_history;
150 next.m_operation_history.push_back(
typeid(OpClass).name());
152 }
catch (
const std::exception& e) {
153 m_successful =
false;
154 record_error(e.what());
156 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
157 std::source_location::current(),
158 "Exception in fluent chain: " + std::string(e.what()));
174 template <
typename OpClass, ComputeData OutputType = DataType>
178 error<std::runtime_error>(
179 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
180 std::source_location::current(),
181 "Cannot continue chain after failed operation: {}", name);
185 auto result = m_executor->template execute_named<OpClass, DataType, OutputType>(name, m_data);
187 m_successful =
false;
188 record_error(
"Named operation '" + name +
"' failed");
189 error<std::runtime_error>(
190 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
191 std::source_location::current(),
192 "Named operation failed in fluent chain: {}", name);
196 next.m_operation_history = m_operation_history;
197 next.m_operation_history.push_back(name);
199 }
catch (
const std::exception& e) {
200 m_successful =
false;
201 record_error(e.what());
203 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
204 std::source_location::current(),
205 "Exception in named operation '{}': {}", name, e.what());
222 template <
typename Func>
223 requires std::invocable<Func, const DataType&>
227 error<std::runtime_error>(
228 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
229 std::source_location::current(),
230 "Cannot continue chain after failed operation in apply");
233 using ResultType = std::invoke_result_t<Func, const DataType&>;
236 auto result = std::forward<Func>(func)(m_data.data);
239 next.m_operation_history = m_operation_history;
240 next.m_operation_history.push_back(
"custom_function");
242 }
catch (
const std::exception& e) {
243 m_successful =
false;
244 record_error(std::string(
"Custom function failed: ") + e.what());
246 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
247 std::source_location::current(),
248 "Exception in custom function: " + std::string(e.what()));
263 template <
typename Func>
264 requires std::invocable<Func, DataType&>
268 error<std::runtime_error>(
269 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
270 std::source_location::current(),
271 "Cannot continue chain after failed operation in tap");
275 std::forward<Func>(func)(m_data.data);
276 m_operation_history.emplace_back(
"tap");
278 }
catch (
const std::exception& e) {
279 m_successful =
false;
280 record_error(std::string(
"Tap function failed: ") + e.what());
282 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
283 std::source_location::current(),
284 "Exception in tap function: " + std::string(e.what()));
298 template <
typename OpClass>
301 if (condition && m_successful)
302 return then<OpClass>();
313 template <
typename OpClass,
typename Pred>
314 requires std::predicate<Pred, const DataType&>
317 if (m_successful && std::forward<Pred>(predicate)(m_data.data))
318 return then<OpClass>();
336 template <
typename... OpClasses>
340 error<std::runtime_error>(
341 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
342 std::source_location::current(),
343 "Cannot fork after failed operation");
346 return std::make_tuple(
347 m_executor->template execute<OpClasses, DataType>(m_data)...);
358 [[nodiscard]]
const DataType&
get()
const
361 error<std::runtime_error>(
362 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
363 std::source_location::current(),
364 "Cannot get result from failed chain");
376 error<std::runtime_error>(
377 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
378 std::source_location::current(),
379 "Cannot get mutable result from failed chain");
391 error<std::runtime_error>(
392 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
393 std::source_location::current(),
394 "Cannot consume result from failed chain");
396 return std::move(m_data.data);
410 error<std::runtime_error>(
411 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
412 std::source_location::current(),
413 "Cannot get datum from failed chain");
425 error<std::runtime_error>(
426 Journal::Component::Yantra, Journal::Context::ComputeMatrix,
427 std::source_location::current(),
428 "Cannot consume datum from failed chain");
430 return std::move(m_data);
444 result.
metadata[
"execution_history"] = m_operation_history;
445 result.
metadata[
"successful"] = m_successful;
446 if (!m_errors.empty())
447 result.
metadata[
"errors"] = m_errors;
456 [[nodiscard]] DataType
get_or(
const DataType& default_value)
const
458 return m_successful ? m_data.data : default_value;
467 template <
typename Generator>
468 requires std::invocable<Generator>
471 return m_successful ? m_data.data : std::forward<Generator>(generator)();
491 m_operation_history.clear();
525 [[nodiscard]]
const std::vector<std::string>&
get_history()
const {
return m_operation_history; }
531 [[nodiscard]]
const std::vector<std::string>&
get_errors()
const {
return m_errors; }
537 [[nodiscard]] std::shared_ptr<Executor>
get_executor()
const {
return m_executor; }
546 template <
typename E, ComputeData U>
564template <
typename Executor, ComputeData DataType>
569 std::forward<Datum<DataType>>(datum));
584template <
typename Executor, ComputeData DataType>
585auto make_fluent(std::shared_ptr<Executor> executor, DataType&& data)
auto fork()
Fork execution into multiple independent parallel paths.
FluentExecutor(std::shared_ptr< Executor > executor, DataType &&input)
Construct with executor and raw data (move, convenience entry point)
FluentExecutor & when(Pred &&predicate)
Execute an operation conditionally on a predicate over the raw data.
FluentExecutor & reset(const DataType &new_data)
Reset the chain with raw data (convenience overload)
const std::vector< std::string > & get_history() const
Get the ordered list of operation names executed so far.
Datum< DataType > consume_datum() &&
Move the full result Datum out of the chain.
void record_error(const std::string &err)
FluentExecutor & tap(Func &&func)
Apply a side-effect function without changing data or type.
std::shared_ptr< Executor > get_executor() const
Get the underlying executor.
DataType get_or(const DataType &default_value) const
Return the raw result or a fallback value if the chain failed.
auto apply(Func &&func)
Apply a custom transformation function within the chain.
DataType consume() &&
Move the final raw result out of the chain.
FluentExecutor(std::shared_ptr< Executor > executor, const Datum< DataType > &input)
Construct with executor and Datum input.
FluentExecutor & when(bool condition)
Execute an operation conditionally on a boolean flag.
FluentExecutor(std::shared_ptr< Executor > executor, Datum< DataType > &&input)
Construct with executor and Datum input (move)
std::shared_ptr< Executor > m_executor
const std::vector< std::string > & get_errors() const
Get all errors accumulated during the chain.
bool is_successful() const
Check whether all operations in the chain have succeeded.
DataType get_or_else(Generator &&generator) const
Return the raw result or invoke a generator if the chain failed.
const Datum< DataType > & get_datum() const
Get the full result Datum by const reference.
FluentExecutor< Executor, OutputType > then(const std::string &name)
Chain a named operation fetched from the executor's pool.
std::vector< std::string > m_errors
FluentExecutor(std::shared_ptr< Executor > executor, const DataType &input)
Construct with executor and raw data (convenience entry point)
FluentExecutor & reset(const Datum< DataType > &new_datum)
Reset the chain with a new Datum, clearing history and errors.
DataType & get_mutable()
Get a mutable reference to the final raw result.
FluentExecutor< Executor, OutputType > then()
Chain an anonymous operation instance by type.
const DataType & get() const
Get the final raw result by const reference.
Datum< DataType > to_io() const
Return the result Datum with execution history appended to metadata.
std::vector< std::string > m_operation_history
Fluent interface for chaining operations on any executor.
auto make_fluent(std::shared_ptr< Executor > executor, Datum< DataType > &&datum)
Construct a FluentExecutor from a Datum with type deduction.
std::unordered_map< std::string, std::any > metadata
Associated metadata.