MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferPipeline.cpp
Go to the documentation of this file.
1#include "BufferPipeline.hpp"
2
4
6
9
11
12namespace MayaFlux::Kriya {
13
// Construct the pipeline: create a CycleCoordinator bound to the scheduler,
// take shared ownership of the BufferManager, and keep a non-owning pointer
// to the scheduler. NOTE(review): m_scheduler is a raw pointer — assumes the
// TaskScheduler outlives this pipeline; confirm with owning code.
14BufferPipeline::BufferPipeline(Vruta::TaskScheduler& scheduler, std::shared_ptr<Buffers::BufferManager> buffer_manager)
15    : m_coordinator(std::make_shared<CycleCoordinator>(scheduler))
16    , m_buffer_manager(std::move(buffer_manager))
17    , m_scheduler(&scheduler)
18{
19}
20
// Destructor body (the signature line, source line 21, was dropped by the
// Doxygen-to-text extraction). Detaches every processor that a MODIFY
// operation attached to the BufferManager, so processors do not outlive
// the pipeline that installed them.
22{
23    if (m_buffer_manager) {
24        for (auto& op : m_operations) {
25            if (op.get_type() == BufferOperation::OpType::MODIFY && op.m_attached_processor) {
26                m_buffer_manager->remove_processor(
27                    op.m_attached_processor,
28                    op.m_target_buffer);
29            }
30        }
31    }
32}
33
// branch_if(...): register a conditional sub-pipeline (continuation of the
// signature; the leading "BufferPipeline& BufferPipeline::branch_if(" line,
// source line 34, was dropped by extraction). The builder callback populates
// a freshly created pipeline, which is stored with its trigger condition.
35    std::function<bool(uint32_t)> condition,
36    const std::function<void(BufferPipeline&)>& branch_builder,
37    bool synchronous,
38    uint64_t samples_per_operation)
39{
    // NOTE(review): the branch pipeline is default-constructed — only the
    // scheduler is propagated below; the coordinator and buffer manager are
    // not. Confirm branches are not expected to run MODIFY operations.
40    auto branch_pipeline = std::make_shared<BufferPipeline>();
41    if (m_scheduler) {
42        branch_pipeline->m_scheduler = m_scheduler;
43    }
44    branch_builder(*branch_pipeline);
45
46    m_branches.push_back({ std::move(condition),
47        std::move(branch_pipeline),
48        synchronous,
49        samples_per_operation });
50
    // Fluent interface: return self for chaining.
51    return *this;
52}
53
54BufferPipeline& BufferPipeline::parallel(std::initializer_list<BufferOperation> operations)
55{
56 for (auto& op : operations) {
57 BufferOperation parallel_op = BufferOperation(op);
58 parallel_op.with_priority(255);
59 m_operations.emplace_back(std::move(parallel_op));
60 }
61 return *this;
62}
63
// with_lifecycle(...): install per-cycle callbacks (continuation of the
// signature; the leading "BufferPipeline& BufferPipeline::with_lifecycle("
// line, source line 64, was dropped by extraction). The callbacks are invoked
// by the execution strategies at cycle start and cycle end respectively.
65    std::function<void(uint32_t)> on_cycle_start,
66    std::function<void(uint32_t)> on_cycle_end)
67{
68    m_cycle_start_callback = std::move(on_cycle_start);
69    m_cycle_end_callback = std::move(on_cycle_end);
70    return *this;
71}
72
// Execute the pipeline synchronized to audio hardware cycle boundaries.
// max_cycles == 0 means "run until terminated" (mapped to UINT64_MAX).
// NOTE(review): extraction dropped source lines 77 and 86 (line 77 is
// presumably the Journal context argument of error()).
73void BufferPipeline::execute_buffer_rate(uint64_t max_cycles)
74{
75    if (!m_scheduler) {
        // error<E>() is assumed to throw E — execution does not continue past here.
76        error<std::runtime_error>(Journal::Component::Kriya,
78            std::source_location::current(),
79            "Pipeline requires scheduler for execution");
80    }
81
    // Keep ourselves alive while the scheduled routine runs.
82    auto self = shared_from_this();
83
84    if (max_cycles == 0) {
85        max_cycles = UINT64_MAX;
87    }
88    m_max_cycles = max_cycles;
89
    // samples_per_operation = 0: pacing driven purely by buffer boundaries.
90    auto routine = std::make_shared<Vruta::SoundRoutine>(
91        execute_internal(max_cycles, 0));
92
93    m_scheduler->add_task(std::move(routine));
94
95    m_active_self = self;
96}
97
// execute_once(): run the pipeline for exactly one cycle (the signature line,
// source line 98, was dropped by extraction). Mirrors execute_buffer_rate()
// with max_cycles fixed to 1.
99{
100    if (!m_scheduler) {
101        error<std::runtime_error>(Journal::Component::Kriya,
103            std::source_location::current(),
104            "Pipeline requires scheduler for execution");
105    }
    // Self-reference keeps the pipeline alive until the routine completes.
106    auto self = shared_from_this();
107
108    m_max_cycles = 1;
109    auto routine = std::make_shared<Vruta::SoundRoutine>(
110        execute_internal(1, 0));
111    m_scheduler->add_task(std::move(routine));
112    m_active_self = self;
113}
114
// execute_for_cycles(cycles): run for a bounded number of cycles (the
// signature line, source line 115, was dropped by extraction).
// cycles == 0 means unbounded (mapped to UINT64_MAX).
116{
117    if (!m_scheduler) {
118        error<std::runtime_error>(Journal::Component::Kriya,
120            std::source_location::current(),
121            "Pipeline requires scheduler for execution");
122    }
123
    // Keep the pipeline alive for the lifetime of the scheduled routine.
124    auto self = shared_from_this();
125
126    if (cycles == 0) {
127        cycles = UINT64_MAX;
129    }
130    m_max_cycles = cycles;
131    auto routine = std::make_shared<Vruta::SoundRoutine>(
132        execute_internal(cycles, 0));
133    m_scheduler->add_task(std::move(routine));
134    m_active_self = self;
135}
136
142
// execute_scheduled(...): sample-accurate pacing between operations
// (continuation of the signature; the leading "void
// BufferPipeline::execute_scheduled(" line, source line 143, was dropped).
144    uint64_t max_cycles,
145    uint64_t samples_per_operation)
146{
147    if (!m_scheduler) {
148        error<std::runtime_error>(Journal::Component::Kriya,
150            std::source_location::current(),
151            "Pipeline must have scheduler for scheduled execution");
152    }
153
    // Keep the pipeline alive for the duration of the routine.
154    auto self = shared_from_this();
155
    // 0 means "run indefinitely".
156    if (max_cycles == 0) {
157        max_cycles = UINT64_MAX;
159    }
160
161    m_max_cycles = max_cycles;
162
163    auto routine = std::make_shared<Vruta::SoundRoutine>(
164        execute_internal(max_cycles, samples_per_operation));
165
166    m_scheduler->add_task(std::move(routine));
167
168    m_active_self = self;
169}
170
// execute_scheduled_at_rate(...): wall-clock pacing expressed in seconds,
// converted to samples and delegated to execute_scheduled() (the leading
// "void BufferPipeline::execute_scheduled_at_rate(" line, source line 171,
// was dropped by extraction).
172    uint32_t max_cycles,
173    double seconds_per_operation)
174{
175    if (!m_scheduler) {
176        error<std::runtime_error>(Journal::Component::Kriya,
178            std::source_location::current(),
179            "Pipeline must have scheduler for scheduled execution");
180    }
181
    // Convert real-time rate to a sample count at the scheduler's sample rate.
182    uint64_t samples = m_scheduler->seconds_to_samples(seconds_per_operation);
183    execute_scheduled(max_cycles, samples);
184}
185
186void BufferPipeline::mark_data_consumed(uint32_t operation_index)
187{
188 if (operation_index < m_data_states.size()) {
189 m_data_states[operation_index] = DataState::CONSUMED;
190 }
191}
192
// has_pending_data(): true when any operation's data state is READY (the
// signature line, source line 193, was dropped by extraction).
194{
195    return std::ranges::any_of(m_data_states,
196        [](DataState state) { return state == DataState::READY; });
197}
198
199Kakshya::DataVariant BufferPipeline::extract_buffer_data(const std::shared_ptr<Buffers::AudioBuffer>& buffer, bool should_process)
200{
201 auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
202 if (audio_buffer) {
203 if (should_process) {
204 audio_buffer->process_default();
205 }
206 const auto& data_span = audio_buffer->get_data();
207 std::vector<double> data_vector(data_span.begin(), data_span.end());
208 return data_vector;
209 }
210
211 return std::vector<double> {};
212}
213
// Write a DataVariant back into an AudioBuffer, resizing the buffer to match.
// Non-AudioBuffer targets are currently ignored (see TODO at the end).
// NOTE(review): extraction dropped source line 230 (presumably the Journal
// context argument of error_rethrow()).
214void BufferPipeline::write_to_buffer(const std::shared_ptr<Buffers::AudioBuffer>& buffer, const Kakshya::DataVariant& data)
215{
216    auto audio_buffer = std::dynamic_pointer_cast<Buffers::AudioBuffer>(buffer);
217    if (audio_buffer) {
218        try {
            // NOTE(review): std::get by value copies the whole vector here;
            // `const auto&` would avoid the copy — confirm and clean up.
219            auto audio_data = std::get<std::vector<double>>(data);
220            auto& buffer_data = audio_buffer->get_data();
221
222            if (buffer_data.size() != audio_data.size()) {
223                buffer_data.resize(audio_data.size());
224            }
225
226            std::ranges::copy(audio_data, buffer_data.begin());
227
228        } catch (const std::bad_variant_access& e) {
            // Variant held a type other than vector<double>; log and rethrow.
229            error_rethrow(Journal::Component::Kriya,
231                std::source_location::current(),
232                "Data type mismatch when writing to audio buffer: {}",
233                e.what());
234        }
235    }
236
237    // TODO: Handle other buffer types
238}
239
// Write a DataVariant (expected: vector<double>) into a DynamicSoundStream,
// always starting at frame 0. Errors are logged and rethrown.
// NOTE(review): extraction dropped source lines 250 and 256 (presumably the
// Journal context arguments of the error_rethrow() calls).
240void BufferPipeline::write_to_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container, const Kakshya::DataVariant& data)
241{
242    try {
243        auto audio_data = std::get<std::vector<double>>(data);
244        std::span<const double> data_span(audio_data.data(), audio_data.size());
245
        // Overwrite from the start of the stream.
246        container->write_frames(data_span, 0);
247
248    } catch (const std::bad_variant_access& e) {
249        error_rethrow(Journal::Component::Kriya,
251            std::source_location::current(),
252            "Data type mismatch when writing to container: {}",
253            e.what());
254    } catch (const std::exception& e) {
255        error_rethrow(Journal::Component::Kriya,
257            std::source_location::current(),
258            "Error writing to container: {}",
259            e.what());
260    }
261}
262
// Read frames from a DynamicSoundStream into a vector<double> DataVariant.
// length == 0 means "read the whole stream". The start_frame parameter is
// currently unused. NOTE(review): extraction dropped source line 286
// (presumably the Journal context argument of error_rethrow()).
263Kakshya::DataVariant BufferPipeline::read_from_container(const std::shared_ptr<Kakshya::DynamicSoundStream>& container,
264    uint64_t /*start_frame*/,
265    uint32_t length)
266{
267    try {
268        uint32_t read_length = length;
269        if (read_length == 0) {
            // Whole stream: total elements / channels = frame count.
270            read_length = static_cast<uint32_t>(container->get_total_elements() / container->get_num_channels());
271        }
272
        // Interleaved output: frames * channels samples.
273        std::vector<double> output_data(static_cast<size_t>(read_length * container->get_num_channels()));
274        std::span<double> output_span(output_data.data(), output_data.size());
275
276        uint64_t frames_read = container->read_frames(output_span, read_length);
277
        // NOTE(review): frames_read (frames) is compared against and used to
        // resize output_data (samples = frames * channels). For multi-channel
        // streams this truncates to frames_read samples, not frames_read
        // frames — confirm read_frames() units and fix if mismatched.
278        if (frames_read < output_data.size()) {
279            output_data.resize(frames_read);
280        }
281
282        return output_data;
283
284    } catch (const std::exception& e) {
285        error_rethrow(Journal::Component::Kriya,
287            std::source_location::current(),
288            "Error reading from container: {}",
289            e.what());
290
        // Unreachable if error_rethrow rethrows (as the name suggests);
        // kept to satisfy the non-void return type.
291        return std::vector<double> {};
292    }
293}
294
// capture_operation(op, cycle): execute one CAPTURE operation — extract data
// from the operation's source buffer, store it according to the capture mode
// (TRANSIENT / ACCUMULATE / CIRCULAR / WINDOWED / TRIGGERED per the
// BufferCapture enum), then perform immediate routing when the next operation
// in the pipeline is a ROUTE.
// NOTE(review): the extraction dropped several interior lines — the function
// signature (line 295), the `should_process` setup and data-ready-callback
// guard (lines 297/300), the switch case labels (307, 311, 331, 369, 423-424),
// and the Journal context arguments of the warning calls. Gap comments below
// mark each.
296{
298        auto buffer_data = extract_buffer_data(op.m_capture.get_buffer(), should_process);
    // (line 300 dropped — presumably `if (op.m_capture.m_data_ready_callback) {`)
301        op.m_capture.m_data_ready_callback(buffer_data, cycle);
302    }
303
304    auto capture_mode = op.m_capture.get_mode();
305
306    switch (capture_mode) {
    // (case label dropped at line 307 — single-cycle TRANSIENT mode: overwrite)
308        m_operation_data[&op] = buffer_data;
309        break;
310
    // (case label dropped at line 311 — ACCUMULATE mode: append across cycles)
312        auto it = m_operation_data.find(&op);
313        if (it == m_operation_data.end()) {
314            m_operation_data[&op] = buffer_data;
315        } else {
316            try {
317                auto& existing = std::get<std::vector<double>>(it->second);
318                const auto& new_data = std::get<std::vector<double>>(buffer_data);
319                existing.insert(existing.end(), new_data.begin(), new_data.end());
320
321            } catch (const std::bad_variant_access& e) {
                // On type mismatch, fall back to replacing the stored data.
324                "Data type mismatch during ACCUMULATE capture: {}",
325                e.what());
326                m_operation_data[&op] = buffer_data;
327            }
328        }
329        break;
330    }
    // (case label dropped at line 331 — CIRCULAR mode: bounded rolling buffer)
