MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
MayaFlux::Kriya::BufferPipeline Class Reference

Coroutine-based execution engine for composable, multi-strategy buffer processing. More...

#include <BufferPipeline.hpp>

+ Inheritance diagram for MayaFlux::Kriya::BufferPipeline:
+ Collaboration diagram for MayaFlux::Kriya::BufferPipeline:

Classes

struct  BranchInfo
 

Public Member Functions

 BufferPipeline ()=default
 
 BufferPipeline (Vruta::TaskScheduler &scheduler, std::shared_ptr< Buffers::BufferManager > buffer_manager=nullptr)
 
 ~BufferPipeline ()
 
BufferPipelineoperator>> (BufferOperation &&operation)
 Chain an operation to the pipeline.
 
BufferPipelinebranch_if (std::function< bool(uint32_t)> condition, const std::function< void(BufferPipeline &)> &branch_builder, bool synchronous=false, uint64_t samples_per_operation=1)
 Add conditional branch to the pipeline.
 
BufferPipelineparallel (std::initializer_list< BufferOperation > operations)
 Execute operations in parallel within the current cycle.
 
BufferPipelinewith_lifecycle (std::function< void(uint32_t)> on_cycle_start, std::function< void(uint32_t)> on_cycle_end)
 Set lifecycle callbacks for cycle management.
 
BufferPipelinewith_strategy (ExecutionStrategy strategy)
 Set the execution strategy for this pipeline.
 
BufferPipelinecapture_timing (Vruta::DelayContext mode)
 Set timing mode for capture phase.
 
BufferPipelineprocess_timing (Vruta::DelayContext mode)
 Set timing mode for process phase.
 
ExecutionStrategy get_strategy () const
 Get current execution strategy.
 
void execute_buffer_rate (uint64_t max_cycles=0)
 Execute pipeline synchronized to audio hardware cycle boundaries.
 
void execute_once ()
 Execute the pipeline for a single cycle.
 
void execute_for_cycles (uint64_t cycles=0)
 Execute the pipeline for a specified number of cycles.
 
void execute_continuous ()
 Start continuous execution of the pipeline.
 
void stop_continuous ()
 Stop continuous execution of the pipeline.
 
void execute_scheduled (uint64_t max_cycles=0, uint64_t samples_per_operation=1)
 Execute pipeline with sample-accurate timing between operations.
 
void execute_scheduled_at_rate (uint32_t max_cycles=0, double seconds_per_operation=1)
 Execute pipeline with real-time rate control.
 
void mark_data_consumed (uint32_t operation_index)
 Execute pipeline synchronized to audio hardware cycle boundaries.
 
bool has_pending_data () const
 Check if any operations have pending data ready for processing.
 
uint32_t get_current_cycle () const
 Get the current cycle number.
 

Static Public Member Functions

static std::shared_ptr< BufferPipelinecreate (Vruta::TaskScheduler &scheduler, std::shared_ptr< Buffers::BufferManager > buffer_manager=nullptr)
 

Private Types

enum class  DataState : uint8_t { EMPTY , READY , CONSUMED , EXPIRED }
 

Private Member Functions

void capture_operation (BufferOperation &op, uint64_t cycle)
 
void reset_accumulated_data ()
 
bool has_immediate_routing (const BufferOperation &op) const
 
void process_operation (BufferOperation &op, uint64_t cycle)
 
std::shared_ptr< Vruta::SoundRoutinedispatch_branch_async (BranchInfo &branch, uint64_t cycle)
 
void await_timing (Vruta::DelayContext mode, uint64_t units)
 
void cleanup_expired_data ()
 
void cleanup_completed_branches ()
 
Vruta::SoundRoutine execute_internal (uint64_t max_cycles, uint64_t samples_per_operation)
 
Vruta::SoundRoutine execute_phased (uint64_t max_cycles, uint64_t samples_per_operation)
 
Vruta::SoundRoutine execute_streaming (uint64_t max_cycles, uint64_t samples_per_operation)
 
Vruta::SoundRoutine execute_parallel (uint64_t max_cycles, uint64_t samples_per_operation)
 
Vruta::SoundRoutine execute_reactive (uint64_t max_cycles, uint64_t samples_per_operation)
 
void execute_capture_phase (uint64_t cycle_base)
 
void execute_process_phase (uint64_t cycle)
 

Static Private Member Functions

static Kakshya::DataVariant extract_buffer_data (const std::shared_ptr< Buffers::AudioBuffer > &buffer, bool should_process=false)
 
