MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
BufferPipeline.hpp
#pragma once

#include "BufferOperation.hpp"

namespace MayaFlux::Kriya {

class CycleCoordinator;

/**
 * @class BufferPipeline
 * @brief Coroutine-based execution engine for composable, multi-strategy buffer processing.
 *
 * BufferPipeline provides a flexible framework for orchestrating complex data flow patterns
 * through declarative operation chains. It supports multiple execution strategies (phased,
 * streaming, parallel, reactive), sophisticated data accumulation modes, and sample-accurate
 * timing coordination via the coroutine scheduler.
 *
 * **Core Concepts:**
 * - **Operations**: Composable units (capture, transform, route, modify, fuse) chained via >>
 * - **Execution Strategies**: PHASED (capture-then-process), STREAMING (immediate flow-through),
 *   PARALLEL (concurrent captures), REACTIVE (data-driven)
 * - **Capture Modes**: TRANSIENT (single), ACCUMULATE (concatenate), CIRCULAR (rolling buffer),
 *   WINDOWED (overlapping windows), TRIGGERED (conditional)
 * - **Timing Control**: Buffer-rate synchronization, sample-accurate delays, or immediate execution
 *
 * **Simple Capture & Route:**
 * ```cpp
 * auto pipeline = BufferPipeline::create(*scheduler, buffer_manager);
 * *pipeline
 *     >> BufferOperation::capture_from(input_buffer)
 *            .for_cycles(1)
 *     >> BufferOperation::route_to_buffer(output_buffer);
 *
 * pipeline->execute_buffer_rate(100); // Run for 100 audio buffer cycles
 * ```
 *
 * **Accumulation & Batch Processing (PHASED strategy):**
 * ```cpp
 * auto pipeline = BufferPipeline::create(*scheduler, buffer_manager);
 * // with_strategy() returns a BufferPipeline&, so the chain continues with '.'
 * pipeline->with_strategy(ExecutionStrategy::PHASED)
 *     .capture_timing(Vruta::DelayContext::BUFFER_BASED);
 *
 * *pipeline
 *     >> BufferOperation::capture_from(audio_buffer)
 *            .for_cycles(20) // Captures 20 times, concatenates into single buffer
 *     >> BufferOperation::transform([](const auto& data, uint32_t cycle) {
 *            const auto& accumulated = std::get<std::vector<double>>(data);
 *            // Process 20 * buffer_size samples as one batch
 *            return apply_batch_fft(accumulated);
 *        })
 *     >> BufferOperation::route_to_container(output_stream);
 *
 * pipeline->execute_buffer_rate();
 * ```
 *
 * **Real-Time Streaming Modification (STREAMING strategy):**
 * ```cpp
 * auto pipeline = BufferPipeline::create(*scheduler, buffer_manager);
 * pipeline->with_strategy(ExecutionStrategy::STREAMING);
 *
 * *pipeline
 *     >> BufferOperation::capture_from(audio_buffer).for_cycles(1)
 *     >> BufferOperation::modify_buffer(audio_buffer, [noise](auto buf) {
 *            auto& data = buf->get_data();
 *            for (auto& sample : data) {
 *                sample *= noise->process_sample(); // Apply effect in-place
 *            }
 *        }).as_streaming(); // Processor stays attached across cycles
 *
 * pipeline->execute_buffer_rate(); // Runs continuously
 * ```
 *
 * **Circular Buffer for Rolling Analysis:**
 * ```cpp
 * *pipeline
 *     >> BufferOperation::capture_from(input_buffer)
 *            .for_cycles(100)
 *            .as_circular(2048) // Maintains last 2048 samples
 *     >> BufferOperation::transform([](const auto& data, uint32_t cycle) {
 *            const auto& history = std::get<std::vector<double>>(data);
 *            return analyze_recent_trends(history); // Always sees last 2048 samples
 *        });
 * ```
 *
 * **Windowed Capture with Overlap:**
 * ```cpp
 * *pipeline
 *     >> BufferOperation::capture_from(input_buffer)
 *            .for_cycles(50)
 *            .with_window(1024, 0.5f) // 1024 samples, 50% overlap
 *     >> BufferOperation::transform([](const auto& data, uint32_t cycle) {
 *            const auto& window = std::get<std::vector<double>>(data);
 *            return apply_hann_window_and_fft(window);
 *        });
 * ```
 *
 * **Multi-Source Fusion:**
 * ```cpp
 * *pipeline
 *     >> BufferOperation::fuse_data(
 *            {mic_buffer, synth_buffer, file_buffer},
 *            [](std::vector<Kakshya::DataVariant>& sources, uint32_t cycle) {
 *                // Combine three audio sources with custom mixing
 *                return mix_sources(sources, {0.5, 0.3, 0.2});
 *            },
 *            output_buffer);
 * ```
 *
 * **Conditional Branching:**
 * ```cpp
 * pipeline->branch_if(
 *     [](uint32_t cycle) { return cycle % 16 == 0; }, // Every 16 cycles
 *     [](BufferPipeline& branch) {
 *         branch >> BufferOperation::dispatch_to([](const auto& data, uint32_t cycle) {
 *             save_analysis_snapshot(data, cycle);
 *         });
 *     },
 *     true // Synchronous - wait for branch to complete
 * );
 * ```
 *
 * **Per-Iteration Routing (Immediate Routing):**
 * ```cpp
 * // ROUTE directly after CAPTURE → routes each iteration immediately
 * *pipeline
 *     >> BufferOperation::capture_from(input_buffer).for_cycles(20)
 *     >> BufferOperation::route_to_container(stream); // Writes 20 times (streaming output)
 *
 * // ROUTE after TRANSFORM → routes accumulated result once
 * *pipeline
 *     >> BufferOperation::capture_from(input_buffer).for_cycles(20)
 *     >> BufferOperation::transform(process_fn)
 *     >> BufferOperation::route_to_container(stream); // Writes 1 time (batch output)
 * ```
 *
 * **Lifecycle Callbacks:**
 * ```cpp
 * pipeline->with_lifecycle(
 *     [](uint32_t cycle) { std::cout << "Cycle " << cycle << " start\n"; },
 *     [](uint32_t cycle) { std::cout << "Cycle " << cycle << " end\n"; }
 * );
 * ```
 *
 * **Execution Modes:**
 * - `execute_buffer_rate(N)`: Synchronized to audio buffer boundaries for N cycles
 * - `execute_continuous()`: Runs indefinitely until stopped
 * - `execute_for_cycles(N)`: Runs exactly N cycles then stops
 * - `execute_once()`: Single cycle execution
 * - `execute_scheduled(N, samples)`: With sample-accurate delays between operations
 *
 * **Strategy Selection:**
 * - **PHASED** (default): Capture phase completes, then process phase runs
 *   - Best for: batch analysis, accumulation, FFT processing
 * - **STREAMING**: Operations flow through immediately, minimal latency
 *   - Best for: real-time effects, low-latency processing, modify_buffer
 * - **PARALLEL**: Multiple captures run concurrently (TODO)
 *   - Best for: multi-source synchronized capture
 * - **REACTIVE**: Data-driven execution when inputs available (TODO)
 *   - Best for: event-driven workflows, complex dependencies
 *
 * @see BufferOperation For operation types and configuration
 * @see BufferCapture For capture modes and data accumulation strategies
 * @see CycleCoordinator For multi-pipeline synchronization
 * @see Vruta::TaskScheduler For coroutine scheduling and timing
 * @see ExecutionStrategy For execution coordination patterns
 */
class MAYAFLUX_API BufferPipeline : public std::enable_shared_from_this<BufferPipeline> {
public:
    static std::shared_ptr<BufferPipeline> create(Vruta::TaskScheduler& scheduler, std::shared_ptr<Buffers::BufferManager> buffer_manager = nullptr)
    {
        return std::make_shared<BufferPipeline>(scheduler, std::move(buffer_manager));
    }

    BufferPipeline() = default;
    explicit BufferPipeline(Vruta::TaskScheduler& scheduler, std::shared_ptr<Buffers::BufferManager> buffer_manager = nullptr);

    /**
     * @brief Chain an operation to the pipeline.
     * @param operation BufferOperation to add to the processing chain
     * @return Reference to this pipeline for continued chaining
     */
    BufferPipeline& operator>>(BufferOperation&& operation)
    {
        m_operations.emplace_back(std::move(operation));
        return *this;
    }

    /**
     * @brief Add conditional branch to the pipeline.
     * @param condition Function that determines if branch should execute
     * @param branch_builder Function that configures the branch pipeline
     * @param synchronous If true, branch executes within main cycle (default: false)
     * @param samples_per_operation Number of samples per operation in branch (default: 1)
     * @return Reference to this pipeline for continued chaining
     */
    BufferPipeline& branch_if(std::function<bool(uint32_t)> condition,
        const std::function<void(BufferPipeline&)>& branch_builder,
        bool synchronous = false, uint64_t samples_per_operation = 1);

    /**
     * @brief Execute operations in parallel within the current cycle.
     * @param operations List of operations to execute concurrently
     * @return Reference to this pipeline for continued chaining
     */
    BufferPipeline& parallel(std::initializer_list<BufferOperation> operations);

    /**
     * @brief Set lifecycle callbacks for cycle management.
     * @param on_cycle_start Function called at the beginning of each cycle
     * @param on_cycle_end Function called at the end of each cycle
     * @return Reference to this pipeline for continued chaining
     */
    BufferPipeline& with_lifecycle(
        std::function<void(uint32_t)> on_cycle_start,
        std::function<void(uint32_t)> on_cycle_end);

    /**
     * @brief Set the execution strategy for this pipeline
     * @param strategy Execution coordination strategy
     * @return Reference to this pipeline for chaining
     */
    BufferPipeline& with_strategy(ExecutionStrategy strategy)
    {
        m_execution_strategy = strategy;
        return *this;
    }

    /**
     * @brief Set timing mode for capture phase
     * @param mode Timing synchronization mode
     * @return Reference to this pipeline for chaining
     */
    BufferPipeline& capture_timing(Vruta::DelayContext mode)
    {
        m_capture_timing = mode;
        return *this;
    }

    /**
     * @brief Set timing mode for process phase
     * @param mode Timing synchronization mode
     * @return Reference to this pipeline for chaining
     */
    BufferPipeline& process_timing(Vruta::DelayContext mode)
    {
        m_process_timing = mode;
        return *this;
    }

    /**
     * @brief Get current execution strategy
     */
    inline ExecutionStrategy get_strategy() const { return m_execution_strategy; }

    // Update execute_buffer_rate to use new infrastructure:
    /**
     * @brief Execute pipeline synchronized to audio hardware cycle boundaries
     *
     * Respects the configured execution strategy and timing modes.
     * The default strategy is PHASED, with BUFFER_BASED capture timing and
     * SAMPLE_BASED process timing.
     *
     * @param max_cycles Maximum number of audio cycles to process (0 = infinite)
     */
    void execute_buffer_rate(uint64_t max_cycles = 0);

    /**
     * @brief Execute the pipeline for a single cycle.
     *
     * Schedules the pipeline to run once through all operations. The execution
     * happens asynchronously via the scheduler's coroutine system. The pipeline
     * must have a scheduler and will be kept alive via shared_ptr until execution
     * completes.
     *
     * @throws std::runtime_error if pipeline has no scheduler
     *
     * @note This is asynchronous - the function returns immediately and execution
     * happens when the scheduler processes the coroutine.
     */
    void execute_once();

    /**
     * @brief Execute the pipeline for a specified number of cycles.
     * @param cycles Number of processing cycles to execute
     *
     * Schedules the pipeline to run for the given number of cycles. Each cycle
     * processes all operations once. Execution is asynchronous via the scheduler.
     * The pipeline remains alive until all cycles complete.
     *
     * @throws std::runtime_error if pipeline has no scheduler
     *
     * @note Execution happens asynchronously - this function returns immediately.
     */
    void execute_for_cycles(uint64_t cycles = 0);

    /**
     * @brief Start continuous execution of the pipeline.
     *
     * Schedules the pipeline to run indefinitely until stop_continuous() is called.
     * The pipeline processes all operations in each cycle and automatically
     * advances to the next cycle. The pipeline keeps itself alive via shared_ptr
     * during continuous execution.
     *
     * @throws std::runtime_error if pipeline has no scheduler
     *
     * @note Execution is asynchronous. Call stop_continuous() to halt execution.
     * @see stop_continuous()
     */
    void execute_continuous();

    /**
     * @brief Stop continuous execution of the pipeline.
     *
     * Signals the pipeline to stop after completing the current cycle. The
     * pipeline will gracefully terminate its execution loop. Has no effect
     * if the pipeline is not currently executing continuously.
     *
     * @note This only sets a flag - actual termination happens at the end of
     * the current cycle.
     */
    inline void stop_continuous() { m_continuous_execution = false; }

    /**
     * @brief Execute pipeline with sample-accurate timing between operations.
     * @param max_cycles Maximum number of cycles to execute (0 = infinite)
     * @param samples_per_operation Number of samples to wait between operations (default: 1)
     *
     * Schedules pipeline execution with precise timing control. After each operation
     * within a cycle, the pipeline waits for the specified number of samples before
     * proceeding. Useful for rate-limiting operations or creating timed sequences.
     *
     * @throws std::runtime_error if pipeline has no scheduler
     *
     * @note Execution is asynchronous. The pipeline keeps itself alive until completion.
     */
    void execute_scheduled(uint64_t max_cycles = 0, uint64_t samples_per_operation = 1);

    /**
     * @brief Execute pipeline with real-time rate control.
     * @param max_cycles Maximum number of cycles to execute
     * @param seconds_per_operation Duration between operations in seconds
     *
     * Convenience wrapper around execute_scheduled() that converts time intervals
     * to sample counts based on the scheduler's sample rate. Enables natural
     * specification of timing in seconds rather than samples.
     *
     * @throws std::runtime_error if pipeline has no scheduler
     *
     * Example:
     * ```cpp
     * // Execute 10 cycles with 0.5 seconds between operations
     * pipeline->execute_scheduled_at_rate(10, 0.5);
     * ```
     */
    void execute_scheduled_at_rate(uint32_t max_cycles = 0, double seconds_per_operation = 1);

    /**
     * @brief Execute pipeline synchronized to audio hardware cycle boundaries
     *
     * Creates a coroutine and registers it with the scheduler using the processing hook
     * for timing synchronization. The pipeline will be added to the scheduler at the
     * start of each audio cycle, ensuring buffer operations complete before the buffer
     * manager reads data.
     *
     * @param max_cycles Maximum number of audio cycles to process (0 = infinite)
     */
    // void execute_buffer_rate(uint32_t max_cycles = 0);

    // State management

    /**
     * @brief Mark operation data as consumed for cleanup.
     * @param operation_index Index of the operation in the pipeline
     *
     * Manually marks an operation's data state as consumed, allowing it to be
     * cleaned up in the next cleanup cycle. Useful for manual data lifecycle
     * management in custom processing scenarios.
     *
     * @note Out-of-range indices are silently ignored.
     */
    void mark_data_consumed(uint32_t operation_index);

    /**
     * @brief Check if any operations have pending data ready for processing.
     * @return true if any operation has data in READY state, false otherwise
     *
     * Queries the data states of all operations to determine if there is
     * unprocessed data available. Useful for synchronization and debugging.
     */
    bool has_pending_data() const;

    /**
     * @brief Get the current cycle number.
     * @return Current cycle count since pipeline creation or last reset
     *
     * Returns the internal cycle counter that increments with each complete
     * cycle execution. Useful for cycle-dependent logic and debugging.
     */
    uint32_t get_current_cycle() const { return m_current_cycle; }

private:
    enum class DataState : uint8_t {
        EMPTY, ///< No data available
        READY, ///< Data ready for processing
        CONSUMED, ///< Data has been processed
        EXPIRED ///< Data has expired and should be cleaned up
    };

    struct BranchInfo {
        std::function<bool(uint32_t)> condition;
        std::shared_ptr<BufferPipeline> pipeline;
    };

    std::shared_ptr<BufferPipeline> m_active_self;
    std::shared_ptr<CycleCoordinator> m_coordinator;
    std::shared_ptr<Buffers::BufferManager> m_buffer_manager;
    Vruta::TaskScheduler* m_scheduler = nullptr;

    std::vector<BufferOperation> m_operations;
    std::vector<DataState> m_data_states;
    std::unordered_map<BufferOperation*, Kakshya::DataVariant> m_operation_data;
    std::vector<BranchInfo> m_branches;
    std::vector<std::shared_ptr<Vruta::SoundRoutine>> m_branch_tasks;
    std::function<void(uint32_t)> m_cycle_start_callback;
    std::function<void(uint32_t)> m_cycle_end_callback;

    uint64_t m_current_cycle { 0 };
    uint64_t m_max_cycles { 0 };
    bool m_continuous_execution { false };
    ExecutionStrategy m_execution_strategy { ExecutionStrategy::PHASED };
    Vruta::DelayContext m_capture_timing { Vruta::DelayContext::BUFFER_BASED };
    Vruta::DelayContext m_process_timing { Vruta::DelayContext::SAMPLE_BASED };

    static Kakshya::DataVariant extract_buffer_data(const std::shared_ptr<Buffers::AudioBuffer>& buffer, bool should_process = false);
    static void write_to_buffer(const std::shared_ptr<Buffers::AudioBuffer>& buffer, const Kakshya::DataVariant& data);
    static void write_to_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container, const Kakshya::DataVariant& data);
    static Kakshya::DataVariant read_from_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container, uint64_t start, uint32_t length);

    void capture_operation(BufferOperation& op, uint64_t cycle);
    void reset_accumulated_data();
    bool has_immediate_routing(const BufferOperation& op) const;

    void process_operation(BufferOperation& op, uint64_t cycle);
    std::shared_ptr<Vruta::SoundRoutine> dispatch_branch_async(BranchInfo& branch, uint64_t cycle);
    void await_timing(Vruta::DelayContext mode, uint64_t units);

    void cleanup_expired_data();
    void cleanup_completed_branches();

    Vruta::SoundRoutine execute_internal(uint64_t max_cycles, uint64_t samples_per_operation);
    Vruta::SoundRoutine execute_phased(uint64_t max_cycles, uint64_t samples_per_operation);
    Vruta::SoundRoutine execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation);
    Vruta::SoundRoutine execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation);
    Vruta::SoundRoutine execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation);

    void execute_capture_phase(uint64_t cycle_base);
    void execute_process_phase(uint64_t cycle);
};

inline std::shared_ptr<BufferPipeline> operator>>(
    std::shared_ptr<BufferPipeline> pipeline,
    BufferOperation&& operation)
{
    *pipeline >> std::move(operation);
    return pipeline;
}

inline std::shared_ptr<BufferPipeline> operator>>(
    std::shared_ptr<BufferPipeline> pipeline,
    BufferOperation& operation) // Accepts an lvalue; note that the operation is moved from
{
    *pipeline >> std::move(operation);
    return pipeline;
}

}
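The chaining style used throughout the examples in this header (`*pipeline >> op >> op`, and `pipeline >> op` on a `shared_ptr`) can be sketched in isolation. The block below is a minimal illustration under stated assumptions, not MayaFlux code: `Op`, `MiniPipeline`, and `build_demo_chain` are hypothetical stand-ins that mirror only the member and free `operator>>` overloads declared above.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for BufferOperation: just a labelled step.
struct Op {
    std::string name;
};

// Hypothetical stand-in for BufferPipeline: only the chaining behaviour.
class MiniPipeline {
public:
    // Member overload: append the operation and return *this so calls chain.
    MiniPipeline& operator>>(Op&& op)
    {
        m_ops.emplace_back(std::move(op));
        return *this;
    }

    const std::vector<Op>& operations() const { return m_ops; }

private:
    std::vector<Op> m_ops;
};

// Free overload: lets a shared_ptr appear on the left of >> directly.
inline std::shared_ptr<MiniPipeline> operator>>(
    std::shared_ptr<MiniPipeline> pipeline, Op&& op)
{
    assert(pipeline != nullptr);
    *pipeline >> std::move(op);
    return pipeline;
}

// Builds a three-step chain with both overloads and returns the step names in order.
inline std::vector<std::string> build_demo_chain()
{
    auto pipeline = std::make_shared<MiniPipeline>();
    *pipeline >> Op { "capture" } >> Op { "transform" }; // member overload, twice
    pipeline >> Op { "route" }; // free overload on the shared_ptr

    std::vector<std::string> names;
    for (const auto& op : pipeline->operations()) {
        names.push_back(op.name);
    }
    return names;
}
```

Returning a reference from the member overload is what allows several operations in one expression, while the free overload returns the `shared_ptr` by value so ownership stays with the caller.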
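The CIRCULAR and WINDOWED capture modes named in the class documentation can be illustrated with plain vectors. This is a rough sketch under assumptions, not the library's implementation: `append_circular` and `hop_size` are hypothetical helpers, and the reading that an overlap of 0.5 means each window advances by half its length is an assumption that the header does not confirm.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Assumed CIRCULAR behaviour: append a block, then keep only the newest
// `capacity` samples.
inline void append_circular(std::vector<double>& history,
    const std::vector<double>& block, std::size_t capacity)
{
    assert(capacity > 0);
    history.insert(history.end(), block.begin(), block.end());
    if (history.size() > capacity) {
        // Drop the oldest samples from the front.
        history.erase(history.begin(),
            history.end() - static_cast<std::ptrdiff_t>(capacity));
    }
}

// Assumed WINDOWED behaviour: an overlap ratio in [0, 1) gives the number of
// new samples between the starts of consecutive windows.
inline std::size_t hop_size(std::size_t window_size, float overlap)
{
    assert(overlap >= 0.0F && overlap < 1.0F);
    const auto hop = static_cast<std::size_t>(
        static_cast<float>(window_size) * (1.0F - overlap));
    return hop > 0 ? hop : 1; // always advance by at least one sample
}
```

Under these assumptions, a capture configured like `.as_circular(2048)` would always hand the transform the most recent 2048 samples, and `.with_window(1024, 0.5f)` would advance by 512 samples per window.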