332        uint32_t circular_size = op.m_capture.get_circular_size();
333        if (circular_size == 0) {
334            circular_size = 4096;
335        }
336
337        auto it = m_operation_data.find(&op);
338        if (it == m_operation_data.end()) {
339            m_operation_data[&op] = buffer_data;
340        } else {
341            try {
342                auto& circular = std::get<std::vector<double>>(it->second);
343                const auto& new_data = std::get<std::vector<double>>(buffer_data);
344
345                circular.insert(circular.end(), new_data.begin(), new_data.end());
346
                // Trim oldest samples so the buffer never exceeds circular_size.
347                if (circular.size() > circular_size) {
348                    circular.erase(circular.begin(),
349                        circular.begin() + static_cast<int64_t>(circular.size() - circular_size));
350                }
351
352            } catch (const std::bad_variant_access& e) {
355                "Data type mismatch during CIRCULAR capture: {}",
356                e.what());
357                m_operation_data[&op] = buffer_data;
358            } catch (std::exception& e) {
359                error_rethrow(Journal::Component::Kriya,
361                    std::source_location::current(),
362                    "Error during CIRCULAR capture: {}",
363                    e.what());
364                m_operation_data[&op] = buffer_data;
365            }
366        }
367        break;
368    }
    // (case label dropped at line 369 — WINDOWED mode: rolling window w/ overlap)
370        uint32_t window_size = op.m_capture.get_window_size();
371        float overlap_ratio = op.m_capture.get_overlap_ratio();
372
373        if (window_size == 0) {
374            window_size = 512;
375        }
376
        // hop = window * (1 - overlap); clamp to at least 1 to guarantee progress.
377        auto hop_size = static_cast<uint32_t>((float)window_size * (1.0F - overlap_ratio));
378        if (hop_size == 0)
379            hop_size = 1;
380
381        auto it = m_operation_data.find(&op);
382        if (it == m_operation_data.end()) {
383            m_operation_data[&op] = buffer_data;
384        } else {
385            try {
386                auto& windowed = std::get<std::vector<double>>(it->second);
387                const auto& new_data = std::get<std::vector<double>>(buffer_data);
388
389                if (windowed.size() >= window_size) {
390                    if (hop_size >= windowed.size()) {
                        // Hop covers the whole window: replace wholesale.
391                        windowed = std::get<std::vector<double>>(buffer_data);
392                    } else {
                        // Slide: drop hop_size oldest samples, append new ones.
393                        windowed.erase(windowed.begin(),
394                            windowed.begin() + hop_size);
395
396                        windowed.insert(windowed.end(), new_data.begin(), new_data.end());
397
398                        if (windowed.size() > window_size) {
399                            size_t excess = windowed.size() - window_size;
400                            windowed.erase(windowed.begin(),
401                                windowed.begin() + excess);
402                        }
403                    }
404                } else {
                    // Window not yet full: keep appending, trimming overflow.
405                    windowed.insert(windowed.end(), new_data.begin(), new_data.end());
406
407                    if (windowed.size() > window_size) {
408                        size_t excess = windowed.size() - window_size;
409                        windowed.erase(windowed.begin(),
410                            windowed.begin() + excess);
411                    }
412                }
413
414            } catch (const std::bad_variant_access& e) {
416                "Data type mismatch during WINDOWED capture: {}", e.what());
417                m_operation_data[&op] = buffer_data;
418            }
419        }
420        break;
421    }
422
    // (lines 423-424 dropped — presumably the TRIGGERED case label and its
    //  condition check; only the store below survived extraction)
425        m_operation_data[&op] = buffer_data;
426    }
427    break;
428}
429
430    default:
431        m_operation_data[&op] = buffer_data;
432        break;
433    }
434
    // Immediate routing: if the very next operation is a ROUTE, forward the
    // freshly captured data now and mark that route as already consumed.
435    if (has_immediate_routing(op)) {
436        auto current_it = std::ranges::find_if(m_operations,
437            [&op](const BufferOperation& o) { return &o == &op; });
438
439        if (current_it != m_operations.end()) {
440            auto next_it = std::next(current_it);
441            if (next_it != m_operations.end() && next_it->get_type() == BufferOperation::OpType::ROUTE) {
442
443                if (next_it->m_target_buffer) {
444                    write_to_buffer(next_it->m_target_buffer, buffer_data);
445                } else if (next_it->m_target_container) {
446                    write_to_container(next_it->m_target_container, buffer_data);
447                }
448
449                size_t route_index = std::distance(m_operations.begin(), next_it);
450                if (route_index < m_data_states.size()) {
451                    m_data_states[route_index] = DataState::CONSUMED;
452                }
453            }
454        }
455    }
456}
457
// Cleanup pass over CAPTURE operations (the signature line, source line 458,
// was dropped by extraction — name unknown from this view). Erases cached
// data for captures whose mode matches the dropped condition on line 463
// (presumably `mode == BufferCapture::CaptureMode::TRANSIENT`, since
// TRANSIENT data expires after a single cycle).
459{
460    for (auto& op : m_operations) {
461        if (op.get_type() == BufferOperation::OpType::CAPTURE) {
462            auto mode = op.m_capture.get_mode();
    // (line 463 dropped — the mode comparison guarding the erase below)
464                m_operation_data.erase(&op);
465            }
466        }
467    }
468}
469
// has_immediate_routing(op): true when the operation immediately following
// `op` in the pipeline is a ROUTE (the signature line, source line 470, was
// dropped by extraction; per the declaration it is
// `bool BufferPipeline::has_immediate_routing(const BufferOperation& op) const`).
471{
    // Locate `op` by identity (address), not by value.
472    auto it = std::ranges::find_if(m_operations,
473        [&op](const BufferOperation& o) { return &o == &op; });
474
475    if (it == m_operations.end() || std::next(it) == m_operations.end()) {
476        return false;
477    }
478
479    auto next_op = std::next(it);
480    return next_op->get_type() == BufferOperation::OpType::ROUTE;
481}
482
// process_operation(op, cycle): dispatch one pipeline operation by type
// (the signature line, source line 483, was dropped by extraction, as were
// all switch case labels — CAPTURE / TRANSFORM / ROUTE / LOAD / FUSE /
// DISPATCH / MODIFY / CONDITION; gap comments below mark each).
// Any exception is logged and rethrown with context.
484{
485    try {
486        switch (op.get_type()) {
        // (case label dropped at line 487 — CAPTURE: delegate to capture_operation)
488            capture_operation(op, cycle);
489
490            break;
491        }
492
        // (case label dropped at line 493 — TRANSFORM)
494            Kakshya::DataVariant input_data;
            // Prefer this op's own cached data; otherwise fall back to the
            // first entry in the cache (unordered_map order — arbitrary).
495            if (m_operation_data.find(&op) != m_operation_data.end()) {
496                input_data = m_operation_data[&op];
497            } else {
498                for (auto& it : m_operation_data) {
499                    input_data = it.second;
500                    break;
501                }
502            }
503
504            if (op.m_transformer) {
505                auto transformed = op.m_transformer(input_data, cycle);
506                m_operation_data[&op] = transformed;
507
                // Write the result back into the nearest preceding CAPTURE's
                // buffer (searched in reverse operation order).
508                for (auto& m_operation : std::ranges::reverse_view(m_operations)) {
509                    if (&m_operation == &op)
510                        continue;
511                    if (m_operation.get_type() == BufferOperation::OpType::CAPTURE && m_operation.m_capture.get_buffer()) {
512                        write_to_buffer(m_operation.m_capture.get_buffer(), transformed);
513                        break;
514                    }
515                }
516            }
517            break;
518        }
519
        // (case label dropped at line 520 — ROUTE)
521            Kakshya::DataVariant data_to_route;
522            if (m_operation_data.find(&op) != m_operation_data.end()) {
523                data_to_route = m_operation_data[&op];
524            } else {
525                for (auto& it : m_operation_data) {
526                    data_to_route = it.second;
527                    break;
528                }
529            }
530
            // Buffer target takes precedence over container target.
531            if (op.m_target_buffer) {
532                write_to_buffer(op.m_target_buffer, data_to_route);
533            } else if (op.m_target_container) {
534                write_to_container(op.m_target_container, data_to_route);
535            }
536            break;
537        }
538
        // (case label dropped at line 539 — LOAD: container -> buffer)
540            auto loaded_data = read_from_container(op.m_source_container,
541                op.m_start_frame,
542                op.m_load_length);
543
544            if (op.m_target_buffer) {
545                write_to_buffer(op.m_target_buffer, loaded_data);
546            }
547
548            m_operation_data[&op] = loaded_data;
549            break;
550        }
551
        // (case label dropped at line 552 — FUSE: combine multiple sources)
553            std::vector<Kakshya::DataVariant> fusion_inputs;
554
555            for (auto& source_buffer : op.m_source_buffers) {
    // (line 556 dropped — presumably the `should_process` setup for this source)
557                auto buffer_data = extract_buffer_data(source_buffer, should_process);
558                fusion_inputs.push_back(buffer_data);
559            }
560
561            for (auto& source_container : op.m_source_containers) {
562                auto container_data = read_from_container(source_container, 0, 0);
563                fusion_inputs.push_back(container_data);
564            }
565
566            if (op.m_fusion_function && !fusion_inputs.empty()) {
567                auto fused_data = op.m_fusion_function(fusion_inputs, cycle);
568
569                if (op.m_target_buffer) {
570                    write_to_buffer(op.m_target_buffer, fused_data);
571                } else if (op.m_target_container) {
    // (line 572 dropped — presumably write_to_container(op.m_target_container, fused_data);)
573                }
574
575                m_operation_data[&op] = fused_data;
576            }
577            break;
578        }
579
        // (case label dropped at line 580 — DISPATCH: hand data to external handler)
581            Kakshya::DataVariant data_to_dispatch;
582            if (m_operation_data.find(&op) != m_operation_data.end()) {
583                data_to_dispatch = m_operation_data[&op];
584            } else {
585                for (auto& it : m_operation_data) {
586                    data_to_dispatch = it.second;
587                    break;
588                }
589            }
590
591            if (op.m_dispatch_handler) {
592                op.m_dispatch_handler(data_to_dispatch, cycle);
593            }
594            break;
595        }
596
        // (case label dropped at line 597 — MODIFY: attach/detach a quick processor)
598            if (!m_buffer_manager) {
599                error<std::invalid_argument>(Journal::Component::Kriya,
601                    std::source_location::current(),
602                    "BufferPipeline has no BufferManager for MODIFY operation");
603            }
604
605            if (!op.m_attached_processor) {
    // (lines 607-610 dropped — the attach_quick_process argument list and part
    //  of the streaming-mode handling)
606                op.m_attached_processor = m_buffer_manager->attach_quick_process(
609                if (m_max_cycles != 0 && op.is_streaming()) {
611                }
612            }
613
            // Detach once the configured number of modify cycles has elapsed.
614            if (op.m_modify_cycle_count > 0 && cycle >= op.m_modify_cycle_count - 1) {
615                if (op.m_attached_processor) {
616                    m_buffer_manager->remove_processor(
618                        op.m_target_buffer);
619                    op.m_attached_processor = nullptr;
620                }
621            }
622
623            break;
624        }
625
        // (case label dropped at line 626 — CONDITION: no direct work here;
        //  condition checks are applied by the execution strategies)
627            break;
628
629        default:
            // (lines 630-631 dropped — presumably the warn/error call header)
632                "Unknown operation type in pipeline : {} : {}",
633                Utils::enum_to_string(op.get_type()), std::to_string(static_cast<int>(op.get_type())));
634            break;
635        }
636    } catch (const std::exception& e) {
637        error_rethrow(Journal::Component::Kriya,
639            std::source_location::current(),
640            "Error processing operation in BufferPipeline: {}",
641            e.what());
642    }
643}
644
645std::shared_ptr<Vruta::SoundRoutine> BufferPipeline::dispatch_branch_async(BranchInfo& branch, uint64_t /*cycle*/)
646{
647 if (!m_scheduler)
648 return nullptr;
649
650 if (!m_coordinator) {
651 m_coordinator = std::make_unique<CycleCoordinator>(*m_scheduler);
652 }
653
654 branch.pipeline->m_active_self = branch.pipeline;
655
656 auto branch_routine = branch.pipeline->execute_internal(1, branch.samples_per_operation);
657
658 auto task = std::make_shared<Vruta::SoundRoutine>(std::move(branch_routine));
659 m_scheduler->add_task(task);
660
661 m_branch_tasks.push_back(task);
662
663 return task;
664}
665
// Per-cycle data-state advancement (the signature line, source line 666, was
// dropped by extraction — name unknown from this view). For TRANSIENT
// captures, fires the data-expired callback before the state transition;
// dropped lines 669/679/681 presumably carried the state guard and the
// EXPIRED / READY state assignments.
667{
668    for (size_t i = 0; i < m_data_states.size(); ++i) {
    // (line 669 dropped — presumably a state guard such as `if (m_data_states[i] == DataState::READY ...) {`)
670        if (i < m_operations.size() && m_operations[i].get_type() == BufferOperation::OpType::CAPTURE && m_operations[i].m_capture.get_mode() == BufferCapture::CaptureMode::TRANSIENT) {
671
            // Give the owner a last look at the data before it expires.
672            if (m_operations[i].m_capture.m_data_expired_callback) {
673                auto it = m_operation_data.find(&m_operations[i]);
674                if (it != m_operation_data.end()) {
675                    m_operations[i].m_capture.m_data_expired_callback(it->second, m_current_cycle);
676                }
677            }
678
    // (line 679 dropped — presumably `m_data_states[i] = DataState::EXPIRED;`)
680        } else {
    // (line 681 dropped — presumably the non-transient state assignment)
682        }
683    }
684    }
685
    // NOTE(review): once m_current_cycle exceeds 2 this erases the ENTIRE
    // operation-data cache every call — the condition does not depend on the
    // entry being iterated. Looks suspicious; confirm intended behavior.
686    auto it = m_operation_data.begin();
687    while (it != m_operation_data.end()) {
688        if (m_current_cycle > 2) {
689            it = m_operation_data.erase(it);
690        } else {
691            ++it;
692        }
693    }
694}
695
// Branch teardown (the signature line, source line 696, was dropped by
// extraction — name unknown from this view). Releases each branch pipeline's
// self-reference so it can be destroyed, then prunes finished or null tasks
// from the branch task list (erase-remove idiom).
697{
698    for (auto& branch : m_branches) {
699        if (branch.pipeline) {
700            branch.pipeline->m_active_self.reset();
701        }
702    }
703
704    m_branch_tasks.erase(
705        std::remove_if(m_branch_tasks.begin(), m_branch_tasks.end(),
706            [](const auto& task) { return !task || !task->is_active(); }),
707        m_branch_tasks.end());
708}
709
// Select the execution strategy coroutine (PHASED / STREAMING / PARALLEL /
// REACTIVE per the ExecutionStrategy enum). The switch case labels (source
// lines 713, 716, 719, 722) were dropped by extraction; the order of the
// returns below matches the enum documentation in the header.
710Vruta::SoundRoutine BufferPipeline::execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
711{
712    switch (m_execution_strategy) {
    // (case label dropped — PHASED)
714        return execute_phased(max_cycles, samples_per_operation);

    // (case label dropped — STREAMING)
717        return execute_streaming(max_cycles, samples_per_operation);

    // (case label dropped — PARALLEL)
720        return execute_parallel(max_cycles, samples_per_operation);

    // (case label dropped — REACTIVE)
723        return execute_reactive(max_cycles, samples_per_operation);

725    default:
        // error<E>() is assumed to throw, so no return is needed after it.
726        error<std::runtime_error>(Journal::Component::Kriya,
728            std::source_location::current(),
729            "Unknown execution strategy in BufferPipeline");
730    }
731}
732
// PHASED strategy coroutine: each cycle runs all capture-phase operations
// first, then all process-phase operations, then dispatches branches and
// waits for synchronous ones. Coroutine state machine — left byte-identical;
// several lines were dropped by extraction (cycle-start callback block,
// phase-membership guards, timing-mode checks, cycle-end bookkeeping) and
// are marked below.
733Vruta::SoundRoutine BufferPipeline::execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
734{
735    auto& promise = co_await Kriya::GetAudioPromise {};

737    if (m_operations.empty()) {
738        co_return;
739    }

    // (line 741 dropped — presumably m_current_cycle initialization)
742    uint32_t cycles_executed = 0;

    // NOTE(review): with max_cycles == 0 and m_continuous_execution == false
    // the second conjunct is false on entry — callers map 0 to UINT64_MAX
    // before reaching here, so confirm the 0 path is intentional.
744    while ((max_cycles == 0 || cycles_executed < max_cycles) && (m_continuous_execution || cycles_executed < max_cycles)) {

746        if (promise.should_terminate) {
747            break;
748        }

    // (lines 750-751 dropped — presumably the m_cycle_start_callback guard/call)
752        }

        // Reset all per-operation data states at the top of the cycle.
754        for (auto& state : m_data_states) {
755            if (state != DataState::EMPTY) {
756                state = DataState::EMPTY;
757            }
758        }

    // (line 760 dropped)

762        // ═══════════════════════════════════════════════════════
763        // PHASE 1: CAPTURE - Execute all capture operations
764        // ═══════════════════════════════════════════════════════
765        for (size_t i = 0; i < m_operations.size(); ++i) {
766            auto& op = m_operations[i];

    // (line 768 dropped — presumably `if (!BufferOperation::is_capture_phase_operation(op)) {`)
769                continue;
770            }

772            if (op.get_type() == BufferOperation::OpType::CONDITION) {
773                if (!op.m_condition || !op.m_condition(m_current_cycle)) {
774                    continue;
775                }
776            }

            // Skip operations not scheduled for this cycle.
778            if (m_current_cycle % op.m_cycle_interval != 0) {
779                continue;
780            }

782            uint32_t op_iterations = 1;
783            if (op.get_type() == BufferOperation::OpType::CAPTURE) {
784                op_iterations = op.m_capture.get_cycle_count();
785            }

787            for (uint32_t iter = 0; iter < op_iterations; ++iter) {
    // (lines 788-789 dropped — presumably process_operation(...) and the
    //  BUFFER_BASED timing check guarding the BufferDelay await)
790                    co_await BufferDelay { 1 };
791                } else if (m_capture_timing == Vruta::DelayContext::SAMPLE_BASED && samples_per_operation > 0) {
792                    co_await SampleDelay { samples_per_operation };
793                }
794            }
795        }

    // (line 796 dropped)
797        }

799        // ═══════════════════════════════════════════════════════
800        // PHASE 2: PROCESS - Execute all processing operations
801        // ═══════════════════════════════════════════════════════

803        for (size_t i = 0; i < m_operations.size(); ++i) {
804            auto& op = m_operations[i];

    // (line 806 dropped — presumably `if (!BufferOperation::is_process_phase_operation(op)) {`)
807                continue;
808            }

    // (line 810 dropped — presumably a consumed/ready data-state guard)
811                continue;
812            }

814            if (op.get_type() == BufferOperation::OpType::CONDITION) {
815                if (!op.m_condition || !op.m_condition(m_current_cycle)) {
816                    continue;
817                }
818            }

820            if (m_current_cycle % op.m_cycle_interval != 0) {
821                continue;
822            }

    // (lines 824-827 dropped — presumably process_operation(...) and the
    //  BUFFER_BASED timing check guarding the BufferDelay await)

828                co_await BufferDelay { 1 };
829            } else if (m_process_timing == Vruta::DelayContext::SAMPLE_BASED && samples_per_operation > 0) {
830                co_await SampleDelay { samples_per_operation };
831            }
832        }

834        // ═══════════════════════════════════════════════════════
835        // Handle branches (if any)
836        // ═══════════════════════════════════════════════════════
837        std::vector<std::shared_ptr<Vruta::SoundRoutine>> current_cycle_sync_tasks;