static void write_to_buffer (const std::shared_ptr< Buffers::AudioBuffer > &buffer, const Kakshya::DataVariant &data)
 
static void write_to_container (const std::shared_ptr< Kakshya::DynamicSoundStream > &container, const Kakshya::DataVariant &data)
 
static Kakshya::DataVariant read_from_container (const std::shared_ptr< Kakshya::DynamicSoundStream > &container, uint64_t start, uint32_t length)
 

Private Attributes

std::shared_ptr< BufferPipelinem_active_self
 
std::shared_ptr< CycleCoordinatorm_coordinator
 
std::shared_ptr< Buffers::BufferManagerm_buffer_manager
 
Vruta::TaskSchedulerm_scheduler = nullptr
 
std::vector< BufferOperationm_operations
 
std::vector< DataStatem_data_states
 
std::unordered_map< BufferOperation *, Kakshya::DataVariantm_operation_data
 
std::vector< BranchInfom_branches
 
std::vector< std::shared_ptr< Vruta::SoundRoutine > > m_branch_tasks
 
std::function< void(uint32_t)> m_cycle_start_callback
 
std::function< void(uint32_t)> m_cycle_end_callback
 
uint64_t m_current_cycle { 0 }
 
uint64_t m_max_cycles { 0 }
 
bool m_continuous_execution { false }
 
ExecutionStrategy m_execution_strategy { ExecutionStrategy::PHASED }
 
Vruta::DelayContext m_capture_timing { Vruta::DelayContext::BUFFER_BASED }
 
Vruta::DelayContext m_process_timing { Vruta::DelayContext::SAMPLE_BASED }
 

Detailed Description

Coroutine-based execution engine for composable, multi-strategy buffer processing.

BufferPipeline provides a flexible framework for orchestrating complex data flow patterns through declarative operation chains. It supports multiple execution strategies (phased, streaming, parallel, reactive), sophisticated data accumulation modes, and sample-accurate timing coordination via the coroutine scheduler.

Core Concepts:

  • Operations: Composable units (capture, transform, route, modify, fuse) chained via >>
  • Execution Strategies: PHASED (capture-then-process), STREAMING (immediate flow-through), PARALLEL (concurrent captures), REACTIVE (data-driven)
  • Capture Modes: TRANSIENT (single), ACCUMULATE (concatenate), CIRCULAR (rolling buffer), WINDOWED (overlapping windows), TRIGGERED (conditional)
  • Timing Control: Buffer-rate synchronization, sample-accurate delays, or immediate execution

Simple Capture & Route:

auto pipeline = BufferPipeline::create(*scheduler, buffer_manager);
*pipeline
pipeline->execute_buffer_rate(100); // Run for 100 audio buffer cycles
static BufferOperation route_to_buffer(std::shared_ptr< Buffers::AudioBuffer > target)
Create a routing operation to AudioBuffer destination.
static CaptureBuilder capture_from(std::shared_ptr< Buffers::AudioBuffer > buffer)
Create a CaptureBuilder for fluent capture configuration.
static std::shared_ptr< BufferPipeline > create(Vruta::TaskScheduler &scheduler, std::shared_ptr< Buffers::BufferManager > buffer_manager=nullptr)
CaptureBuilder & for_cycles(uint32_t count)
Set number of cycles to capture (enables ACCUMULATE mode).
Definition Capture.cpp:107

Accumulation & Batch Processing (PHASED strategy):

auto pipeline = BufferPipeline::create(*scheduler, buffer_manager)
->with_strategy(ExecutionStrategy::PHASED)
*pipeline
.for_cycles(20) // Captures 20 times, concatenates into single buffer
>> BufferOperation::transform([](const auto& data, uint32_t cycle) {
const auto& accumulated = std::get<std::vector<double>>(data);
// Process 20 * buffer_size samples as one batch
return apply_batch_fft(accumulated);
})
pipeline->execute_buffer_rate();
static BufferOperation route_to_container(std::shared_ptr< Kakshya::DynamicSoundStream > target)
Create a routing operation to DynamicSoundStream destination.
static BufferOperation transform(TransformationFunction transformer)
Create a transform operation with custom transformation function.
@ PHASED
PHASED: Traditional phased execution (default)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)

Real-Time Streaming Modification (STREAMING strategy):

