16 , m_buffer_manager(
std::move(buffer_manager))
17 , m_scheduler(&scheduler)
27 op.m_attached_processor,
35 std::function<
bool(uint32_t)> condition,
38 uint64_t samples_per_operation)
40 auto branch_pipeline = std::make_shared<BufferPipeline>();
44 branch_builder(*branch_pipeline);
47 std::move(branch_pipeline),
49 samples_per_operation });
56 for (
auto& op : operations) {
65 std::function<
void(uint32_t)> on_cycle_start,
66 std::function<
void(uint32_t)> on_cycle_end)
78 std::source_location::current(),
79 "Pipeline requires scheduler for execution");
82 auto self = shared_from_this();
84 if (max_cycles == 0) {
85 max_cycles = UINT64_MAX;
90 auto routine = std::make_shared<Vruta::SoundRoutine>(
103 std::source_location::current(),
104 "Pipeline requires scheduler for execution");
106 auto self = shared_from_this();
109 auto routine = std::make_shared<Vruta::SoundRoutine>(
120 std::source_location::current(),
121 "Pipeline requires scheduler for execution");
124 auto self = shared_from_this();
131 auto routine = std::make_shared<Vruta::SoundRoutine>(
145 uint64_t samples_per_operation)
150 std::source_location::current(),
151 "Pipeline must have scheduler for scheduled execution");
154 auto self = shared_from_this();
156 if (max_cycles == 0) {
157 max_cycles = UINT64_MAX;
163 auto routine = std::make_shared<Vruta::SoundRoutine>(
173 double seconds_per_operation)
178 std::source_location::current(),
179 "Pipeline must have scheduler for scheduled execution");
201 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
203 if (should_process) {
204 audio_buffer->process_default();
206 const auto& data_span = audio_buffer->get_data();
207 std::vector<double> data_vector(data_span.begin(), data_span.end());
211 return std::vector<double> {};
216 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
219 auto audio_data = std::get<std::vector<double>>(data);
220 auto& buffer_data = audio_buffer->get_data();
222 if (buffer_data.size() != audio_data.size()) {
223 buffer_data.resize(audio_data.size());
226 std::ranges::copy(audio_data, buffer_data.begin());
228 }
catch (
const std::bad_variant_access& e) {
231 std::source_location::current(),
232 "Data type mismatch when writing to audio buffer: {}",
243 auto audio_data = std::get<std::vector<double>>(data);
244 std::span<const double> data_span(audio_data.data(), audio_data.size());
246 container->write_frames(data_span, 0);
248 }
catch (
const std::bad_variant_access& e) {
251 std::source_location::current(),
252 "Data type mismatch when writing to container: {}",
254 }
catch (
const std::exception& e) {
257 std::source_location::current(),
258 "Error writing to container: {}",
268 uint32_t read_length = length;
269 if (read_length == 0) {
270 read_length =
static_cast<uint32_t
>(container->get_total_elements() / container->get_num_channels());
273 std::vector<double> output_data(
static_cast<size_t>(read_length * container->get_num_channels()));
274 std::span<double> output_span(output_data.data(), output_data.size());
276 uint64_t frames_read = container->read_frames(output_span, read_length);
278 if (frames_read < output_data.size()) {
279 output_data.resize(frames_read);
284 }
catch (
const std::exception& e) {
287 std::source_location::current(),
288 "Error reading from container: {}",
291 return std::vector<double> {};
306 switch (capture_mode) {
317 auto&
existing = std::get<std::vector<double>>(it->second);
318 const auto& new_data = std::get<std::vector<double>>(buffer_data);
321 }
catch (
const std::bad_variant_access& e) {
324 "Data type mismatch during ACCUMULATE capture: {}",
333 if (circular_size == 0) {
334 circular_size = 4096;
342 auto& circular = std::get<std::vector<double>>(it->second);
343 const auto& new_data = std::get<std::vector<double>>(buffer_data);
345 circular.insert(circular.end(), new_data.begin(), new_data.end());
347 if (circular.size() > circular_size) {
348 circular.erase(circular.begin(),
349 circular.begin() +
static_cast<int64_t
>(circular.size() - circular_size));
352 }
catch (
const std::bad_variant_access& e) {
355 "Data type mismatch during CIRCULAR capture: {}",
358 }
catch (std::exception& e) {
361 std::source_location::current(),
362 "Error during CIRCULAR capture: {}",
373 if (window_size == 0) {
377 auto hop_size =
static_cast<uint32_t
>((float)window_size * (1.0F - overlap_ratio));
386 auto& windowed = std::get<std::vector<double>>(it->second);
387 const auto& new_data = std::get<std::vector<double>>(buffer_data);
389 if (windowed.size() >= window_size) {
390 if (hop_size >= windowed.size()) {
391 windowed = std::get<std::vector<double>>(buffer_data);
393 windowed.erase(windowed.begin(),
394 windowed.begin() + hop_size);
396 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
398 if (windowed.size() > window_size) {
399 size_t excess = windowed.size() - window_size;
400 windowed.erase(windowed.begin(),
401 windowed.begin() + excess);
405 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
407 if (windowed.size() > window_size) {
408 size_t excess = windowed.size() - window_size;
409 windowed.erase(windowed.begin(),
410 windowed.begin() + excess);
414 }
catch (
const std::bad_variant_access& e) {
416 "Data type mismatch during WINDOWED capture: {}", e.what());
440 auto next_it = std::next(current_it);
443 if (next_it->m_target_buffer) {
445 }
else if (next_it->m_target_container) {
449 size_t route_index = std::distance(
m_operations.begin(), next_it);
462 auto mode = op.m_capture.get_mode();
479 auto next_op = std::next(it);
499 input_data = it.second;
508 for (
auto& m_operation : std::ranges::reverse_view(
m_operations)) {
509 if (&m_operation == &op)
526 data_to_route = it.second;
553 std::vector<Kakshya::DataVariant> fusion_inputs;
558 fusion_inputs.push_back(buffer_data);
563 fusion_inputs.push_back(container_data);
586 data_to_dispatch = it.second;
601 std::source_location::current(),
602 "BufferPipeline has no BufferManager for MODIFY operation");
632 "Unknown operation type in pipeline : {} : {}",
636 }
catch (
const std::exception& e) {
639 std::source_location::current(),
640 "Error processing operation in BufferPipeline: {}",
658 auto task = std::make_shared<Vruta::SoundRoutine>(std::move(branch_routine));
672 if (
m_operations[i].m_capture.m_data_expired_callback) {
699 if (branch.pipeline) {
700 branch.pipeline->m_active_self.reset();
706 [](
const auto& task) { return !task || !task->is_active(); }),
728 std::source_location::current(),
729 "Unknown execution strategy in BufferPipeline");
742 uint32_t cycles_executed = 0;
744 while ((max_cycles == 0 || cycles_executed < max_cycles) && (
m_continuous_execution || cycles_executed < max_cycles)) {
746 if (promise.should_terminate) {
782 uint32_t op_iterations = 1;
784 op_iterations = op.m_capture.get_cycle_count();
787 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
837 std::vector<std::shared_ptr<Vruta::SoundRoutine>> current_cycle_sync_tasks;
843 if (branch.synchronous && task) {
844 current_cycle_sync_tasks.push_back(task);
849 if (!current_cycle_sync_tasks.empty()) {
850 bool any_active =
true;
854 for (
auto& task : current_cycle_sync_tasks) {
855 if (task && task->is_active()) {
893 uint32_t cycles_executed = 0;
895 while ((max_cycles == 0 || cycles_executed < max_cycles) && (
m_continuous_execution || cycles_executed < max_cycles)) {
897 if (promise.should_terminate) {
918 uint32_t op_iterations = 1;
920 op_iterations = op.m_capture.get_cycle_count();
923 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
941 }
else if (samples_per_operation > 0) {
969 std::cout <<
"PARALLEL strategy not yet implemented, using PHASED\n";
976 std::cout <<
"REACTIVE strategy not yet implemented, using PHASED\n";
#define MF_ERROR(comp, ctx,...)
Cycle Behavior: The for_cycles(N) configuration controls how many times the capture operation executes...
std::shared_ptr< Buffers::AudioBuffer > get_buffer() const
OperationFunction m_data_ready_callback
float get_overlap_ratio() const
ProcessingControl get_processing_control() const
@ TRANSIENT
Single cycle capture (default) - data expires after 1 cycle.
@ CIRCULAR
Circular buffer with overwrite.
@ ACCUMULATE
Accumulate over multiple cycles in container.
@ WINDOWED
Rolling window capture with overlap.
@ TRIGGERED
Capture only when condition met.
std::function< bool()> m_stop_condition
CaptureMode get_mode() const
uint32_t get_window_size() const
uint32_t get_circular_size() const
std::vector< std::shared_ptr< Buffers::AudioBuffer > > m_source_buffers
std::shared_ptr< Buffers::AudioBuffer > m_target_buffer
std::vector< std::shared_ptr< Kakshya::DynamicSoundStream > > m_source_containers
Buffers::BufferProcessingFunction m_buffer_modifier
static bool is_capture_phase_operation(const BufferOperation &op)
std::shared_ptr< Kakshya::DynamicSoundStream > m_target_container
bool is_streaming() const
Check if this operation is a streaming operation.
TransformVectorFunction m_fusion_function
std::shared_ptr< Buffers::BufferProcessor > m_attached_processor
BufferOperation & with_priority(uint8_t priority)
Set execution priority for scheduler ordering.
OpType get_type() const
Getters for internal state (read-only)
uint32_t m_modify_cycle_count
std::shared_ptr< Kakshya::DynamicSoundStream > m_source_container
TransformationFunction m_transformer
OperationFunction m_dispatch_handler
@ LOAD
Load data from container to buffer with position control.
@ CONDITION
Conditional operation for branching logic.
@ FUSE
Fuse multiple sources using custom fusion functions.
@ ROUTE
Route data to destination (buffer or container)
@ CAPTURE
Capture data from source buffer using BufferCapture strategy.
@ MODIFY
Modify buffer data using a custom quick process.
@ DISPATCH
Dispatch to external handler for custom processing.
@ TRANSFORM
Apply transformation function to data variants.
static bool is_process_phase_operation(const BufferOperation &op)
Fundamental unit of operation in buffer processing pipelines.
std::unordered_map< BufferOperation *, Kakshya::DataVariant > m_operation_data
ExecutionStrategy m_execution_strategy
@ READY
Data ready for processing.
@ EXPIRED
Data has expired and should be cleaned up.
@ CONSUMED
Data has been processed.
@ EMPTY
No data available.
Vruta::SoundRoutine execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, const Kakshya::DataVariant &data)
Vruta::DelayContext m_process_timing
std::shared_ptr< CycleCoordinator > m_coordinator
BufferPipeline & branch_if(std::function< bool(uint32_t)> condition, const std::function< void(BufferPipeline &)> &branch_builder, bool synchronous=false, uint64_t samples_per_operation=1)
Add conditional branch to the pipeline.
void cleanup_completed_branches()
void execute_for_cycles(uint64_t cycles=0)
Execute the pipeline for a specified number of cycles.
void process_operation(BufferOperation &op, uint64_t cycle)
Vruta::SoundRoutine execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
void execute_continuous()
Start continuous execution of the pipeline.
void reset_accumulated_data()
Vruta::DelayContext m_capture_timing
void execute_scheduled_at_rate(uint32_t max_cycles=0, double seconds_per_operation=1)
Execute pipeline with real-time rate control.
std::vector< BranchInfo > m_branches
void capture_operation(BufferOperation &op, uint64_t cycle)
std::shared_ptr< BufferPipeline > m_active_self
std::vector< BufferOperation > m_operations
void execute_once()
Execute the pipeline for a single cycle.
std::vector< std::shared_ptr< Vruta::SoundRoutine > > m_branch_tasks
std::vector< DataState > m_data_states
void cleanup_expired_data()
static Kakshya::DataVariant read_from_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, uint64_t start, uint32_t length)
Vruta::SoundRoutine execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_buffer(const std::shared_ptr< Buffers::AudioBuffer > &buffer, const Kakshya::DataVariant &data)
Vruta::SoundRoutine execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
std::function< void(uint32_t)> m_cycle_end_callback
bool has_pending_data() const
Check if any operations have pending data ready for processing.
void mark_data_consumed(uint32_t operation_index)
Mark the data of the operation at the given index as consumed.
bool m_continuous_execution
std::function< void(uint32_t)> m_cycle_start_callback
void execute_buffer_rate(uint64_t max_cycles=0)
Execute pipeline synchronized to audio hardware cycle boundaries.
std::shared_ptr< Buffers::BufferManager > m_buffer_manager
BufferPipeline & parallel(std::initializer_list< BufferOperation > operations)
Execute operations in parallel within the current cycle.
bool has_immediate_routing(const BufferOperation &op) const
void execute_scheduled(uint64_t max_cycles=0, uint64_t samples_per_operation=1)
Execute pipeline with sample-accurate timing between operations.
Vruta::SoundRoutine execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
static Kakshya::DataVariant extract_buffer_data(const std::shared_ptr< Buffers::AudioBuffer > &buffer, bool should_process=false)
Vruta::TaskScheduler * m_scheduler
std::shared_ptr< Vruta::SoundRoutine > dispatch_branch_async(BranchInfo &branch, uint64_t cycle)
BufferPipeline & with_lifecycle(std::function< void(uint32_t)> on_cycle_start, std::function< void(uint32_t)> on_cycle_end)
Set lifecycle callbacks for cycle management.
Coroutine-based execution engine for composable, multi-strategy buffer processing.
Cross-pipeline synchronization and coordination system.
A C++20 coroutine-based audio processing task with sample-accurate timing.
void add_task(std::shared_ptr< Routine > routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
Token-based multimodal task scheduling system for unified coroutine processing.
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Kriya
Automatable tasks and fluent scheduling API for Nodes and Buffers.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
@ PHASED
PHASED: Traditional phased execution (default)
@ STREAMING
STREAMING: Immediate flow-through execution.
@ PARALLEL
PARALLEL: Concurrent capture with synchronization.
@ REACTIVE
REACTIVE: Data-driven reactive execution.
constexpr std::string_view enum_to_string(EnumType value) noexcept
Universal enum to string converter using magic_enum (original case)
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)
Awaiter for suspending until a buffer cycle boundary.
std::shared_ptr< BufferPipeline > pipeline
uint64_t samples_per_operation
Templated awaitable for accessing a coroutine's promise object.
Awaitable object for precise sample-accurate timing delays.