839        for (auto& branch : m_branches) {
840            if (branch.condition(m_current_cycle)) {
841                auto task = dispatch_branch_async(branch, m_current_cycle);

843                if (branch.synchronous && task) {
844                    current_cycle_sync_tasks.push_back(task);
845                }
846            }
847        }

        // Busy-wait (with co_await yields) until all synchronous branch
        // tasks of this cycle have finished.
849        if (!current_cycle_sync_tasks.empty()) {
850            bool any_active = true;
851            while (any_active) {
852                any_active = false;

854                for (auto& task : current_cycle_sync_tasks) {
855                    if (task && task->is_active()) {
856                        any_active = true;
857                        break;
858                    }
859                }

861                if (any_active) {
    // (line 862 dropped — presumably the BUFFER_BASED timing check)
863                        co_await BufferDelay { 1 };
864                    } else {
865                        co_await SampleDelay { 1 };
866                    }
867                }
868            }
869        }

    // (line 871 dropped)

    // (lines 873-874 dropped — presumably the m_cycle_end_callback guard/call)
875        }

    // (line 877 dropped)

    // (line 879 dropped — presumably ++m_current_cycle)
880        cycles_executed++;
881    }
882}
883
// STREAMING strategy coroutine: each capture flows immediately through its
// dependent operations (flow-through) instead of running in phases.
// Coroutine state machine — left byte-identical; several lines were dropped
// by extraction (cycle callbacks, phase/dependency guards, timing checks)
// and are marked below.
884Vruta::SoundRoutine BufferPipeline::execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
885{
886    auto& promise = co_await Kriya::GetAudioPromise {};

888    if (m_operations.empty()) {
889        co_return;
890    }

    // (line 892 dropped — presumably m_current_cycle initialization)
893    uint32_t cycles_executed = 0;

895    while ((max_cycles == 0 || cycles_executed < max_cycles) && (m_continuous_execution || cycles_executed < max_cycles)) {

897        if (promise.should_terminate) {
898            break;
899        }

    // (lines 901-902 dropped — presumably the m_cycle_start_callback guard/call)
903        }

905        for (size_t i = 0; i < m_operations.size(); ++i) {
906            auto& op = m_operations[i];

908            if (op.get_type() == BufferOperation::OpType::CONDITION) {
909                if (!op.m_condition || !op.m_condition(m_current_cycle)) {
910                    continue;
911                }
912            }

914            if (m_current_cycle % op.m_cycle_interval != 0) {
915                continue;
916            }

918            uint32_t op_iterations = 1;
919            if (op.get_type() == BufferOperation::OpType::CAPTURE) {
920                op_iterations = op.m_capture.get_cycle_count();
921            }

923            for (uint32_t iter = 0; iter < op_iterations; ++iter) {

    // (lines 925-927 dropped — presumably process_operation(op, ...) for the
    //  current operation before flowing into dependents)

                // Flow-through: run downstream operations immediately.
928                for (size_t j = i + 1; j < m_operations.size(); ++j) {
929                    auto& dependent_op = m_operations[j];

    // (line 931 dropped — presumably the dependent-eligibility guard)
932                        process_operation(dependent_op, m_current_cycle + iter);
    // (line 933 dropped)
934                    }

936                    break;
937                }

    // (line 939 dropped — presumably the BUFFER_BASED timing check)
940                    co_await BufferDelay { 1 };
941                } else if (samples_per_operation > 0) {
942                    co_await SampleDelay { samples_per_operation };
943                }
944            }
945        }

947        for (auto& branch : m_branches) {
948            if (branch.condition(m_current_cycle)) {
    // (line 949 dropped — presumably dispatch_branch_async(branch, m_current_cycle);)
950            }
951        }

    // (line 953 dropped)

    // (lines 955-956 dropped — presumably the m_cycle_end_callback guard/call)
957        }

    // (line 959 dropped)

    // (line 961 dropped — presumably ++m_current_cycle)