auto pipeline = BufferPipeline::create(*scheduler, buffer_manager)
->with_strategy(ExecutionStrategy::STREAMING);
*pipeline
>> BufferOperation::modify_buffer(audio_buffer, [noise](auto buf) {
auto& data = buf->get_data();
for (auto& sample : data) {
sample *= noise->process_sample(); // Apply effect in-place
}
}).as_streaming(); // Processor stays attached across cycles
pipeline->execute_buffer_rate(); // Runs continuously
static BufferOperation modify_buffer(std::shared_ptr< Buffers::AudioBuffer > buffer, Buffers::BufferProcessingFunction modifier)
Create a modify operation for direct buffer manipulation.
@ STREAMING
STREAMING: Immediate flow-through execution.

Circular Buffer for Rolling Analysis:

*pipeline
.for_cycles(100)
.as_circular(2048) // Maintains last 2048 samples
>> BufferOperation::transform([](const auto& data, uint32_t cycle) {
const auto& history = std::get<std::vector<double>>(data);
return analyze_recent_trends(history); // Always sees last 2048 samples
});
CaptureBuilder & as_circular(uint32_t buffer_size)
Enable circular buffer mode with specified size.
Definition Capture.cpp:119

Windowed Capture with Overlap:

*pipeline
.with_window(1024, 0.5f) // 1024 samples, 50% overlap
>> BufferOperation::transform([](const auto& data, uint32_t cycle) {
const auto& window = std::get<std::vector<double>>(data);
return apply_hann_window_and_fft(window);
});
CaptureBuilder & with_window(uint32_t window_size, float overlap_ratio=0.0F)
Configure windowed capture (enables WINDOWED mode).
Definition Capture.cpp:125

Multi-Source Fusion:

*pipeline
{mic_buffer, synth_buffer, file_buffer},
[](std::vector<Kakshya::DataVariant>& sources, uint32_t cycle) {
// Combine three audio sources with custom mixing
return mix_sources(sources, {0.5, 0.3, 0.2});
},
output_buffer);
static BufferOperation fuse_data(std::vector< std::shared_ptr< Buffers::AudioBuffer > > sources, TransformVectorFunction fusion_func, std::shared_ptr< Buffers::AudioBuffer > target)
Create a fusion operation for multiple AudioBuffer sources.

Conditional Branching:

pipeline->branch_if(
[](uint32_t cycle) { return cycle % 16 == 0; }, // Every 16 cycles
[](BufferPipeline& branch) {
branch >> BufferOperation::dispatch_to([](const auto& data, uint32_t cycle) {
save_analysis_snapshot(data, cycle);
});
},
true // Synchronous - wait for branch to complete
);
static BufferOperation dispatch_to(OperationFunction handler)
Create a dispatch operation for external processing.
Coroutine-based execution engine for composable, multi-strategy buffer processing.

Per-Iteration Routing (Immediate Routing):

// ROUTE directly after CAPTURE → routes each iteration immediately
*pipeline
>> BufferOperation::route_to_container(stream); // Writes 20 times (streaming output)
// ROUTE after TRANSFORM → routes accumulated result once
*pipeline
>> BufferOperation::route_to_container(stream); // Writes 1 time (batch output)

Lifecycle Callbacks:

pipeline->with_lifecycle(
[](uint32_t cycle) { std::cout << "Cycle " << cycle << " start\n"; },
[](uint32_t cycle) { std::cout << "Cycle " << cycle << " end\n"; }
);

Execution Modes:

  • execute_buffer_rate(N): Synchronized to audio buffer boundaries for N cycles
  • execute_continuous(): Runs indefinitely until stopped
  • execute_for_cycles(N): Runs exactly N cycles then stops
  • execute_once(): Single cycle execution
  • execute_scheduled(N, samples): With sample-accurate delays between operations

Strategy Selection:

  • PHASED (default): Capture phase completes, then process phase runs
    • Best for: batch analysis, accumulation, FFT processing
  • STREAMING: Operations flow through immediately, minimal latency
    • Best for: real-time effects, low-latency processing, modify_buffer
  • PARALLEL: Multiple captures run concurrently (TODO)
    • Best for: multi-source synchronized capture
  • REACTIVE: Data-driven execution when inputs available (TODO)
    • Best for: event-driven workflows, complex dependencies
See also
BufferOperation For operation types and configuration
BufferCapture For capture modes and data accumulation strategies
CycleCoordinator For multi-pipeline synchronization
Vruta::TaskScheduler For coroutine scheduling and timing
ExecutionStrategy For execution coordination patterns

Definition at line 168 of file BufferPipeline.hpp.


The documentation for this class was generated from the following files: