17 , m_buffer_manager(
std::move(buffer_manager))
18 , m_scheduler(&scheduler)
29 if (op.m_attached_processor) {
31 m_buffer_manager->remove_processor(op.m_attached_processor, op.m_target_buffer);
34 m_buffer_manager->remove_processor(op.m_attached_processor, op.m_target_buffer);
36 op.m_attached_processor =
nullptr;
42 std::function<
bool(uint32_t)> condition,
45 uint64_t samples_per_operation)
47 auto branch_pipeline = std::make_shared<BufferPipeline>();
51 branch_builder(*branch_pipeline);
53 m_branches.push_back({ .condition = std::move(condition),
54 .pipeline = std::move(branch_pipeline),
55 .synchronous = synchronous,
56 .samples_per_operation = samples_per_operation });
63 for (
auto& op : operations) {
72 std::function<
void(uint32_t)> on_cycle_start,
73 std::function<
void(uint32_t)> on_cycle_end)
85 std::source_location::current(),
86 "Pipeline requires scheduler for execution");
89 auto self = shared_from_this();
91 if (max_cycles == 0) {
92 max_cycles = UINT64_MAX;
97 auto routine = std::make_shared<Vruta::SoundRoutine>(
110 std::source_location::current(),
111 "Pipeline requires scheduler for execution");
113 auto self = shared_from_this();
116 auto routine = std::make_shared<Vruta::SoundRoutine>(
127 std::source_location::current(),
128 "Pipeline requires scheduler for execution");
131 auto self = shared_from_this();
138 auto routine = std::make_shared<Vruta::SoundRoutine>(
152 uint64_t samples_per_operation)
157 std::source_location::current(),
158 "Pipeline must have scheduler for scheduled execution");
161 auto self = shared_from_this();
163 if (max_cycles == 0) {
164 max_cycles = UINT64_MAX;
170 auto routine = std::make_shared<Vruta::SoundRoutine>(
180 double seconds_per_operation)
185 std::source_location::current(),
186 "Pipeline must have scheduler for scheduled execution");
214 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
216 if (should_process) {
217 audio_buffer->process_default();
219 const auto& data_span = audio_buffer->get_data();
220 std::vector<double> data_vector(data_span.begin(), data_span.end());
224 return std::vector<double> {};
229 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
232 auto audio_data = std::get<std::vector<double>>(data);
233 auto& buffer_data = audio_buffer->get_data();
235 if (buffer_data.size() != audio_data.size()) {
236 buffer_data.resize(audio_data.size());
239 std::ranges::copy(audio_data, buffer_data.begin());
241 }
catch (
const std::bad_variant_access& e) {
244 std::source_location::current(),
245 "Data type mismatch when writing to audio buffer: {}",
256 auto audio_data = std::get<std::vector<double>>(data);
257 std::span<const double> data_span(audio_data.data(), audio_data.size());
259 container->write_frames(data_span, 0);
261 }
catch (
const std::bad_variant_access& e) {
264 std::source_location::current(),
265 "Data type mismatch when writing to container: {}",
267 }
catch (
const std::exception& e) {
270 std::source_location::current(),
271 "Error writing to container: {}",
281 uint32_t read_length = length;
282 if (read_length == 0) {
283 read_length =
static_cast<uint32_t
>(container->get_total_elements() / container->get_num_channels());
286 std::vector<double> output_data(
static_cast<size_t>(read_length * container->get_num_channels()));
287 std::span<double> output_span(output_data.data(), output_data.size());
289 uint64_t frames_read = container->read_frames(output_span, read_length);
291 if (frames_read < output_data.size()) {
292 output_data.resize(frames_read);
297 }
catch (
const std::exception& e) {
300 std::source_location::current(),
301 "Error reading from container: {}",
304 return std::vector<double> {};
319 switch (capture_mode) {
330 auto&
existing = std::get<std::vector<double>>(it->second);
331 const auto& new_data = std::get<std::vector<double>>(buffer_data);
334 }
catch (
const std::bad_variant_access& e) {
337 "Data type mismatch during ACCUMULATE capture: {}",
346 if (circular_size == 0) {
347 circular_size = 4096;
355 auto& circular = std::get<std::vector<double>>(it->second);
356 const auto& new_data = std::get<std::vector<double>>(buffer_data);
358 circular.insert(circular.end(), new_data.begin(), new_data.end());
360 if (circular.size() > circular_size) {
361 circular.erase(circular.begin(),
362 circular.begin() +
static_cast<int64_t
>(circular.size() - circular_size));
365 }
catch (
const std::bad_variant_access& e) {
368 "Data type mismatch during CIRCULAR capture: {}",
371 }
catch (std::exception& e) {
374 std::source_location::current(),
375 "Error during CIRCULAR capture: {}",
386 if (window_size == 0) {
390 auto hop_size =
static_cast<uint32_t
>((float)window_size * (1.0F - overlap_ratio));
399 auto& windowed = std::get<std::vector<double>>(it->second);
400 const auto& new_data = std::get<std::vector<double>>(buffer_data);
402 if (windowed.size() >= window_size) {
403 if (hop_size >= windowed.size()) {
404 windowed = std::get<std::vector<double>>(buffer_data);
406 windowed.erase(windowed.begin(),
407 windowed.begin() + hop_size);
409 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
411 if (windowed.size() > window_size) {
412 size_t excess = windowed.size() - window_size;
413 windowed.erase(windowed.begin(),
414 windowed.begin() + excess);
418 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
420 if (windowed.size() > window_size) {
421 size_t excess = windowed.size() - window_size;
422 windowed.erase(windowed.begin(),
423 windowed.begin() + excess);
427 }
catch (
const std::bad_variant_access& e) {
429 "Data type mismatch during WINDOWED capture: {}", e.what());
453 auto next_it = std::next(current_it);
456 if (next_it->m_target_buffer) {
460 std::source_location::current(),
461 "BufferPipeline has no BufferManager for immediate ROUTE-to-buffer");
464 if (!next_it->m_attached_processor) {
465 auto writer = std::make_shared<Buffers::AudioWriteProcessor>();
468 next_it->m_attached_processor = writer;
471 std::static_pointer_cast<Buffers::AudioWriteProcessor>(next_it->m_attached_processor)
472 ->set_data(buffer_data);
473 }
else if (next_it->m_target_container) {
477 size_t route_index = std::distance(
m_operations.begin(), next_it);
490 auto mode = op.m_capture.get_mode();
507 auto next_op = std::next(it);
527 input_data = it.second;
536 for (
auto& m_operation : std::ranges::reverse_view(
m_operations)) {
537 if (&m_operation == &op)
554 data_to_route = it.second;
563 std::source_location::current(),
564 "BufferPipeline has no BufferManager for ROUTE-to-buffer operation");
568 auto writer = std::make_shared<Buffers::AudioWriteProcessor>();
575 ->set_data(data_to_route);
597 std::vector<Kakshya::DataVariant> fusion_inputs;
602 fusion_inputs.push_back(buffer_data);
607 fusion_inputs.push_back(container_data);
630 data_to_dispatch = it.second;
645 std::source_location::current(),
646 "BufferPipeline has no BufferManager for MODIFY operation");
676 "Unknown operation type in pipeline : {} : {}",
680 }
catch (
const std::exception& e) {
683 std::source_location::current(),
684 "Error processing operation in BufferPipeline: {}",
702 auto task = std::make_shared<Vruta::SoundRoutine>(std::move(branch_routine));
716 if (
m_operations[i].m_capture.m_data_expired_callback) {
743 if (branch.pipeline) {
744 branch.pipeline->m_active_self.reset();
749 return !task || !task->is_active();
771 std::source_location::current(),
772 "Unknown execution strategy in BufferPipeline");
785 uint32_t cycles_executed = 0;
787 while ((max_cycles == 0 || cycles_executed < max_cycles) && (
m_continuous_execution || cycles_executed < max_cycles)) {
789 if (promise.should_terminate) {
825 uint32_t op_iterations = 1;
827 op_iterations = op.m_capture.get_cycle_count();
830 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
880 std::vector<std::shared_ptr<Vruta::SoundRoutine>> current_cycle_sync_tasks;
886 if (branch.synchronous && task) {
887 current_cycle_sync_tasks.push_back(task);
892 if (!current_cycle_sync_tasks.empty()) {
893 bool any_active =
true;
897 for (
auto& task : current_cycle_sync_tasks) {
898 if (task && task->is_active()) {
939 uint32_t cycles_executed = 0;
941 while ((max_cycles == 0 || cycles_executed < max_cycles) && (
m_continuous_execution || cycles_executed < max_cycles)) {
943 if (promise.should_terminate) {
964 uint32_t op_iterations = 1;
966 op_iterations = op.m_capture.get_cycle_count();
969 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
987 }
else if (samples_per_operation > 0) {
1019 "PARALLEL strategy not yet implemented, using PHASED as fallback");
1027 "REACTIVE strategy not yet implemented, using PHASED as fallback");
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
Cycle Behavior: The for_cycles(N) configuration controls how many times the capture operation execute...
std::shared_ptr< Buffers::AudioBuffer > get_buffer() const
OperationFunction m_data_ready_callback
float get_overlap_ratio() const
ProcessingControl get_processing_control() const
@ TRANSIENT
Single cycle capture (default) - data expires after 1 cycle.
@ CIRCULAR
Circular buffer with overwrite.
@ ACCUMULATE
Accumulate over multiple cycles in container.
@ WINDOWED
Rolling window capture with overlap.
@ TRIGGERED
Capture only when condition met.
std::function< bool()> m_stop_condition
CaptureMode get_mode() const
uint32_t get_window_size() const
uint32_t get_circular_size() const
std::vector< std::shared_ptr< Buffers::AudioBuffer > > m_source_buffers
Buffers::AudioProcessingFunction m_buffer_modifier
std::shared_ptr< Buffers::AudioBuffer > m_target_buffer
std::vector< std::shared_ptr< Kakshya::DynamicSoundStream > > m_source_containers
static bool is_capture_phase_operation(const BufferOperation &op)
std::shared_ptr< Kakshya::DynamicSoundStream > m_target_container
bool is_streaming() const
Check if this operation is a streaming operation.
TransformVectorFunction m_fusion_function
std::shared_ptr< Buffers::BufferProcessor > m_attached_processor
BufferOperation & with_priority(uint8_t priority)
Set execution priority for scheduler ordering.
OpType get_type() const
Getters for internal state (read-only)
uint32_t m_modify_cycle_count
std::shared_ptr< Kakshya::DynamicSoundStream > m_source_container
TransformationFunction m_transformer
OperationFunction m_dispatch_handler
@ LOAD
Load data from container to buffer with position control.
@ CONDITION
Conditional operation for branching logic.
@ FUSE
Fuse multiple sources using custom fusion functions.
@ ROUTE
Route data to destination (buffer or container)
@ CAPTURE
Capture data from source buffer using BufferCapture strategy.
@ MODIFY
Modify Buffer Data using custom quick process.
@ DISPATCH
Dispatch to external handler for custom processing.
@ TRANSFORM
Apply transformation function to data variants.
static bool is_process_phase_operation(const BufferOperation &op)
Fundamental unit of operation in buffer processing pipelines.
std::unordered_map< BufferOperation *, Kakshya::DataVariant > m_operation_data
ExecutionStrategy m_execution_strategy
@ READY
Data ready for processing.
@ EXPIRED
Data has expired and should be cleaned up.
@ CONSUMED
Data has been processed.
@ EMPTY
No data available.
Vruta::SoundRoutine execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, const Kakshya::DataVariant &data)
Vruta::DelayContext m_process_timing
std::shared_ptr< CycleCoordinator > m_coordinator
BufferPipeline & on_complete(std::function< void()> cb)
Register a callback fired once when pipeline execution ends.
BufferPipeline & branch_if(std::function< bool(uint32_t)> condition, const std::function< void(BufferPipeline &)> &branch_builder, bool synchronous=false, uint64_t samples_per_operation=1)
Add conditional branch to the pipeline.
void cleanup_completed_branches()
void execute_for_cycles(uint64_t cycles=0)
Execute the pipeline for a specified number of cycles.
void process_operation(BufferOperation &op, uint64_t cycle)
Vruta::SoundRoutine execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
void execute_continuous()
Start continuous execution of the pipeline.
void reset_accumulated_data()
Vruta::DelayContext m_capture_timing
void execute_scheduled_at_rate(uint32_t max_cycles=0, double seconds_per_operation=1)
Execute pipeline with real-time rate control.
std::vector< BranchInfo > m_branches
void capture_operation(BufferOperation &op, uint64_t cycle)
std::shared_ptr< BufferPipeline > m_active_self
std::vector< BufferOperation > m_operations
void execute_once()
Execute the pipeline for a single cycle.
std::vector< std::shared_ptr< Vruta::SoundRoutine > > m_branch_tasks
std::vector< DataState > m_data_states
void cleanup_expired_data()
static Kakshya::DataVariant read_from_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, uint64_t start, uint32_t length)
Vruta::SoundRoutine execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_buffer(const std::shared_ptr< Buffers::AudioBuffer > &buffer, const Kakshya::DataVariant &data)
Vruta::SoundRoutine execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
std::function< void(uint32_t)> m_cycle_end_callback
bool has_pending_data() const
Check if any operations have pending data ready for processing.
void mark_data_consumed(uint32_t operation_index)
Execute pipeline synchronized to audio hardware cycle boundaries.
bool m_continuous_execution
std::function< void(uint32_t)> m_cycle_start_callback
std::function< void()> m_on_complete
void execute_buffer_rate(uint64_t max_cycles=0)
Execute pipeline synchronized to audio hardware cycle boundaries.
std::shared_ptr< Buffers::BufferManager > m_buffer_manager
BufferPipeline & parallel(std::initializer_list< BufferOperation > operations)
Execute operations in parallel within the current cycle.
bool has_immediate_routing(const BufferOperation &op) const
void execute_scheduled(uint64_t max_cycles=0, uint64_t samples_per_operation=1)
Execute pipeline with sample-accurate timing between operations.
Vruta::SoundRoutine execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
static Kakshya::DataVariant extract_buffer_data(const std::shared_ptr< Buffers::AudioBuffer > &buffer, bool should_process=false)
Vruta::TaskScheduler * m_scheduler
std::shared_ptr< Vruta::SoundRoutine > dispatch_branch_async(BranchInfo &branch, uint64_t cycle)
BufferPipeline & with_lifecycle(std::function< void(uint32_t)> on_cycle_start, std::function< void(uint32_t)> on_cycle_end)
Set lifecycle callbacks for cycle management.
Coroutine-based execution engine for composable, multi-strategy buffer processing.
Cross-pipeline synchronization and coordination system.
A C++20 coroutine-based audio processing task with sample-accurate timing.
void add_task(const std::shared_ptr< Routine > &routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
Token-based multimodal task scheduling system for unified coroutine processing.
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Kriya
Automatable tasks and fluent scheduling api for Nodes and Buffers.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
@ PHASED
PHASED: Traditional phased execution (default)
@ STREAMING
STREAMING: Immediate flow-through execution.
@ PARALLEL
PARALLEL: Concurrent capture with synchronization.
@ REACTIVE
REACTIVE: Data-driven reactive execution.
constexpr std::string_view enum_to_string(EnumType value) noexcept
Universal enum to string converter using magic_enum (original case)
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)
Awaiter for suspending until a buffer cycle boundary.
std::shared_ptr< BufferPipeline > pipeline
uint64_t samples_per_operation
Templated awaitable for accessing a coroutine's promise object.
Awaitable object for precise sample-accurate timing delays.