MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferPipeline.cpp
Go to the documentation of this file.
1#include "BufferPipeline.hpp"
2
4
7
10
12
13namespace MayaFlux::Kriya {
14
15BufferPipeline::BufferPipeline(Vruta::TaskScheduler& scheduler, std::shared_ptr<Buffers::BufferManager> buffer_manager)
16 : m_coordinator(std::make_shared<CycleCoordinator>(scheduler))
17 , m_buffer_manager(std::move(buffer_manager))
18 , m_scheduler(&scheduler)
19{
20}
21
23{
24 if (!m_buffer_manager) {
25 return;
26 }
27
28 for (auto& op : m_operations) {
29 if (op.m_attached_processor) {
30 if (op.get_type() == BufferOperation::OpType::ROUTE && op.m_target_buffer) {
31 m_buffer_manager->remove_processor(op.m_attached_processor, op.m_target_buffer);
32 }
33 if (op.get_type() == BufferOperation::OpType::MODIFY && op.m_target_buffer) {
34 m_buffer_manager->remove_processor(op.m_attached_processor, op.m_target_buffer);
35 }
36 op.m_attached_processor = nullptr;
37 }
38 }
39}
40
42 std::function<bool(uint32_t)> condition,
43 const std::function<void(BufferPipeline&)>& branch_builder,
44 bool synchronous,
45 uint64_t samples_per_operation)
46{
47 auto branch_pipeline = std::make_shared<BufferPipeline>();
48 if (m_scheduler) {
49 branch_pipeline->m_scheduler = m_scheduler;
50 }
51 branch_builder(*branch_pipeline);
52
53 m_branches.push_back({ .condition = std::move(condition),
54 .pipeline = std::move(branch_pipeline),
55 .synchronous = synchronous,
56 .samples_per_operation = samples_per_operation });
57
58 return *this;
59}
60
61BufferPipeline& BufferPipeline::parallel(std::initializer_list<BufferOperation> operations)
62{
63 for (auto& op : operations) {
64 BufferOperation parallel_op = BufferOperation(op);
65 parallel_op.with_priority(255);
66 m_operations.emplace_back(std::move(parallel_op));
67 }
68 return *this;
69}
70
72 std::function<void(uint32_t)> on_cycle_start,
73 std::function<void(uint32_t)> on_cycle_end)
74{
75 m_cycle_start_callback = std::move(on_cycle_start);
76 m_cycle_end_callback = std::move(on_cycle_end);
77 return *this;
78}
79
80void BufferPipeline::execute_buffer_rate(uint64_t max_cycles)
81{
82 if (!m_scheduler) {
83 error<std::runtime_error>(Journal::Component::Kriya,
85 std::source_location::current(),
86 "Pipeline requires scheduler for execution");
87 }
88
89 auto self = shared_from_this();
90
91 if (max_cycles == 0) {
92 max_cycles = UINT64_MAX;
94 }
95 m_max_cycles = max_cycles;
96
97 auto routine = std::make_shared<Vruta::SoundRoutine>(
98 execute_internal(max_cycles, 0));
99
100 m_scheduler->add_task(std::move(routine));
101
102 m_active_self = self;
103}
104
106{
107 if (!m_scheduler) {
108 error<std::runtime_error>(Journal::Component::Kriya,
110 std::source_location::current(),
111 "Pipeline requires scheduler for execution");
112 }
113 auto self = shared_from_this();
114
115 m_max_cycles = 1;
116 auto routine = std::make_shared<Vruta::SoundRoutine>(
117 execute_internal(1, 0));
118 m_scheduler->add_task(std::move(routine));
119 m_active_self = self;
120}
121
123{
124 if (!m_scheduler) {
125 error<std::runtime_error>(Journal::Component::Kriya,
127 std::source_location::current(),
128 "Pipeline requires scheduler for execution");
129 }
130
131 auto self = shared_from_this();
132
133 if (cycles == 0) {
134 cycles = UINT64_MAX;
136 }
137 m_max_cycles = cycles;
138 auto routine = std::make_shared<Vruta::SoundRoutine>(
139 execute_internal(cycles, 0));
140 m_scheduler->add_task(std::move(routine));
141 m_active_self = self;
142}
143
149
151 uint64_t max_cycles,
152 uint64_t samples_per_operation)
153{
154 if (!m_scheduler) {
155 error<std::runtime_error>(Journal::Component::Kriya,
157 std::source_location::current(),
158 "Pipeline must have scheduler for scheduled execution");
159 }
160
161 auto self = shared_from_this();
162
163 if (max_cycles == 0) {
164 max_cycles = UINT64_MAX;
166 }
167
168 m_max_cycles = max_cycles;
169
170 auto routine = std::make_shared<Vruta::SoundRoutine>(
171 execute_internal(max_cycles, samples_per_operation));
172
173 m_scheduler->add_task(std::move(routine));
174
175 m_active_self = self;
176}
177
179 uint32_t max_cycles,
180 double seconds_per_operation)
181{
182 if (!m_scheduler) {
183 error<std::runtime_error>(Journal::Component::Kriya,
185 std::source_location::current(),
186 "Pipeline must have scheduler for scheduled execution");
187 }
188
189 uint64_t samples = m_scheduler->seconds_to_samples(seconds_per_operation);
190 execute_scheduled(max_cycles, samples);
191}
192
193void BufferPipeline::mark_data_consumed(uint32_t operation_index)
194{
195 if (operation_index < m_data_states.size()) {
196 m_data_states[operation_index] = DataState::CONSUMED;
197 }
198}
199
201{
202 return std::ranges::any_of(m_data_states,
203 [](DataState state) { return state == DataState::READY; });
204}
205
207{
208 m_on_complete = std::move(cb);
209 return *this;
210}
211
212Kakshya::DataVariant BufferPipeline::extract_buffer_data(const std::shared_ptr<Buffers::AudioBuffer>& buffer, bool should_process)
213{
214 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
215 if (audio_buffer) {
216 if (should_process) {
217 audio_buffer->process_default();
218 }
219 const auto& data_span = audio_buffer->get_data();
220 std::vector<double> data_vector(data_span.begin(), data_span.end());
221 return data_vector;
222 }
223
224 return std::vector<double> {};
225}
226
227void BufferPipeline::write_to_buffer(const std::shared_ptr<Buffers::AudioBuffer>& buffer, const Kakshya::DataVariant& data)
228{
229 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
230 if (audio_buffer) {
231 try {
232 auto audio_data = std::get<std::vector<double>>(data);
233 auto& buffer_data = audio_buffer->get_data();
234
235 if (buffer_data.size() != audio_data.size()) {
236 buffer_data.resize(audio_data.size());
237 }
238
239 std::ranges::copy(audio_data, buffer_data.begin());
240
241 } catch (const std::bad_variant_access& e) {
242 error_rethrow(Journal::Component::Kriya,
244 std::source_location::current(),
245 "Data type mismatch when writing to audio buffer: {}",
246 e.what());
247 }
248 }
249
250 // TODO: Handle other buffer types
251}
252
253void BufferPipeline::write_to_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container, const Kakshya::DataVariant& data)
254{
255 try {
256 auto audio_data = std::get<std::vector<double>>(data);
257 std::span<const double> data_span(audio_data.data(), audio_data.size());
258
259 container->write_frames(data_span, 0);
260
261 } catch (const std::bad_variant_access& e) {
262 error_rethrow(Journal::Component::Kriya,
264 std::source_location::current(),
265 "Data type mismatch when writing to container: {}",
266 e.what());
267 } catch (const std::exception& e) {
268 error_rethrow(Journal::Component::Kriya,
270 std::source_location::current(),
271 "Error writing to container: {}",
272 e.what());
273 }
274}
275
276Kakshya::DataVariant BufferPipeline::read_from_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container,
277 uint64_t /*start_frame*/,
278 uint32_t length)
279{
280 try {
281 uint32_t read_length = length;
282 if (read_length == 0) {
283 read_length = static_cast<uint32_t>(container->get_total_elements() / container->get_num_channels());
284 }
285
286 std::vector<double> output_data(static_cast<size_t>(read_length * container->get_num_channels()));
287 std::span<double> output_span(output_data.data(), output_data.size());
288
289 uint64_t frames_read = container->read_frames(output_span, read_length);
290
291 if (frames_read < output_data.size()) {
292 output_data.resize(frames_read);
293 }
294
295 return output_data;
296
297 } catch (const std::exception& e) {
298 error_rethrow(Journal::Component::Kriya,
300 std::source_location::current(),
301 "Error reading from container: {}",
302 e.what());
303
304 return std::vector<double> {};
305 }
306}
307
309{
311 auto buffer_data = extract_buffer_data(op.m_capture.get_buffer(), should_process);
312
314 op.m_capture.m_data_ready_callback(buffer_data, cycle);
315 }
316
317 auto capture_mode = op.m_capture.get_mode();
318
319 switch (capture_mode) {
321 m_operation_data[&op] = buffer_data;
322 break;
323
325 auto it = m_operation_data.find(&op);
326 if (it == m_operation_data.end()) {
327 m_operation_data[&op] = buffer_data;
328 } else {
329 try {
330 auto& existing = std::get<std::vector<double>>(it->second);
331 const auto& new_data = std::get<std::vector<double>>(buffer_data);
332 existing.insert(existing.end(), new_data.begin(), new_data.end());
333
334 } catch (const std::bad_variant_access& e) {
337 "Data type mismatch during ACCUMULATE capture: {}",
338 e.what());
339 m_operation_data[&op] = buffer_data;
340 }
341 }
342 break;
343 }
345 uint32_t circular_size = op.m_capture.get_circular_size();
346 if (circular_size == 0) {
347 circular_size = 4096;
348 }
349
350 auto it = m_operation_data.find(&op);
351 if (it == m_operation_data.end()) {
352 m_operation_data[&op] = buffer_data;
353 } else {
354 try {
355 auto& circular = std::get<std::vector<double>>(it->second);
356 const auto& new_data = std::get<std::vector<double>>(buffer_data);
357
358 circular.insert(circular.end(), new_data.begin(), new_data.end());
359
360 if (circular.size() > circular_size) {
361 circular.erase(circular.begin(),
362 circular.begin() + static_cast<int64_t>(circular.size() - circular_size));
363 }
364
365 } catch (const std::bad_variant_access& e) {
368 "Data type mismatch during CIRCULAR capture: {}",
369 e.what());
370 m_operation_data[&op] = buffer_data;
371 } catch (std::exception& e) {
372 error_rethrow(Journal::Component::Kriya,
374 std::source_location::current(),
375 "Error during CIRCULAR capture: {}",
376 e.what());
377 m_operation_data[&op] = buffer_data;
378 }
379 }
380 break;
381 }
383 uint32_t window_size = op.m_capture.get_window_size();
384 float overlap_ratio = op.m_capture.get_overlap_ratio();
385
386 if (window_size == 0) {
387 window_size = 512;
388 }
389
390 auto hop_size = static_cast<uint32_t>((float)window_size * (1.0F - overlap_ratio));
391 if (hop_size == 0)
392 hop_size = 1;
393
394 auto it = m_operation_data.find(&op);
395 if (it == m_operation_data.end()) {
396 m_operation_data[&op] = buffer_data;
397 } else {
398 try {
399 auto& windowed = std::get<std::vector<double>>(it->second);
400 const auto& new_data = std::get<std::vector<double>>(buffer_data);
401
402 if (windowed.size() >= window_size) {
403 if (hop_size >= windowed.size()) {
404 windowed = std::get<std::vector<double>>(buffer_data);
405 } else {
406 windowed.erase(windowed.begin(),
407 windowed.begin() + hop_size);
408
409 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
410
411 if (windowed.size() > window_size) {
412 size_t excess = windowed.size() - window_size;
413 windowed.erase(windowed.begin(),
414 windowed.begin() + excess);
415 }
416 }
417 } else {
418 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
419
420 if (windowed.size() > window_size) {
421 size_t excess = windowed.size() - window_size;
422 windowed.erase(windowed.begin(),
423 windowed.begin() + excess);
424 }
425 }
426
427 } catch (const std::bad_variant_access& e) {
429 "Data type mismatch during WINDOWED capture: {}", e.what());
430 m_operation_data[&op] = buffer_data;
431 }
432 }
433 break;
434 }
435
438 m_operation_data[&op] = buffer_data;
439 }
440 break;
441 }
442
443 default:
444 m_operation_data[&op] = buffer_data;
445 break;
446 }
447
448 if (has_immediate_routing(op)) {
449 auto current_it = std::ranges::find_if(m_operations,
450 [&op](const BufferOperation& o) { return &o == &op; });
451
452 if (current_it != m_operations.end()) {
453 auto next_it = std::next(current_it);
454 if (next_it != m_operations.end() && next_it->get_type() == BufferOperation::OpType::ROUTE) {
455
456 if (next_it->m_target_buffer) {
457 if (!m_buffer_manager) {
458 error<std::invalid_argument>(Journal::Component::Kriya,
460 std::source_location::current(),
461 "BufferPipeline has no BufferManager for immediate ROUTE-to-buffer");
462 }
463
464 if (!next_it->m_attached_processor) {
465 auto writer = std::make_shared<Buffers::AudioWriteProcessor>();
466 m_buffer_manager->add_processor(writer, next_it->m_target_buffer,
468 next_it->m_attached_processor = writer;
469 }
470
471 std::static_pointer_cast<Buffers::AudioWriteProcessor>(next_it->m_attached_processor)
472 ->set_data(buffer_data);
473 } else if (next_it->m_target_container) {
474 write_to_container(next_it->m_target_container, buffer_data);
475 }
476
477 size_t route_index = std::distance(m_operations.begin(), next_it);
478 if (route_index < m_data_states.size()) {
479 m_data_states[route_index] = DataState::CONSUMED;
480 }
481 }
482 }
483 }
484}
485
487{
488 for (auto& op : m_operations) {
489 if (op.get_type() == BufferOperation::OpType::CAPTURE) {
490 auto mode = op.m_capture.get_mode();
492 m_operation_data.erase(&op);
493 }
494 }
495 }
496}
497
499{
500 auto it = std::ranges::find_if(m_operations,
501 [&op](const BufferOperation& o) { return &o == &op; });
502
503 if (it == m_operations.end() || std::next(it) == m_operations.end()) {
504 return false;
505 }
506
507 auto next_op = std::next(it);
508 return next_op->get_type() == BufferOperation::OpType::ROUTE;
509}
510
512{
513 try {
514 switch (op.get_type()) {
516 capture_operation(op, cycle);
517
518 break;
519 }
520
522 Kakshya::DataVariant input_data;
523 if (m_operation_data.find(&op) != m_operation_data.end()) {
524 input_data = m_operation_data[&op];
525 } else {
526 for (auto& it : m_operation_data) {
527 input_data = it.second;
528 break;
529 }
530 }
531
532 if (op.m_transformer) {
533 auto transformed = op.m_transformer(input_data, cycle);
534 m_operation_data[&op] = transformed;
535
536 for (auto& m_operation : std::ranges::reverse_view(m_operations)) {
537 if (&m_operation == &op)
538 continue;
539 if (m_operation.get_type() == BufferOperation::OpType::CAPTURE && m_operation.m_capture.get_buffer()) {
540 write_to_buffer(m_operation.m_capture.get_buffer(), transformed);
541 break;
542 }
543 }
544 }
545 break;
546 }
547
549 Kakshya::DataVariant data_to_route;
550 if (m_operation_data.find(&op) != m_operation_data.end()) {
551 data_to_route = m_operation_data[&op];
552 } else {
553 for (auto& it : m_operation_data) {
554 data_to_route = it.second;
555 break;
556 }
557 }
558
559 if (op.m_target_buffer) {
560 if (!m_buffer_manager) {
561 error<std::invalid_argument>(Journal::Component::Kriya,
563 std::source_location::current(),
564 "BufferPipeline has no BufferManager for ROUTE-to-buffer operation");
565 }
566
567 if (!op.m_attached_processor) {
568 auto writer = std::make_shared<Buffers::AudioWriteProcessor>();
569 m_buffer_manager->add_processor(writer, op.m_target_buffer,
571 op.m_attached_processor = writer;
572 }
573
574 std::static_pointer_cast<Buffers::AudioWriteProcessor>(op.m_attached_processor)
575 ->set_data(data_to_route);
576
577 } else if (op.m_target_container) {
578 write_to_container(op.m_target_container, data_to_route);
579 }
580 break;
581 }
582
584 auto loaded_data = read_from_container(op.m_source_container,
585 op.m_start_frame,
586 op.m_load_length);
587
588 if (op.m_target_buffer) {
589 write_to_buffer(op.m_target_buffer, loaded_data);
590 }
591
592 m_operation_data[&op] = loaded_data;
593 break;
594 }
595
597 std::vector<Kakshya::DataVariant> fusion_inputs;
598
599 for (auto& source_buffer : op.m_source_buffers) {
601 auto buffer_data = extract_buffer_data(source_buffer, should_process);
602 fusion_inputs.push_back(buffer_data);
603 }
604
605 for (auto& source_container : op.m_source_containers) {
606 auto container_data = read_from_container(source_container, 0, 0);
607 fusion_inputs.push_back(container_data);
608 }
609
610 if (op.m_fusion_function && !fusion_inputs.empty()) {
611 auto fused_data = op.m_fusion_function(fusion_inputs, cycle);
612
613 if (op.m_target_buffer) {
614 write_to_buffer(op.m_target_buffer, fused_data);
615 } else if (op.m_target_container) {
617 }
618
619 m_operation_data[&op] = fused_data;
620 }
621 break;
622 }
623
625 Kakshya::DataVariant data_to_dispatch;
626 if (m_operation_data.find(&op) != m_operation_data.end()) {
627 data_to_dispatch = m_operation_data[&op];
628 } else {
629 for (auto& it : m_operation_data) {
630 data_to_dispatch = it.second;
631 break;
632 }
633 }
634
635 if (op.m_dispatch_handler) {
636 op.m_dispatch_handler(data_to_dispatch, cycle);
637 }
638 break;
639 }
640
642 if (!m_buffer_manager) {
643 error<std::invalid_argument>(Journal::Component::Kriya,
645 std::source_location::current(),
646 "BufferPipeline has no BufferManager for MODIFY operation");
647 }
648
649 if (!op.m_attached_processor) {
650 op.m_attached_processor = m_buffer_manager->attach_quick_process(
653 if (m_max_cycles != 0 && op.is_streaming()) {
655 }
656 }
657
658 if (op.m_modify_cycle_count > 0 && cycle >= op.m_modify_cycle_count - 1) {
659 if (op.m_attached_processor) {
660 m_buffer_manager->remove_processor(
662 op.m_target_buffer);
663 op.m_attached_processor = nullptr;
664 }
665 }
666
667 break;
668 }
669
671 break;
672
673 default:
676 "Unknown operation type in pipeline : {} : {}",
677 Reflect::enum_to_string(op.get_type()), std::to_string(static_cast<int>(op.get_type())));
678 break;
679 }
680 } catch (const std::exception& e) {
681 error_rethrow(Journal::Component::Kriya,
683 std::source_location::current(),
684 "Error processing operation in BufferPipeline: {}",
685 e.what());
686 }
687}
688
689std::shared_ptr<Vruta::SoundRoutine> BufferPipeline::dispatch_branch_async(BranchInfo& branch, uint64_t /*cycle*/)
690{
691 if (!m_scheduler)
692 return nullptr;
693
694 if (!m_coordinator) {
695 m_coordinator = std::make_unique<CycleCoordinator>(*m_scheduler);
696 }
697
698 branch.pipeline->m_active_self = branch.pipeline;
699
700 auto branch_routine = branch.pipeline->execute_internal(1, branch.samples_per_operation);
701
702 auto task = std::make_shared<Vruta::SoundRoutine>(std::move(branch_routine));
703 m_scheduler->add_task(task);
704
705 m_branch_tasks.push_back(task);
706
707 return task;
708}
709
711{
712 for (size_t i = 0; i < m_data_states.size(); ++i) {
714 if (i < m_operations.size() && m_operations[i].get_type() == BufferOperation::OpType::CAPTURE && m_operations[i].m_capture.get_mode() == BufferCapture::CaptureMode::TRANSIENT) {
715
716 if (m_operations[i].m_capture.m_data_expired_callback) {
717 auto it = m_operation_data.find(&m_operations[i]);
718 if (it != m_operation_data.end()) {
719 m_operations[i].m_capture.m_data_expired_callback(it->second, m_current_cycle);
720 }
721 }
722
724 } else {
726 }
727 }
728 }
729
730 auto it = m_operation_data.begin();
731 while (it != m_operation_data.end()) {
732 if (m_current_cycle > 2) {
733 it = m_operation_data.erase(it);
734 } else {
735 ++it;
736 }
737 }
738}
739
741{
742 for (auto& branch : m_branches) {
743 if (branch.pipeline) {
744 branch.pipeline->m_active_self.reset();
745 }
746 }
747
748 std::erase_if(m_branch_tasks, [](const auto& task) {
749 return !task || !task->is_active();
750 });
751}
752
753Vruta::SoundRoutine BufferPipeline::execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
754{
755 switch (m_execution_strategy) {
757 return execute_phased(max_cycles, samples_per_operation);
758
760 return execute_streaming(max_cycles, samples_per_operation);
761
763 return execute_parallel(max_cycles, samples_per_operation);
764
766 return execute_reactive(max_cycles, samples_per_operation);
767
768 default:
769 error<std::runtime_error>(Journal::Component::Kriya,
771 std::source_location::current(),
772 "Unknown execution strategy in BufferPipeline");
773 }
774}
775
776Vruta::SoundRoutine BufferPipeline::execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
777{
778 auto& promise = co_await Kriya::GetAudioPromise {};
779
780 if (m_operations.empty()) {
781 co_return;
782 }
783
785 uint32_t cycles_executed = 0;
786
787 while ((max_cycles == 0 || cycles_executed < max_cycles) && (m_continuous_execution || cycles_executed < max_cycles)) {
788
789 if (promise.should_terminate) {
790 break;
791 }
792
795 }
796
797 for (auto& state : m_data_states) {
798 if (state != DataState::EMPTY) {
799 state = DataState::EMPTY;
800 }
801 }
802
804
805 // ═══════════════════════════════════════════════════════
806 // PHASE 1: CAPTURE - Execute all capture operations
807 // ═══════════════════════════════════════════════════════
808 for (size_t i = 0; i < m_operations.size(); ++i) {
809 auto& op = m_operations[i];
810
812 continue;
813 }
814
815 if (op.get_type() == BufferOperation::OpType::CONDITION) {
816 if (!op.m_condition || !op.m_condition(m_current_cycle)) {
817 continue;
818 }
819 }
820
821 if (m_current_cycle % op.m_cycle_interval != 0) {
822 continue;
823 }
824
825 uint32_t op_iterations = 1;
826 if (op.get_type() == BufferOperation::OpType::CAPTURE) {
827 op_iterations = op.m_capture.get_cycle_count();
828 }
829
830 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
833 co_await BufferDelay { 1 };
834 } else if (m_capture_timing == Vruta::DelayContext::SAMPLE_BASED && samples_per_operation > 0) {
835 co_await SampleDelay { samples_per_operation };
836 }
837 }
838
840 }
841
842 // ═══════════════════════════════════════════════════════
843 // PHASE 2: PROCESS - Execute all processing operations
844 // ═══════════════════════════════════════════════════════
845
846 for (size_t i = 0; i < m_operations.size(); ++i) {
847 auto& op = m_operations[i];
848
850 continue;
851 }
852
854 continue;
855 }
856
857 if (op.get_type() == BufferOperation::OpType::CONDITION) {
858 if (!op.m_condition || !op.m_condition(m_current_cycle)) {
859 continue;
860 }
861 }
862
863 if (m_current_cycle % op.m_cycle_interval != 0) {
864 continue;
865 }
866
869
871 co_await BufferDelay { 1 };
872 } else if (m_process_timing == Vruta::DelayContext::SAMPLE_BASED && samples_per_operation > 0) {
873 co_await SampleDelay { samples_per_operation };
874 }
875 }
876
877 // ═══════════════════════════════════════════════════════
878 // Handle branches (if any)
879 // ═══════════════════════════════════════════════════════
880 std::vector<std::shared_ptr<Vruta::SoundRoutine>> current_cycle_sync_tasks;
881
882 for (auto& branch : m_branches) {
883 if (branch.condition(m_current_cycle)) {
884 auto task = dispatch_branch_async(branch, m_current_cycle);
885
886 if (branch.synchronous && task) {
887 current_cycle_sync_tasks.push_back(task);
888 }
889 }
890 }
891
892 if (!current_cycle_sync_tasks.empty()) {
893 bool any_active = true;
894 while (any_active) {
895 any_active = false;
896
897 for (auto& task : current_cycle_sync_tasks) {
898 if (task && task->is_active()) {
899 any_active = true;
900 break;
901 }
902 }
903
904 if (any_active) {
906 co_await BufferDelay { 1 };
907 } else {
908 co_await SampleDelay { 1 };
909 }
910 }
911 }
912 }
913
915
918 }
919
921
923 cycles_executed++;
924 }
925
926 if (m_on_complete)
928}
929
930Vruta::SoundRoutine BufferPipeline::execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
931{
932 auto& promise = co_await Kriya::GetAudioPromise {};
933
934 if (m_operations.empty()) {
935 co_return;
936 }
937
939 uint32_t cycles_executed = 0;
940
941 while ((max_cycles == 0 || cycles_executed < max_cycles) && (m_continuous_execution || cycles_executed < max_cycles)) {
942
943 if (promise.should_terminate) {
944 break;
945 }
946
949 }
950
951 for (size_t i = 0; i < m_operations.size(); ++i) {
952 auto& op = m_operations[i];
953
954 if (op.get_type() == BufferOperation::OpType::CONDITION) {
955 if (!op.m_condition || !op.m_condition(m_current_cycle)) {
956 continue;
957 }
958 }
959
960 if (m_current_cycle % op.m_cycle_interval != 0) {
961 continue;
962 }
963
964 uint32_t op_iterations = 1;
965 if (op.get_type() == BufferOperation::OpType::CAPTURE) {
966 op_iterations = op.m_capture.get_cycle_count();
967 }
968
969 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
970
973
974 for (size_t j = i + 1; j < m_operations.size(); ++j) {
975 auto& dependent_op = m_operations[j];
976
978 process_operation(dependent_op, m_current_cycle + iter);
980 }
981
982 break;
983 }
984
986 co_await BufferDelay { 1 };
987 } else if (samples_per_operation > 0) {
988 co_await SampleDelay { samples_per_operation };
989 }
990 }
991 }
992
993 for (auto& branch : m_branches) {
994 if (branch.condition(m_current_cycle)) {
996 }
997 }
998
1000
1003 }
1004
1006
1008 cycles_executed++;
1009 }
1010
1011 if (m_on_complete)
1012 m_on_complete();
1013}
1014
1015Vruta::SoundRoutine BufferPipeline::execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
1016{
1017 // TODO: Implement parallel execution strategy
1019 "PARALLEL strategy not yet implemented, using PHASED as fallback");
1020 return execute_phased(max_cycles, samples_per_operation);
1021}
1022
1023Vruta::SoundRoutine BufferPipeline::execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
1024{
1025 // TODO: Implement reactive execution strategy
1027 "REACTIVE strategy not yet implemented, using PHASED as fallback");
1028 return execute_phased(max_cycles, samples_per_operation);
1029}
1030
1031}
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
Cycle Behavior: The for_cycles(N) configuration controls how many times the capture operation execute...
std::shared_ptr< Buffers::AudioBuffer > get_buffer() const
Definition Capture.hpp:167
OperationFunction m_data_ready_callback
Definition Capture.hpp:187
ProcessingControl get_processing_control() const
Definition Capture.hpp:169
@ TRANSIENT
Single cycle capture (default) - data expires after 1 cycle.
@ CIRCULAR
Circular buffer with overwrite.
@ ACCUMULATE
Accumulate over multiple cycles in container.
@ WINDOWED
Rolling window capture with overlap.
@ TRIGGERED
Capture only when condition met.
std::function< bool()> m_stop_condition
Definition Capture.hpp:186
CaptureMode get_mode() const
Definition Capture.hpp:168
uint32_t get_window_size() const
Definition Capture.hpp:173
uint32_t get_circular_size() const
Definition Capture.hpp:172
std::vector< std::shared_ptr< Buffers::AudioBuffer > > m_source_buffers
Buffers::AudioProcessingFunction m_buffer_modifier
std::shared_ptr< Buffers::AudioBuffer > m_target_buffer
std::vector< std::shared_ptr< Kakshya::DynamicSoundStream > > m_source_containers
static bool is_capture_phase_operation(const BufferOperation &op)
std::shared_ptr< Kakshya::DynamicSoundStream > m_target_container
bool is_streaming() const
Check if this operation is a streaming operation.
TransformVectorFunction m_fusion_function
std::shared_ptr< Buffers::BufferProcessor > m_attached_processor
BufferOperation & with_priority(uint8_t priority)
Set execution priority for scheduler ordering.
OpType get_type() const
Getters for internal state (read-only)
std::shared_ptr< Kakshya::DynamicSoundStream > m_source_container
TransformationFunction m_transformer
@ LOAD
Load data from container to buffer with position control.
@ CONDITION
Conditional operation for branching logic.
@ FUSE
Fuse multiple sources using custom fusion functions.
@ ROUTE
Route data to destination (buffer or container)
@ CAPTURE
Capture data from source buffer using BufferCapture strategy.
@ MODIFY
Modify Buffer Data using custom quick process.
@ DISPATCH
Dispatch to external handler for custom processing.
@ TRANSFORM
Apply transformation function to data variants.
static bool is_process_phase_operation(const BufferOperation &op)
Fundamental unit of operation in buffer processing pipelines.
std::unordered_map< BufferOperation *, Kakshya::DataVariant > m_operation_data
@ READY
Data ready for processing.
@ EXPIRED
Data has expired and should be cleaned up.
Vruta::SoundRoutine execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, const Kakshya::DataVariant &data)
std::shared_ptr< CycleCoordinator > m_coordinator
BufferPipeline & on_complete(std::function< void()> cb)
Register a callback fired once when pipeline execution ends.
BufferPipeline & branch_if(std::function< bool(uint32_t)> condition, const std::function< void(BufferPipeline &)> &branch_builder, bool synchronous=false, uint64_t samples_per_operation=1)
Add conditional branch to the pipeline.
void execute_for_cycles(uint64_t cycles=0)
Execute the pipeline for a specified number of cycles.
void process_operation(BufferOperation &op, uint64_t cycle)
Vruta::SoundRoutine execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
void execute_continuous()
Start continuous execution of the pipeline.
void execute_scheduled_at_rate(uint32_t max_cycles=0, double seconds_per_operation=1)
Execute pipeline with real-time rate control.
std::vector< BranchInfo > m_branches
void capture_operation(BufferOperation &op, uint64_t cycle)
std::shared_ptr< BufferPipeline > m_active_self
std::vector< BufferOperation > m_operations
void execute_once()
Execute the pipeline for a single cycle.
std::vector< std::shared_ptr< Vruta::SoundRoutine > > m_branch_tasks
std::vector< DataState > m_data_states
static Kakshya::DataVariant read_from_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, uint64_t start, uint32_t length)
Vruta::SoundRoutine execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_buffer(const std::shared_ptr< Buffers::AudioBuffer > &buffer, const Kakshya::DataVariant &data)
Vruta::SoundRoutine execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
std::function< void(uint32_t)> m_cycle_end_callback
bool has_pending_data() const
Check if any operations have pending data ready for processing.
void mark_data_consumed(uint32_t operation_index)
Execute pipeline synchronized to audio hardware cycle boundaries.
std::function< void(uint32_t)> m_cycle_start_callback
std::function< void()> m_on_complete
void execute_buffer_rate(uint64_t max_cycles=0)
Execute pipeline synchronized to audio hardware cycle boundaries.
std::shared_ptr< Buffers::BufferManager > m_buffer_manager
BufferPipeline & parallel(std::initializer_list< BufferOperation > operations)
Execute operations in parallel within the current cycle.
bool has_immediate_routing(const BufferOperation &op) const
void execute_scheduled(uint64_t max_cycles=0, uint64_t samples_per_operation=1)
Execute pipeline with sample-accurate timing between operations.
Vruta::SoundRoutine execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
static Kakshya::DataVariant extract_buffer_data(const std::shared_ptr< Buffers::AudioBuffer > &buffer, bool should_process=false)
Vruta::TaskScheduler * m_scheduler
std::shared_ptr< Vruta::SoundRoutine > dispatch_branch_async(BranchInfo &branch, uint64_t cycle)
BufferPipeline & with_lifecycle(std::function< void(uint32_t)> on_cycle_start, std::function< void(uint32_t)> on_cycle_end)
Set lifecycle callbacks for cycle management.
Coroutine-based execution engine for composable, multi-strategy buffer processing.
Cross-pipeline synchronization and coordination system.
A C++20 coroutine-based audio processing task with sample-accurate timing.
Definition Routine.hpp:316
void add_task(const std::shared_ptr< Routine > &routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
Definition Scheduler.cpp:19
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
Token-based multimodal task scheduling system for unified coroutine processing.
Definition Scheduler.hpp:51
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Kriya
Automatable tasks and fluent scheduling api for Nodes and Buffers.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
Definition NDData.hpp:76
@ PHASED
PHASED: Traditional phased execution (default)
@ STREAMING
STREAMING: Immediate flow-through execution.
@ PARALLEL
PARALLEL: Concurrent capture with synchronization.
@ REACTIVE
REACTIVE: Data-driven reactive execution.
constexpr std::string_view enum_to_string(EnumType value) noexcept
Universal enum to string converter using magic_enum (original case)
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)
Awaiter for suspending until a buffer cycle boundary.
std::shared_ptr< BufferPipeline > pipeline
Templated awaitable for accessing a coroutine's promise object.
Awaitable object for precise sample-accurate timing delays.