MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferPipeline.cpp
Go to the documentation of this file.
1#include "BufferPipeline.hpp"
2
4
7
10
12
13namespace MayaFlux::Kriya {
14
15BufferPipeline::BufferPipeline(Vruta::TaskScheduler& scheduler, std::shared_ptr<Buffers::BufferManager> buffer_manager)
16 : m_coordinator(std::make_shared<CycleCoordinator>(scheduler))
17 , m_buffer_manager(std::move(buffer_manager))
18 , m_scheduler(&scheduler)
19{
20}
21
23{
24 if (!m_buffer_manager) {
25 return;
26 }
27
28 for (auto& op : m_operations) {
29 if (op.m_attached_processor) {
30 if (op.get_type() == BufferOperation::OpType::ROUTE && op.m_target_buffer) {
31 m_buffer_manager->remove_processor(op.m_attached_processor, op.m_target_buffer);
32 }
33 if (op.get_type() == BufferOperation::OpType::MODIFY && op.m_target_buffer) {
34 m_buffer_manager->remove_processor(op.m_attached_processor, op.m_target_buffer);
35 }
36 op.m_attached_processor = nullptr;
37 }
38 }
39}
40
42 std::function<bool(uint32_t)> condition,
43 const std::function<void(BufferPipeline&)>& branch_builder,
44 bool synchronous,
45 uint64_t samples_per_operation)
46{
47 auto branch_pipeline = std::make_shared<BufferPipeline>();
48 if (m_scheduler) {
49 branch_pipeline->m_scheduler = m_scheduler;
50 }
51 branch_builder(*branch_pipeline);
52
53 m_branches.push_back({ .condition = std::move(condition),
54 .pipeline = std::move(branch_pipeline),
55 .synchronous = synchronous,
56 .samples_per_operation = samples_per_operation });
57
58 return *this;
59}
60
61BufferPipeline& BufferPipeline::parallel(std::initializer_list<BufferOperation> operations)
62{
63 for (auto& op : operations) {
64 BufferOperation parallel_op = BufferOperation(op);
65 parallel_op.with_priority(255);
66 m_operations.emplace_back(std::move(parallel_op));
67 }
68 return *this;
69}
70
72 std::function<void(uint32_t)> on_cycle_start,
73 std::function<void(uint32_t)> on_cycle_end)
74{
75 m_cycle_start_callback = std::move(on_cycle_start);
76 m_cycle_end_callback = std::move(on_cycle_end);
77 return *this;
78}
79
80void BufferPipeline::execute_buffer_rate(uint64_t max_cycles)
81{
82 if (!m_scheduler) {
83 error<std::runtime_error>(Journal::Component::Kriya,
85 std::source_location::current(),
86 "Pipeline requires scheduler for execution");
87 }
88
89 auto self = shared_from_this();
90
91 if (max_cycles == 0) {
92 max_cycles = UINT64_MAX;
94 }
95 m_max_cycles = max_cycles;
96
97 auto routine = std::make_shared<Vruta::SoundRoutine>(
98 execute_internal(max_cycles, 0));
99
100 m_scheduler->add_task(std::move(routine));
101
102 m_active_self = self;
103}
104
106{
107 if (!m_scheduler) {
108 error<std::runtime_error>(Journal::Component::Kriya,
110 std::source_location::current(),
111 "Pipeline requires scheduler for execution");
112 }
113 auto self = shared_from_this();
114
115 m_max_cycles = 1;
116 auto routine = std::make_shared<Vruta::SoundRoutine>(
117 execute_internal(1, 0));
118 m_scheduler->add_task(std::move(routine));
119 m_active_self = self;
120}
121
123{
124 if (!m_scheduler) {
125 error<std::runtime_error>(Journal::Component::Kriya,
127 std::source_location::current(),
128 "Pipeline requires scheduler for execution");
129 }
130
131 auto self = shared_from_this();
132
133 if (cycles == 0) {
134 cycles = UINT64_MAX;
136 }
137 m_max_cycles = cycles;
138 auto routine = std::make_shared<Vruta::SoundRoutine>(
139 execute_internal(cycles, 0));
140 m_scheduler->add_task(std::move(routine));
141 m_active_self = self;
142}
143
149
151 uint64_t max_cycles,
152 uint64_t samples_per_operation)
153{
154 if (!m_scheduler) {
155 error<std::runtime_error>(Journal::Component::Kriya,
157 std::source_location::current(),
158 "Pipeline must have scheduler for scheduled execution");
159 }
160
161 auto self = shared_from_this();
162
163 if (max_cycles == 0) {
164 max_cycles = UINT64_MAX;
166 }
167
168 m_max_cycles = max_cycles;
169
170 auto routine = std::make_shared<Vruta::SoundRoutine>(
171 execute_internal(max_cycles, samples_per_operation));
172
173 m_scheduler->add_task(std::move(routine));
174
175 m_active_self = self;
176}
177
179 uint32_t max_cycles,
180 double seconds_per_operation)
181{
182 if (!m_scheduler) {
183 error<std::runtime_error>(Journal::Component::Kriya,
185 std::source_location::current(),
186 "Pipeline must have scheduler for scheduled execution");
187 }
188
189 uint64_t samples = m_scheduler->seconds_to_samples(seconds_per_operation);
190 execute_scheduled(max_cycles, samples);
191}
192
193void BufferPipeline::mark_data_consumed(uint32_t operation_index)
194{
195 if (operation_index < m_data_states.size()) {
196 m_data_states[operation_index] = DataState::CONSUMED;
197 }
198}
199
201{
202 return std::ranges::any_of(m_data_states,
203 [](DataState state) { return state == DataState::READY; });
204}
205
207{
208 m_on_complete = std::move(cb);
209 return *this;
210}
211
212Kakshya::DataVariant BufferPipeline::extract_buffer_data(const std::shared_ptr<Buffers::AudioBuffer>& buffer, bool should_process)
213{
214 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
215 if (audio_buffer) {
216 if (should_process) {
217 audio_buffer->process_default();
218 }
219 const auto& data_span = audio_buffer->get_data();
220 std::vector<double> data_vector(data_span.begin(), data_span.end());
221 return data_vector;
222 }
223
224 return std::vector<double> {};
225}
226
227void BufferPipeline::write_to_buffer(const std::shared_ptr<Buffers::AudioBuffer>& buffer, const Kakshya::DataVariant& data)
228{
229 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
230 if (audio_buffer) {
231 try {
232 auto audio_data = std::get<std::vector<double>>(data);
233 auto& buffer_data = audio_buffer->get_data();
234
235 if (buffer_data.size() != audio_data.size()) {
236 buffer_data.resize(audio_data.size());
237 }
238
239 std::ranges::copy(audio_data, buffer_data.begin());
240
241 } catch (const std::bad_variant_access& e) {
242 error_rethrow(Journal::Component::Kriya,
244 std::source_location::current(),
245 "Data type mismatch when writing to audio buffer: {}",
246 e.what());
247 }
248 }
249
250 // TODO: Handle other buffer types
251}
252
253void BufferPipeline::write_to_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container, const Kakshya::DataVariant& data)
254{
255 try {
256 auto audio_data = std::get<std::vector<double>>(data);
257 std::span<const double> data_span(audio_data.data(), audio_data.size());
258
259 container->write_frames(data_span, 0);
260
261 } catch (const std::bad_variant_access& e) {
262 error_rethrow(Journal::Component::Kriya,
264 std::source_location::current(),
265 "Data type mismatch when writing to container: {}",
266 e.what());
267 } catch (const std::exception& e) {
268 error_rethrow(Journal::Component::Kriya,
270 std::source_location::current(),
271 "Error writing to container: {}",
272 e.what());
273 }
274}
275
276Kakshya::DataVariant BufferPipeline::read_from_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container,
277 uint64_t /*start_frame*/,
278 uint32_t length)
279{
280 try {
281 uint32_t read_length = length;
282 if (read_length == 0) {
283 read_length = static_cast<uint32_t>(container->get_total_elements() / container->get_num_channels());
284 }
285
286 std::vector<double> output_data(static_cast<size_t>(read_length * container->get_num_channels()));
287 std::span<double> output_span(output_data.data(), output_data.size());
288
289 uint64_t frames_read = container->read_frames(output_span, read_length);
290
291 if (frames_read < output_data.size()) {
292 output_data.resize(frames_read);
293 }
294
295 return output_data;
296
297 } catch (const std::exception& e) {
298 error_rethrow(Journal::Component::Kriya,
300 std::source_location::current(),
301 "Error reading from container: {}",
302 e.what());
303
304 return std::vector<double> {};
305 }
306}
307
309{
311 auto buffer_data = extract_buffer_data(op.m_capture.get_buffer(), should_process);
312
314 op.m_capture.m_data_ready_callback(buffer_data, cycle);
315 }
316
317 auto capture_mode = op.m_capture.get_mode();
318
319 switch (capture_mode) {
321 m_operation_data[&op] = buffer_data;
322 break;
323
325 auto it = m_operation_data.find(&op);
326 if (it == m_operation_data.end()) {
327 m_operation_data[&op] = buffer_data;
328 } else {
329 try {
330 auto& existing = std::get<std::vector<double>>(it->second);
331 const auto& new_data = std::get<std::vector<double>>(buffer_data);
332 existing.insert(existing.end(), new_data.begin(), new_data.end());
333
334 } catch (const std::bad_variant_access& e) {
337 "Data type mismatch during ACCUMULATE capture: {}",
338 e.what());
339 m_operation_data[&op] = buffer_data;
340 }
341 }
342 break;
343 }
345 uint32_t circular_size = op.m_capture.get_circular_size();
346 if (circular_size == 0) {
347 circular_size = 4096;
348 }
349
350 auto it = m_operation_data.find(&op);
351 if (it == m_operation_data.end()) {
352 m_operation_data[&op] = buffer_data;
353 } else {
354 try {
355 auto& circular = std::get<std::vector<double>>(it->second);
356 const auto& new_data = std::get<std::vector<double>>(buffer_data);
357
358 circular.insert(circular.end(), new_data.begin(), new_data.end());
359
360 if (circular.size() > circular_size) {
361 circular.erase(circular.begin(),
362 circular.begin() + static_cast<int64_t>(circular.size() - circular_size));
363 }
364
365 } catch (const std::bad_variant_access& e) {
368 "Data type mismatch during CIRCULAR capture: {}",
369 e.what());
370 m_operation_data[&op] = buffer_data;
371 } catch (std::exception& e) {
372 error_rethrow(Journal::Component::Kriya,
374 std::source_location::current(),
375 "Error during CIRCULAR capture: {}",
376 e.what());
377 m_operation_data[&op] = buffer_data;
378 }
379 }
380 break;
381 }
383 uint32_t window_size = op.m_capture.get_window_size();
384 float overlap_ratio = op.m_capture.get_overlap_ratio();
385
386 if (window_size == 0) {
387 window_size = 512;
388 }
389
390 auto hop_size = static_cast<uint32_t>((float)window_size * (1.0F - overlap_ratio));
391 if (hop_size == 0)
392 hop_size = 1;
393
394 auto it = m_operation_data.find(&op);
395 if (it == m_operation_data.end()) {
396 m_operation_data[&op] = buffer_data;
397 } else {
398 try {
399 auto& windowed = std::get<std::vector<double>>(it->second);
400 const auto& new_data = std::get<std::vector<double>>(buffer_data);
401
402 if (windowed.size() >= window_size) {
403 if (hop_size >= windowed.size()) {
404 windowed = std::get<std::vector<double>>(buffer_data);
405 } else {
406 windowed.erase(windowed.begin(),
407 windowed.begin() + hop_size);
408
409 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
410
411 if (windowed.size() > window_size) {
412 size_t excess = windowed.size() - window_size;
413 windowed.erase(windowed.begin(),
414 windowed.begin() + excess);
415 }
416 }
417 } else {
418 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
419
420 if (windowed.size() > window_size) {
421 size_t excess = windowed.size() - window_size;
422 windowed.erase(windowed.begin(),
423 windowed.begin() + excess);
424 }
425 }
426
427 } catch (const std::bad_variant_access& e) {
429 "Data type mismatch during WINDOWED capture: {}", e.what());
430 m_operation_data[&op] = buffer_data;
431 }
432 }
433 break;
434 }
435
438 m_operation_data[&op] = buffer_data;
439 }
440 break;
441 }
442
443 default:
444 m_operation_data[&op] = buffer_data;
445 break;
446 }
447
448 if (has_immediate_routing(op)) {
449 auto current_it = std::ranges::find_if(m_operations,
450 [&op](const BufferOperation& o) { return &o == &op; });
451
452 if (current_it != m_operations.end()) {
453 auto next_it = std::next(current_it);
454 if (next_it != m_operations.end() && next_it->get_type() == BufferOperation::OpType::ROUTE) {
455
456 if (next_it->m_target_buffer) {
457 if (!m_buffer_manager) {
458 error<std::invalid_argument>(Journal::Component::Kriya,
460 std::source_location::current(),
461 "BufferPipeline has no BufferManager for immediate ROUTE-to-buffer");
462 }
463
464 if (!next_it->m_attached_processor) {
465 auto writer = std::make_shared<Buffers::AudioWriteProcessor>();
466 m_buffer_manager->add_processor(writer, next_it->m_target_buffer,
468 next_it->m_attached_processor = writer;
469 }
470
471 std::static_pointer_cast<Buffers::AudioWriteProcessor>(next_it->m_attached_processor)
472 ->set_data(buffer_data);
473 } else if (next_it->m_target_container) {
474 write_to_container(next_it->m_target_container, buffer_data);
475 }
476
477 size_t route_index = std::distance(m_operations.begin(), next_it);
478 if (route_index < m_data_states.size()) {
479 m_data_states[route_index] = DataState::CONSUMED;
480 }
481 }
482 }
483 }
484}
485
487{
488 for (auto& op : m_operations) {
489 if (op.get_type() == BufferOperation::OpType::CAPTURE) {
490 auto mode = op.m_capture.get_mode();
492 m_operation_data.erase(&op);
493 }
494 }
495 }
496}
497
499{
500 auto it = std::ranges::find_if(m_operations,
501 [&op](const BufferOperation& o) { return &o == &op; });
502
503 if (it == m_operations.end() || std::next(it) == m_operations.end()) {
504 return false;
505 }
506
507 auto next_op = std::next(it);
508 return next_op->get_type() == BufferOperation::OpType::ROUTE;
509}
510
512{
513 try {
514 switch (op.get_type()) {
516 capture_operation(op, cycle);
517
518 break;
519 }
520
522 Kakshya::DataVariant input_data;
523 if (m_operation_data.find(&op) != m_operation_data.end()) {
524 input_data = m_operation_data[&op];
525 } else {
526 for (auto& it : m_operation_data) {
527 input_data = it.second;
528 break;
529 }
530 }
531
532 if (op.m_transformer) {
533 auto transformed = op.m_transformer(input_data, cycle);
534 m_operation_data[&op] = transformed;
535
536 const bool has_downstream_route = std::ranges::any_of(
538 [](const BufferOperation& o) {
540 });
541
542 if (!has_downstream_route) {
543 for (auto& candidate : std::ranges::reverse_view(m_operations)) {
544 if (&candidate == &op)
545 continue;
546 if (candidate.get_type() != BufferOperation::OpType::CAPTURE
547 || !candidate.m_capture.get_buffer()) {
548 continue;
549 }
550 const auto buf = candidate.m_capture.get_buffer();
551 if (std::holds_alternative<std::vector<double>>(transformed)
552 && std::get<std::vector<double>>(transformed).size() == buf->get_data().size()) {
553 write_to_buffer(buf, transformed);
554 }
555 break;
556 }
557 }
558 }
559 break;
560 }
561
563 Kakshya::DataVariant data_to_route;
564 if (m_operation_data.find(&op) != m_operation_data.end()) {
565 data_to_route = m_operation_data[&op];
566 } else {
567 for (auto& it : m_operation_data) {
568 data_to_route = it.second;
569 break;
570 }
571 }
572
573 if (op.m_target_buffer) {
574 if (!m_buffer_manager) {
575 error<std::invalid_argument>(Journal::Component::Kriya,
577 std::source_location::current(),
578 "BufferPipeline has no BufferManager for ROUTE-to-buffer operation");
579 }
580
581 if (!op.m_attached_processor) {
582 auto writer = std::make_shared<Buffers::AudioWriteProcessor>();
583 m_buffer_manager->add_processor(writer, op.m_target_buffer,
585 op.m_attached_processor = writer;
586 }
587
588 std::static_pointer_cast<Buffers::AudioWriteProcessor>(op.m_attached_processor)
589 ->set_data(data_to_route);
590
591 } else if (op.m_target_container) {
592 write_to_container(op.m_target_container, data_to_route);
593 }
594 break;
595 }
596
598 auto loaded_data = read_from_container(op.m_source_container,
599 op.m_start_frame,
600 op.m_load_length);
601
602 if (op.m_target_buffer) {
603 write_to_buffer(op.m_target_buffer, loaded_data);
604 }
605
606 m_operation_data[&op] = loaded_data;
607 break;
608 }
609
611 std::vector<Kakshya::DataVariant> fusion_inputs;
612
613 for (auto& source_buffer : op.m_source_buffers) {
615 auto buffer_data = extract_buffer_data(source_buffer, should_process);
616 fusion_inputs.push_back(buffer_data);
617 }
618
619 for (auto& source_container : op.m_source_containers) {
620 auto container_data = read_from_container(source_container, 0, 0);
621 fusion_inputs.push_back(container_data);
622 }
623
624 if (op.m_fusion_function && !fusion_inputs.empty()) {
625 auto fused_data = op.m_fusion_function(fusion_inputs, cycle);
626
627 if (op.m_target_buffer) {
628 write_to_buffer(op.m_target_buffer, fused_data);
629 } else if (op.m_target_container) {
631 }
632
633 m_operation_data[&op] = fused_data;
634 }
635 break;
636 }
637
639 Kakshya::DataVariant data_to_dispatch;
640 if (m_operation_data.find(&op) != m_operation_data.end()) {
641 data_to_dispatch = m_operation_data[&op];
642 } else {
643 for (auto& it : m_operation_data) {
644 data_to_dispatch = it.second;
645 break;
646 }
647 }
648
649 if (op.m_dispatch_handler) {
650 op.m_dispatch_handler(data_to_dispatch, cycle);
651 }
652 break;
653 }
654
656 if (!m_buffer_manager) {
657 error<std::invalid_argument>(Journal::Component::Kriya,
659 std::source_location::current(),
660 "BufferPipeline has no BufferManager for MODIFY operation");
661 }
662
663 if (!op.m_attached_processor) {
664 op.m_attached_processor = m_buffer_manager->attach_quick_process(
667 if (m_max_cycles != 0 && op.is_streaming()) {
669 }
670 }
671
672 if (op.m_modify_cycle_count > 0 && cycle >= op.m_modify_cycle_count - 1) {
673 if (op.m_attached_processor) {
674 m_buffer_manager->remove_processor(
676 op.m_target_buffer);
677 op.m_attached_processor = nullptr;
678 }
679 }
680
681 break;
682 }
683
685 break;
686
687 default:
690 "Unknown operation type in pipeline : {} : {}",
691 Reflect::enum_to_string(op.get_type()), std::to_string(static_cast<int>(op.get_type())));
692 break;
693 }
694 } catch (const std::exception& e) {
695 error_rethrow(Journal::Component::Kriya,
697 std::source_location::current(),
698 "Error processing operation in BufferPipeline: {}",
699 e.what());
700 }
701}
702
703std::shared_ptr<Vruta::SoundRoutine> BufferPipeline::dispatch_branch_async(BranchInfo& branch, uint64_t /*cycle*/)
704{
705 if (!m_scheduler)
706 return nullptr;
707
708 if (!m_coordinator) {
709 m_coordinator = std::make_unique<CycleCoordinator>(*m_scheduler);
710 }
711
712 branch.pipeline->m_active_self = branch.pipeline;
713
714 auto branch_routine = branch.pipeline->execute_internal(1, branch.samples_per_operation);
715
716 auto task = std::make_shared<Vruta::SoundRoutine>(std::move(branch_routine));
717 m_scheduler->add_task(task);
718
719 m_branch_tasks.push_back(task);
720
721 return task;
722}
723
725{
726 for (size_t i = 0; i < m_data_states.size(); ++i) {
728 if (i < m_operations.size() && m_operations[i].get_type() == BufferOperation::OpType::CAPTURE && m_operations[i].m_capture.get_mode() == BufferCapture::CaptureMode::TRANSIENT) {
729
730 if (m_operations[i].m_capture.m_data_expired_callback) {
731 auto it = m_operation_data.find(&m_operations[i]);
732 if (it != m_operation_data.end()) {
733 m_operations[i].m_capture.m_data_expired_callback(it->second, m_current_cycle);
734 }
735 }
736
738 } else {
740 }
741 }
742 }
743
744 auto it = m_operation_data.begin();
745 while (it != m_operation_data.end()) {
746 if (m_current_cycle > 2) {
747 it = m_operation_data.erase(it);
748 } else {
749 ++it;
750 }
751 }
752}
753
755{
756 for (auto& branch : m_branches) {
757 if (branch.pipeline) {
758 branch.pipeline->m_active_self.reset();
759 }
760 }
761
762 std::erase_if(m_branch_tasks, [](const auto& task) {
763 return !task || !task->is_active();
764 });
765}
766
767Vruta::SoundRoutine BufferPipeline::execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
768{
769 switch (m_execution_strategy) {
771 return execute_phased(max_cycles, samples_per_operation);
772
774 return execute_streaming(max_cycles, samples_per_operation);
775
777 return execute_parallel(max_cycles, samples_per_operation);
778
780 return execute_reactive(max_cycles, samples_per_operation);
781
782 default:
783 error<std::runtime_error>(Journal::Component::Kriya,
785 std::source_location::current(),
786 "Unknown execution strategy in BufferPipeline");
787 }
788}
789
790Vruta::SoundRoutine BufferPipeline::execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
791{
792 auto& promise = co_await Kriya::GetAudioPromise {};
793
794 if (m_operations.empty()) {
795 co_return;
796 }
797
799 uint32_t cycles_executed = 0;
800
801 while ((max_cycles == 0 || cycles_executed < max_cycles) && (m_continuous_execution || cycles_executed < max_cycles)) {
802
803 if (promise.should_terminate) {
804 break;
805 }
806
809 }
810
811 for (auto& state : m_data_states) {
812 if (state != DataState::EMPTY) {
813 state = DataState::EMPTY;
814 }
815 }
816
818
819 // ═══════════════════════════════════════════════════════
820 // PHASE 1: CAPTURE - Execute all capture operations
821 // ═══════════════════════════════════════════════════════
822 for (size_t i = 0; i < m_operations.size(); ++i) {
823 auto& op = m_operations[i];
824
826 continue;
827 }
828
829 if (op.get_type() == BufferOperation::OpType::CONDITION) {
830 if (!op.m_condition || !op.m_condition(m_current_cycle)) {
831 continue;
832 }
833 }
834
835 if (m_current_cycle % op.m_cycle_interval != 0) {
836 continue;
837 }
838
839 uint32_t op_iterations = 1;
840 if (op.get_type() == BufferOperation::OpType::CAPTURE) {
841 op_iterations = op.m_capture.get_cycle_count();
842 }
843
844 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
847 co_await BufferDelay { 1 };
848 } else if (m_capture_timing == Vruta::DelayContext::SAMPLE_BASED && samples_per_operation > 0) {
849 co_await SampleDelay { samples_per_operation };
850 }
851 }
852
854 }
855
856 // ═══════════════════════════════════════════════════════
857 // PHASE 2: PROCESS - Execute all processing operations
858 // ═══════════════════════════════════════════════════════
859
860 for (size_t i = 0; i < m_operations.size(); ++i) {
861 auto& op = m_operations[i];
862
864 continue;
865 }
866
868 continue;
869 }
870
871 if (op.get_type() == BufferOperation::OpType::CONDITION) {
872 if (!op.m_condition || !op.m_condition(m_current_cycle)) {
873 continue;
874 }
875 }
876
877 if (m_current_cycle % op.m_cycle_interval != 0) {
878 continue;
879 }
880
883
885 co_await BufferDelay { 1 };
886 } else if (m_process_timing == Vruta::DelayContext::SAMPLE_BASED && samples_per_operation > 0) {
887 co_await SampleDelay { samples_per_operation };
888 }
889 }
890
891 // ═══════════════════════════════════════════════════════
892 // Handle branches (if any)
893 // ═══════════════════════════════════════════════════════
894 std::vector<std::shared_ptr<Vruta::SoundRoutine>> current_cycle_sync_tasks;
895
896 for (auto& branch : m_branches) {
897 if (branch.condition(m_current_cycle)) {
898 auto task = dispatch_branch_async(branch, m_current_cycle);
899
900 if (branch.synchronous && task) {
901 current_cycle_sync_tasks.push_back(task);
902 }
903 }
904 }
905
906 if (!current_cycle_sync_tasks.empty()) {
907 bool any_active = true;
908 while (any_active) {
909 any_active = false;
910
911 for (auto& task : current_cycle_sync_tasks) {
912 if (task && task->is_active()) {
913 any_active = true;
914 break;
915 }
916 }
917
918 if (any_active) {
920 co_await BufferDelay { 1 };
921 } else {
922 co_await SampleDelay { 1 };
923 }
924 }
925 }
926 }
927
929
932 }
933
935
937 cycles_executed++;
938 }
939
940 if (m_on_complete)
942}
943
944Vruta::SoundRoutine BufferPipeline::execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
945{
946 auto& promise = co_await Kriya::GetAudioPromise {};
947
948 if (m_operations.empty()) {
949 co_return;
950 }
951
953 uint32_t cycles_executed = 0;
954
955 while ((max_cycles == 0 || cycles_executed < max_cycles) && (m_continuous_execution || cycles_executed < max_cycles)) {
956
957 if (promise.should_terminate) {
958 break;
959 }
960
963 }
964
965 for (size_t i = 0; i < m_operations.size(); ++i) {
966 auto& op = m_operations[i];
967
968 if (op.get_type() == BufferOperation::OpType::CONDITION) {
969 if (!op.m_condition || !op.m_condition(m_current_cycle)) {
970 continue;
971 }
972 }
973
974 if (m_current_cycle % op.m_cycle_interval != 0) {
975 continue;
976 }
977
978 uint32_t op_iterations = 1;
979 if (op.get_type() == BufferOperation::OpType::CAPTURE) {
980 op_iterations = op.m_capture.get_cycle_count();
981 }
982
983 for (uint32_t iter = 0; iter < op_iterations; ++iter) {
984
987
988 for (size_t j = i + 1; j < m_operations.size(); ++j) {
989 auto& dependent_op = m_operations[j];
990
992 process_operation(dependent_op, m_current_cycle + iter);
994 }
995
996 break;
997 }
998
1000 co_await BufferDelay { 1 };
1001 } else if (samples_per_operation > 0) {
1002 co_await SampleDelay { samples_per_operation };
1003 }
1004 }
1005 }
1006
1007 for (auto& branch : m_branches) {
1008 if (branch.condition(m_current_cycle)) {
1010 }
1011 }
1012
1014
1017 }
1018
1020
1022 cycles_executed++;
1023 }
1024
1025 if (m_on_complete)
1026 m_on_complete();
1027}
1028
1029Vruta::SoundRoutine BufferPipeline::execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
1030{
1031 // TODO: Implement parallel execution strategy
1033 "PARALLEL strategy not yet implemented, using PHASED as fallback");
1034 return execute_phased(max_cycles, samples_per_operation);
1035}
1036
1037Vruta::SoundRoutine BufferPipeline::execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
1038{
1039 // TODO: Implement reactive execution strategy
1041 "REACTIVE strategy not yet implemented, using PHASED as fallback");
1042 return execute_phased(max_cycles, samples_per_operation);
1043}
1044
1045}
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
Cycle Behavior: The for_cycles(N) configuration controls how many times the capture operation execute...
std::shared_ptr< Buffers::AudioBuffer > get_buffer() const
Definition Capture.hpp:167
OperationFunction m_data_ready_callback
Definition Capture.hpp:187
ProcessingControl get_processing_control() const
Definition Capture.hpp:169
@ TRANSIENT
Single cycle capture (default) - data expires after 1 cycle.
@ CIRCULAR
Circular buffer with overwrite.
@ ACCUMULATE
Accumulate over multiple cycles in container.
@ WINDOWED
Rolling window capture with overlap.
@ TRIGGERED
Capture only when condition met.
std::function< bool()> m_stop_condition
Definition Capture.hpp:186
CaptureMode get_mode() const
Definition Capture.hpp:168
uint32_t get_window_size() const
Definition Capture.hpp:173
uint32_t get_circular_size() const
Definition Capture.hpp:172
std::vector< std::shared_ptr< Buffers::AudioBuffer > > m_source_buffers
Buffers::AudioProcessingFunction m_buffer_modifier
std::shared_ptr< Buffers::AudioBuffer > m_target_buffer
std::vector< std::shared_ptr< Kakshya::DynamicSoundStream > > m_source_containers
static bool is_capture_phase_operation(const BufferOperation &op)
std::shared_ptr< Kakshya::DynamicSoundStream > m_target_container
bool is_streaming() const
Check if this operation is a streaming operation.
TransformVectorFunction m_fusion_function
std::shared_ptr< Buffers::BufferProcessor > m_attached_processor
BufferOperation & with_priority(uint8_t priority)
Set execution priority for scheduler ordering.
OpType get_type() const
Getters for internal state (read-only)
std::shared_ptr< Kakshya::DynamicSoundStream > m_source_container
TransformationFunction m_transformer
@ LOAD
Load data from container to buffer with position control.
@ CONDITION
Conditional operation for branching logic.
@ FUSE
Fuse multiple sources using custom fusion functions.
@ ROUTE
Route data to destination (buffer or container)
@ CAPTURE
Capture data from source buffer using BufferCapture strategy.
@ MODIFY
Modify Buffer Data using custom quick process.
@ DISPATCH
Dispatch to external handler for custom processing.
@ TRANSFORM
Apply transformation function to data variants.
static bool is_process_phase_operation(const BufferOperation &op)
Fundamental unit of operation in buffer processing pipelines.
std::unordered_map< BufferOperation *, Kakshya::DataVariant > m_operation_data
@ READY
Data ready for processing.
@ EXPIRED
Data has expired and should be cleaned up.
Vruta::SoundRoutine execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, const Kakshya::DataVariant &data)
std::shared_ptr< CycleCoordinator > m_coordinator
BufferPipeline & on_complete(std::function< void()> cb)
Register a callback fired once when pipeline execution ends.
BufferPipeline & branch_if(std::function< bool(uint32_t)> condition, const std::function< void(BufferPipeline &)> &branch_builder, bool synchronous=false, uint64_t samples_per_operation=1)
Add conditional branch to the pipeline.
void execute_for_cycles(uint64_t cycles=0)
Execute the pipeline for a specified number of cycles.
void process_operation(BufferOperation &op, uint64_t cycle)
Vruta::SoundRoutine execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
void execute_continuous()
Start continuous execution of the pipeline.
void execute_scheduled_at_rate(uint32_t max_cycles=0, double seconds_per_operation=1)
Execute pipeline with real-time rate control.
std::vector< BranchInfo > m_branches
void capture_operation(BufferOperation &op, uint64_t cycle)
std::shared_ptr< BufferPipeline > m_active_self
std::vector< BufferOperation > m_operations
void execute_once()
Execute the pipeline for a single cycle.
std::vector< std::shared_ptr< Vruta::SoundRoutine > > m_branch_tasks
std::vector< DataState > m_data_states
static Kakshya::DataVariant read_from_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, uint64_t start, uint32_t length)
Vruta::SoundRoutine execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_buffer(const std::shared_ptr< Buffers::AudioBuffer > &buffer, const Kakshya::DataVariant &data)
Vruta::SoundRoutine execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
std::function< void(uint32_t)> m_cycle_end_callback
bool has_pending_data() const
Check if any operations have pending data ready for processing.
void mark_data_consumed(uint32_t operation_index)
Execute pipeline synchronized to audio hardware cycle boundaries.
std::function< void(uint32_t)> m_cycle_start_callback
std::function< void()> m_on_complete
void execute_buffer_rate(uint64_t max_cycles=0)
Execute pipeline synchronized to audio hardware cycle boundaries.
std::shared_ptr< Buffers::BufferManager > m_buffer_manager
BufferPipeline & parallel(std::initializer_list< BufferOperation > operations)
Execute operations in parallel within the current cycle.
bool has_immediate_routing(const BufferOperation &op) const
void execute_scheduled(uint64_t max_cycles=0, uint64_t samples_per_operation=1)
Execute pipeline with sample-accurate timing between operations.
Vruta::SoundRoutine execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
static Kakshya::DataVariant extract_buffer_data(const std::shared_ptr< Buffers::AudioBuffer > &buffer, bool should_process=false)
Vruta::TaskScheduler * m_scheduler
std::shared_ptr< Vruta::SoundRoutine > dispatch_branch_async(BranchInfo &branch, uint64_t cycle)
BufferPipeline & with_lifecycle(std::function< void(uint32_t)> on_cycle_start, std::function< void(uint32_t)> on_cycle_end)
Set lifecycle callbacks for cycle management.
Coroutine-based execution engine for composable, multi-strategy buffer processing.
Cross-pipeline synchronization and coordination system.
A C++20 coroutine-based audio processing task with sample-accurate timing.
Definition Routine.hpp:316
void add_task(const std::shared_ptr< Routine > &routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
Definition Scheduler.cpp:23
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
Token-based multimodal task scheduling system for unified coroutine processing.
Definition Scheduler.hpp:51
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Kriya
Automatable tasks and fluent scheduling api for Nodes and Buffers.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
Definition NDData.hpp:76
@ PHASED
PHASED: Traditional phased execution (default)
@ STREAMING
STREAMING: Immediate flow-through execution.
@ PARALLEL
PARALLEL: Concurrent capture with synchronization.
@ REACTIVE
REACTIVE: Data-driven reactive execution.
constexpr std::string_view enum_to_string(EnumType value) noexcept
Universal enum to string converter using magic_enum (original case)
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)
Awaiter for suspending until a buffer cycle boundary.
std::shared_ptr< BufferPipeline > pipeline
Templated awaitable for accessing a coroutine's promise object.
Awaitable object for precise sample-accurate timing delays.