962        cycles_executed++;
963    }
964}
965
966Vruta::SoundRoutine BufferPipeline::execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
967{
968 // TODO: Implement parallel execution strategy
969 std::cout << "PARALLEL strategy not yet implemented, using PHASED\n";
970 return execute_phased(max_cycles, samples_per_operation);
971}
972
973Vruta::SoundRoutine BufferPipeline::execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
974{
975 // TODO: Implement reactive execution strategy
976 std::cout << "REACTIVE strategy not yet implemented, using PHASED\n";
977 return execute_phased(max_cycles, samples_per_operation);
978}
979
980}
#define MF_ERROR(comp, ctx,...)
Cycle Behavior: The for_cycles(N) configuration controls how many times the capture operation executes...
std::shared_ptr< Buffers::AudioBuffer > get_buffer() const
Definition Capture.hpp:167
OperationFunction m_data_ready_callback
Definition Capture.hpp:187
ProcessingControl get_processing_control() const
Definition Capture.hpp:169
@ TRANSIENT
Single cycle capture (default) - data expires after 1 cycle.
@ CIRCULAR
Circular buffer with overwrite.
@ ACCUMULATE
Accumulate over multiple cycles in container.
@ WINDOWED
Rolling window capture with overlap.
@ TRIGGERED
Capture only when condition met.
std::function< bool()> m_stop_condition
Definition Capture.hpp:186
CaptureMode get_mode() const
Definition Capture.hpp:168
uint32_t get_window_size() const
Definition Capture.hpp:173
uint32_t get_circular_size() const
Definition Capture.hpp:172
std::vector< std::shared_ptr< Buffers::AudioBuffer > > m_source_buffers
std::shared_ptr< Buffers::AudioBuffer > m_target_buffer
std::vector< std::shared_ptr< Kakshya::DynamicSoundStream > > m_source_containers
Buffers::BufferProcessingFunction m_buffer_modifier
static bool is_capture_phase_operation(const BufferOperation &op)
std::shared_ptr< Kakshya::DynamicSoundStream > m_target_container
bool is_streaming() const
Check if this operation is a streaming operation.
TransformVectorFunction m_fusion_function
std::shared_ptr< Buffers::BufferProcessor > m_attached_processor
BufferOperation & with_priority(uint8_t priority)
Set execution priority for scheduler ordering.
OpType get_type() const
Getters for internal state (read-only)
std::shared_ptr< Kakshya::DynamicSoundStream > m_source_container
TransformationFunction m_transformer
@ LOAD
Load data from container to buffer with position control.
@ CONDITION
Conditional operation for branching logic.
@ FUSE
Fuse multiple sources using custom fusion functions.
@ ROUTE
Route data to destination (buffer or container)
@ CAPTURE
Capture data from source buffer using BufferCapture strategy.
@ MODIFY
Modify Buffer Data using custom quick process.
@ DISPATCH
Dispatch to external handler for custom processing.
@ TRANSFORM
Apply transformation function to data variants.
static bool is_process_phase_operation(const BufferOperation &op)
Fundamental unit of operation in buffer processing pipelines.
std::unordered_map< BufferOperation *, Kakshya::DataVariant > m_operation_data
@ READY
Data ready for processing.
@ EXPIRED
Data has expired and should be cleaned up.
Vruta::SoundRoutine execute_reactive(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, const Kakshya::DataVariant &data)
std::shared_ptr< CycleCoordinator > m_coordinator
BufferPipeline & branch_if(std::function< bool(uint32_t)> condition, const std::function< void(BufferPipeline &)> &branch_builder, bool synchronous=false, uint64_t samples_per_operation=1)
Add conditional branch to the pipeline.
void execute_for_cycles(uint64_t cycles=0)
Execute the pipeline for a specified number of cycles.
void process_operation(BufferOperation &op, uint64_t cycle)
Vruta::SoundRoutine execute_internal(uint64_t max_cycles, uint64_t samples_per_operation)
void execute_continuous()
Start continuous execution of the pipeline.
void execute_scheduled_at_rate(uint32_t max_cycles=0, double seconds_per_operation=1)
Execute pipeline with real-time rate control.
std::vector< BranchInfo > m_branches
void capture_operation(BufferOperation &op, uint64_t cycle)
std::shared_ptr< BufferPipeline > m_active_self
std::vector< BufferOperation > m_operations
void execute_once()
Execute the pipeline for a single cycle.
std::vector< std::shared_ptr< Vruta::SoundRoutine > > m_branch_tasks
std::vector< DataState > m_data_states
static Kakshya::DataVariant read_from_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, uint64_t start, uint32_t length)
Vruta::SoundRoutine execute_streaming(uint64_t max_cycles, uint64_t samples_per_operation)
static void write_to_buffer(const std::shared_ptr< Buffers::AudioBuffer > &buffer, const Kakshya::DataVariant &data)
Vruta::SoundRoutine execute_parallel(uint64_t max_cycles, uint64_t samples_per_operation)
std::function< void(uint32_t)> m_cycle_end_callback
bool has_pending_data() const
Check if any operations have pending data ready for processing.
void mark_data_consumed(uint32_t operation_index)
Mark the data produced by the operation at the given index as consumed.
std::function< void(uint32_t)> m_cycle_start_callback
void execute_buffer_rate(uint64_t max_cycles=0)
Execute pipeline synchronized to audio hardware cycle boundaries.
std::shared_ptr< Buffers::BufferManager > m_buffer_manager
BufferPipeline & parallel(std::initializer_list< BufferOperation > operations)
Execute operations in parallel within the current cycle.
bool has_immediate_routing(const BufferOperation &op) const
void execute_scheduled(uint64_t max_cycles=0, uint64_t samples_per_operation=1)
Execute pipeline with sample-accurate timing between operations.
Vruta::SoundRoutine execute_phased(uint64_t max_cycles, uint64_t samples_per_operation)
static Kakshya::DataVariant extract_buffer_data(const std::shared_ptr< Buffers::AudioBuffer > &buffer, bool should_process=false)
Vruta::TaskScheduler * m_scheduler
std::shared_ptr< Vruta::SoundRoutine > dispatch_branch_async(BranchInfo &branch, uint64_t cycle)
BufferPipeline & with_lifecycle(std::function< void(uint32_t)> on_cycle_start, std::function< void(uint32_t)> on_cycle_end)
Set lifecycle callbacks for cycle management.
Coroutine-based execution engine for composable, multi-strategy buffer processing.
Cross-pipeline synchronization and coordination system.
A C++20 coroutine-based audio processing task with sample-accurate timing.
Definition Routine.hpp:309
void add_task(std::shared_ptr< Routine > routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
Definition Scheduler.cpp:17
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
Token-based multimodal task scheduling system for unified coroutine processing.
Definition Scheduler.hpp:51
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Kriya
Automatable tasks and fluent scheduling api for Nodes and Buffers.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
Definition NDData.hpp:73
@ PHASED
PHASED: Traditional phased execution (default)
@ STREAMING
STREAMING: Immediate flow-through execution.
@ PARALLEL
PARALLEL: Concurrent capture with synchronization.
@ REACTIVE
REACTIVE: Data-driven reactive execution.
constexpr std::string_view enum_to_string(EnumType value) noexcept
Universal enum to string converter using magic_enum (original case)
Definition EnumUtils.hpp:51
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)
Awaiter for suspending until a buffer cycle boundary.
Definition Awaiters.hpp:105
std::shared_ptr< BufferPipeline > pipeline
Templated awaitable for accessing a coroutine's promise object.
Definition Awaiters.hpp:188
Awaitable object for precise sample-accurate timing delays.
Definition Awaiters.hpp:35