MayaFlux 0.2.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
SoundStreamContainer.cpp
Go to the documentation of this file.
2
4
7
9
10namespace MayaFlux::Kakshya {
11
12SoundStreamContainer::SoundStreamContainer(uint32_t sample_rate, uint32_t num_channels,
13 uint64_t initial_capacity, bool circular_mode)
14 : m_sample_rate(sample_rate)
15 , m_num_channels(num_channels)
16 , m_num_frames(initial_capacity)
17 , m_circular_mode(circular_mode)
18{
20
21 m_read_position = std::vector<std::atomic<uint64_t>>(m_num_channels);
22 for (auto& pos : m_read_position) {
23 pos.store(0);
24 }
25
26 m_data = std::views::iota(0U, num_channels)
27 | std::views::transform([](auto) { return DataVariant(std::vector<double> {}); })
28 | std::ranges::to<std::vector>();
29}
30
32{
33 DataModality modality = (m_num_channels > 1)
36
37 std::vector<uint64_t> shape;
38 if (modality == DataModality::AUDIO_1D) {
39 shape = { m_num_frames };
40 } else {
41 shape = { m_num_frames, m_num_channels };
42 }
43
44 auto layout = m_structure.memory_layout;
46
47 m_structure = ContainerDataStructure(modality, org, layout);
49
52}
53
54std::vector<DataDimension> SoundStreamContainer::get_dimensions() const
55{
57}
58
63
65{
66 if (layout != m_structure.memory_layout) {
67 std::unique_lock lock(m_data_mutex);
71 }
72}
73
75{
76 // return m_structure.get_channel_count();
77 return m_num_channels;
78}
79
81{
82 // return m_structure.get_samples_count_per_channel();
83 return m_num_frames;
84}
85
86std::vector<DataVariant> SoundStreamContainer::get_region_data(const Region& region) const
87{
88 const auto& spans = get_span_cache();
89
91 if (spans.empty())
92 return {};
93
94 std::span<const double> const_span(spans[0].data(), spans[0].size());
95 auto extracted = extract_region_data<double>(const_span, region, m_structure.dimensions);
96 return { DataVariant(std::move(extracted)) };
97 }
98
99 auto const_spans = spans | std::views::transform([](const auto& span) {
100 return std::span<const double>(span.data(), span.size());
101 });
102
103 auto extracted_channels = extract_region_data<double>(
104 std::vector<std::span<const double>>(const_spans.begin(), const_spans.end()),
105 region, m_structure.dimensions);
106
107 return extracted_channels
108 | std::views::transform([](auto&& channel) {
109 return DataVariant(std::forward<decltype(channel)>(channel));
110 })
111 | std::ranges::to<std::vector>();
112}
113
114void SoundStreamContainer::set_region_data(const Region& region, const std::vector<DataVariant>& data)
115{
117 if (m_data.empty() || data.empty())
118 return;
119
120 std::unique_lock lock(m_data_mutex);
121
122 auto dest_span = convert_variant<double>(m_data[0]);
123 auto src_span = convert_variant<double>(data[0]);
124
125 set_or_update_region_data<double>(
126 dest_span, src_span, region, m_structure.dimensions);
127
128 } else {
129 size_t channels_to_update = std::min(m_data.size(), data.size());
130
131 std::unique_lock lock(m_data_mutex);
132 for (size_t i = 0; i < channels_to_update; ++i) {
133 auto dest_span = convert_variant<double>(m_data[i]);
134 auto src_span = convert_variant<double>(data[i]);
135
136 set_or_update_region_data<double>(
137 dest_span, src_span, region, m_structure.dimensions);
138 }
139 }
140
142 m_double_extraction_dirty.store(true, std::memory_order_release);
143}
144
145std::vector<DataVariant> SoundStreamContainer::get_region_group_data(const RegionGroup& region_group) const
146{
147 const auto& spans = get_span_cache();
148
149 if (spans.empty())
150 return {};
151
152 auto const_spans = spans | std::views::transform([](const auto& span) {
153 return std::span<const double>(span.data(), span.size());
154 });
155
156 auto extracted_channels = extract_group_data<double>(
157 std::vector<std::span<const double>>(const_spans.begin(), const_spans.end()),
159
160 return extracted_channels
161 | std::views::transform([](auto&& channel) {
162 return DataVariant(std::forward<decltype(channel)>(channel));
163 })
164 | std::ranges::to<std::vector>();
165}
166
167std::vector<DataVariant> SoundStreamContainer::get_segments_data(const std::vector<RegionSegment>& segments) const
168{
169 const auto& spans = get_span_cache();
170
171 if (spans.empty() || segments.empty())
172 return {};
173
174 auto const_spans = spans | std::views::transform([](const auto& span) {
175 return std::span<const double>(span.data(), span.size());
176 });
177
178 auto extracted_channels = extract_segments_data<double>(
179 segments,
180 std::vector<std::span<const double>>(const_spans.begin(), const_spans.end()),
182
183 return extracted_channels
184 | std::views::transform([](auto&& channel) {
185 return DataVariant(std::forward<decltype(channel)>(channel));
186 })
187 | std::ranges::to<std::vector>();
188}
189
190std::span<const double> SoundStreamContainer::get_frame(uint64_t frame_index) const
191{
192 if (frame_index >= m_num_frames) {
193 return {};
194 }
195
196 const auto& spans = get_span_cache();
197
199 if (spans.empty())
200 return {};
201
202 auto frame_span = extract_frame<double>(spans[0], frame_index, m_num_channels);
203 return { frame_span.data(), frame_span.size() };
204 }
205
206 static thread_local std::vector<double> frame_buffer;
207 auto frame_span = extract_frame<double>(spans, frame_index, frame_buffer);
208 return { frame_span.data(), frame_span.size() };
209}
210
211void SoundStreamContainer::get_frames(std::span<double> output, uint64_t start_frame, uint64_t num_frames) const
212{
213 if (start_frame >= m_num_frames || output.empty()) {
214 std::ranges::fill(output, 0.0);
215 return;
216 }
217
218 uint64_t frames_to_copy = std::min<size_t>(num_frames, m_num_frames - start_frame);
219 uint64_t elements_to_copy = std::min(
220 frames_to_copy * m_num_channels,
221 static_cast<uint64_t>(output.size()));
222
223 auto interleaved_data = get_data_as_double();
224 uint64_t offset = start_frame * m_num_channels;
225
226 if (offset < interleaved_data.size()) {
227 uint64_t available = std::min<size_t>(elements_to_copy, interleaved_data.size() - offset);
228 std::copy_n(interleaved_data.begin() + offset, available, output.begin());
229
230 if (available < output.size()) {
231 std::fill(output.begin() + available, output.end(), 0.0);
232 }
233 } else {
234 std::ranges::fill(output, 0.0);
235 }
236}
237
238double SoundStreamContainer::get_value_at(const std::vector<uint64_t>& coordinates) const
239{
240 if (!has_data() || coordinates.size() != 2) {
241 return 0.0;
242 }
243
244 uint64_t frame = coordinates[0];
245 uint64_t channel = coordinates[1];
246
247 if (frame >= m_num_frames || channel >= m_num_channels) {
248 return 0.0;
249 }
250
251 const auto& spans = get_span_cache();
252
254 if (spans.empty())
255 return 0.0;
256
257 uint64_t linear_index = frame * m_num_channels + channel;
258 return (linear_index < spans[0].size()) ? spans[0][linear_index] : 0.0;
259 }
260
261 if (channel >= spans.size())
262 return 0.0;
263
264 return (frame < spans[channel].size()) ? spans[channel][frame] : 0.0;
265}
266
267void SoundStreamContainer::set_value_at(const std::vector<uint64_t>& coordinates, double value)
268{
269 if (coordinates.size() != 2)
270 return;
271
272 uint64_t frame = coordinates[0];
273 uint64_t channel = coordinates[1];
274
275 if (frame >= m_num_frames || channel >= m_num_channels) {
276 return;
277 }
278
279 const auto& spans = get_span_cache();
280
282 if (spans.empty())
283 return;
284
285 uint64_t linear_index = frame * m_num_channels + channel;
286 if (linear_index < spans[0].size()) {
287 const_cast<std::span<double>&>(spans[0])[linear_index] = value;
288 }
289
290 } else {
291 if (channel >= spans.size())
292 return;
293
294 if (frame < spans[channel].size()) {
295 const_cast<std::span<double>&>(spans[channel])[frame] = value;
296 }
297 }
298
300 m_double_extraction_dirty.store(true, std::memory_order_release);
301}
302
303uint64_t SoundStreamContainer::coordinates_to_linear_index(const std::vector<uint64_t>& coordinates) const
304{
305 return coordinates_to_linear(coordinates, m_structure.dimensions);
306}
307
308std::vector<uint64_t> SoundStreamContainer::linear_index_to_coordinates(uint64_t linear_index) const
309{
310 return linear_to_coordinates(linear_index, m_structure.dimensions);
311}
312
314{
315 std::unique_lock lock(m_data_mutex);
316
317 std::ranges::for_each(m_data, [](auto& vec) {
318 std::visit([](auto& v) { v.clear(); }, vec);
319 });
320
321 std::ranges::for_each(m_processed_data, [](auto& vec) {
322 std::visit([](auto& v) { v.clear(); }, vec);
323 });
324
325 m_num_frames = 0;
327
328 m_read_position = std::vector<std::atomic<uint64_t>>(m_num_channels);
329 for (auto& pos : m_read_position) {
330 pos.store(0);
331 }
332
335}
336
338{
339 auto span = get_data_as_double();
340 return span.empty() ? nullptr : span.data();
341}
342
344{
345 std::shared_lock lock(m_data_mutex);
346
347 return std::ranges::any_of(m_data, [](const auto& variant) {
348 return std::visit([](const auto& vec) { return !vec.empty(); }, variant);
349 });
350}
351
353{
354 std::lock_guard<std::mutex> lock(m_state_mutex);
355 m_region_groups[group.name] = group;
356}
357
358const RegionGroup& SoundStreamContainer::get_region_group(const std::string& name) const
359{
360 static const RegionGroup empty_group;
361
362 std::shared_lock lock(m_data_mutex);
363 auto it = m_region_groups.find(name);
364 return it != m_region_groups.end() ? it->second : empty_group;
365}
366
367std::unordered_map<std::string, RegionGroup> SoundStreamContainer::get_all_region_groups() const
368{
369 std::lock_guard<std::mutex> lock(m_state_mutex);
370 return m_region_groups;
371}
372
373void SoundStreamContainer::remove_region_group(const std::string& name)
374{
375 std::lock_guard<std::mutex> lock(m_state_mutex);
376 m_region_groups.erase(name);
377}
378
380{
381 return true;
382}
383
385{
386 // No-op for in-memory container
387}
388
390{
391 // No-op for in-memory container
392}
393
394void SoundStreamContainer::set_read_position(const std::vector<uint64_t>& position)
395{
396 if (m_read_position.size() != position.size()) {
397 m_read_position = std::vector<std::atomic<uint64_t>>(position.size());
398 }
399
400 auto wrapped_pos = wrap_position_with_loop(position, m_loop_region, m_looping_enabled);
401
402 for (size_t i = 0; i < wrapped_pos.size(); ++i) {
403 m_read_position[i].store(wrapped_pos[i]);
404 }
405}
406
408{
409 if (channel < m_read_position.size()) {
410 m_read_position[channel].store(frame);
411 }
412}
413
414const std::vector<uint64_t>& SoundStreamContainer::get_read_position() const
415{
416 static thread_local std::vector<uint64_t> pos_cache;
417 pos_cache.resize(m_read_position.size());
418
419 for (size_t i = 0; i < m_read_position.size(); ++i) {
420 pos_cache[i] = m_read_position[i].load();
421 }
422
423 return pos_cache;
424}
425
426void SoundStreamContainer::advance_read_position(const std::vector<uint64_t>& frames)
427{
428 if (frames.empty())
429 return;
430
431 std::vector<uint64_t> current_pos(m_read_position.size());
432 for (size_t i = 0; i < m_read_position.size(); ++i) {
433 current_pos[i] = m_read_position[i].load();
434 }
435
436 auto new_pos = advance_position(current_pos, frames, m_structure, m_looping_enabled, m_loop_region);
437
438 for (size_t i = 0; i < new_pos.size() && i < m_read_position.size(); ++i) {
439 m_read_position[i].store(new_pos[i]);
440 }
441}
442
444{
445 if (m_looping_enabled) {
446 return false;
447 }
448
449 if (m_read_position.empty()) {
450 return true;
451 }
452
453 uint64_t current_frame = m_read_position[0].load();
454 return current_frame >= m_num_frames;
455}
456
458{
459 std::vector<uint64_t> start_pos;
460
463 } else {
464 start_pos = std::vector<uint64_t>(m_num_channels, 0);
465 }
466
467 if (m_read_position.size() != start_pos.size()) {
468 m_read_position = std::vector<std::atomic<uint64_t>>(start_pos.size());
469 }
470
471 for (size_t i = 0; i < start_pos.size(); ++i) {
472 m_read_position[i].store(start_pos[i]);
473 }
474}
475
476uint64_t SoundStreamContainer::time_to_position(double time) const
477{
479}
480
481double SoundStreamContainer::position_to_time(uint64_t position) const
482{
484}
485
487{
488 m_looping_enabled = enable;
489 if (enable && m_loop_region.start_coordinates.empty()) {
491 }
492}
493
495{
496 m_loop_region = region;
497
498 if (m_looping_enabled && !region.start_coordinates.empty()) {
499 std::vector<uint64_t> current_pos(m_read_position.size());
500 for (size_t i = 0; i < m_read_position.size(); ++i) {
501 current_pos[i] = m_read_position[i].load();
502 }
503
504 bool outside_loop = false;
505 for (size_t i = 0; i < current_pos.size() && i < region.start_coordinates.size() && i < region.end_coordinates.size(); ++i) {
506 if (current_pos[i] < region.start_coordinates[i] || current_pos[i] > region.end_coordinates[i]) {
507 outside_loop = true;
508 break;
509 }
510 }
511
512 if (outside_loop) {
514 }
515 }
516}
517
522
524{
525 auto state = get_processing_state();
526 return has_data() && (state == ProcessingState::READY || state == ProcessingState::PROCESSED);
527}
528
530{
531 std::vector<uint64_t> frames(m_num_channels);
532 if (m_looping_enabled || m_read_position.empty()) {
533
534 for (auto& frame : frames) {
535 frame = std::numeric_limits<uint64_t>::max();
536 }
537 return frames;
538 }
539
540 for (size_t i = 0; i < frames.size(); i++) {
541 uint64_t current_frame = m_read_position[i].load();
542 frames[i] = (current_frame < m_num_frames) ? (m_num_frames - current_frame) : 0;
543 }
544 return frames;
545}
546
547uint64_t SoundStreamContainer::read_sequential(std::span<double> output, uint64_t count)
548{
549 uint64_t frames_read = peek_sequential(output, count, 0);
550
551 uint64_t frames_to_advance = frames_read / m_num_channels;
552 std::vector<uint64_t> advance_amount(m_num_channels, frames_to_advance);
553
554 advance_read_position(advance_amount);
555 return frames_read;
556}
557
558uint64_t SoundStreamContainer::peek_sequential(std::span<double> output, uint64_t count, uint64_t offset) const
559{
560 auto interleaved_span = get_data_as_double();
561 if (interleaved_span.empty() || output.empty())
562 return 0;
563
564 uint64_t start_frame = m_read_position.empty() ? 0 : m_read_position[0].load();
565 start_frame += offset;
566 uint64_t elements_to_read = std::min<uint64_t>(count, static_cast<uint64_t>(output.size()));
567
568 if (!m_looping_enabled) {
569 uint64_t linear_start = start_frame * m_num_channels;
570 if (linear_start >= interleaved_span.size()) {
571 std::ranges::fill(output, 0.0);
572 return 0;
573 }
574 auto view = interleaved_span
575 | std::views::drop(linear_start)
576 | std::views::take(elements_to_read);
577
578 auto copied = std::ranges::copy(view, output.begin());
579 std::ranges::fill(output.subspan(copied.out - output.begin()), 0.0);
580 return static_cast<uint64_t>(copied.out - output.begin());
581 }
582
583 if (m_loop_region.start_coordinates.empty()) {
584 std::ranges::fill(output, 0.0);
585 return 0;
586 }
587
588 uint64_t loop_start_frame = m_loop_region.start_coordinates[0];
589 uint64_t loop_end_frame = m_loop_region.end_coordinates[0];
590 uint64_t loop_length_frames = loop_end_frame - loop_start_frame + 1;
591
592 std::ranges::for_each(
593 std::views::iota(0UZ, elements_to_read),
594 [&](uint64_t i) {
595 uint64_t element_pos = start_frame * m_num_channels + i;
596 uint64_t frame_pos = element_pos / m_num_channels;
597 uint64_t channel_offset = element_pos % m_num_channels;
598
599 uint64_t wrapped_frame = ((frame_pos - loop_start_frame) % loop_length_frames) + loop_start_frame;
600 uint64_t wrapped_element = wrapped_frame * m_num_channels + channel_offset;
601
602 output[i] = (wrapped_element < interleaved_span.size()) ? interleaved_span[wrapped_element] : 0.0;
603 });
604
605 if (elements_to_read < output.size()) {
606 std::ranges::fill(output.subspan(elements_to_read), 0.0);
607 }
608 return elements_to_read;
609}
610
612{
613 ProcessingState old_state = m_processing_state.exchange(new_state);
614
615 if (old_state != new_state) {
616 notify_state_change(new_state);
617
618 if (new_state == ProcessingState::READY) {
619 std::lock_guard<std::mutex> lock(m_reader_mutex);
620 m_consumed_dimensions.clear();
621 }
622 }
623}
624
626{
627 std::lock_guard<std::mutex> lock(m_state_mutex);
628 if (m_state_callback) {
629 m_state_callback(shared_from_this(), new_state);
630 }
631}
632
634{
635 auto state = get_processing_state();
636 return has_data() && (state == ProcessingState::READY || state == ProcessingState::PROCESSED);
637}
638
647
649{
650 auto processor = std::make_shared<ContiguousAccessProcessor>();
651 set_default_processor(processor);
652}
653
662
663void SoundStreamContainer::set_default_processor(const std::shared_ptr<DataProcessor>& processor)
664{
665 auto old_processor = m_default_processor;
666 m_default_processor = processor;
667
668 if (old_processor) {
669 old_processor->on_detach(shared_from_this());
670 }
671
672 if (processor) {
673 processor->on_attach(shared_from_this());
674 }
675}
676
677std::shared_ptr<DataProcessor> SoundStreamContainer::get_default_processor() const
678{
679 std::lock_guard<std::mutex> lock(m_state_mutex);
680 return m_default_processor;
681}
682
683uint32_t SoundStreamContainer::register_dimension_reader(uint32_t dimension_index)
684{
685 std::lock_guard<std::mutex> lock(m_reader_mutex);
686 m_active_readers[dimension_index]++;
687
688 uint32_t reader_id = m_dimension_to_next_reader_id[dimension_index]++;
689 m_reader_consumed_dimensions[reader_id] = std::unordered_set<uint32_t>();
690
691 return reader_id;
692}
693
695{
696 std::lock_guard<std::mutex> lock(m_reader_mutex);
697 if (auto it = m_active_readers.find(dimension_index); it != m_active_readers.end()) {
698 auto& [dim, count] = *it;
699 if (--count <= 0) {
700 m_active_readers.erase(it);
701 m_dimension_to_next_reader_id.erase(dimension_index);
702 }
703 }
704}
705
707{
708 std::lock_guard<std::mutex> lock(m_reader_mutex);
709 return !m_active_readers.empty();
710}
711
712void SoundStreamContainer::mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id)
713{
714 if (m_reader_consumed_dimensions.contains(reader_id)) {
715 m_reader_consumed_dimensions[reader_id].insert(dimension_index);
716 } else {
717
718 MF_WARN(Journal::Component::Kakshya, Journal::Context::ContainerProcessing, "Attempted to mark dimension {} as consumed for unknown reader_id {}. This may indicate the reader was not registered or has already been unregistered. Please ensure readers are properly registered before marking dimensions as consumed.",
719 dimension_index, reader_id);
720 }
721}
722
724{
725 std::lock_guard<std::mutex> lock(m_reader_mutex);
726
727 return std::ranges::all_of(m_active_readers, [this](const auto& dim_reader_pair) {
728 const auto& [dim, expected_count] = dim_reader_pair;
729
730 auto actual_count = std::ranges::count_if(m_reader_consumed_dimensions,
731 [dim](const auto& reader_dims_pair) {
732 return reader_dims_pair.second.contains(dim);
733 });
734
735 return actual_count >= expected_count;
736 });
737}
738
740{
741 std::lock_guard<std::mutex> lock(m_reader_mutex);
742
743 std::ranges::for_each(m_reader_consumed_dimensions,
744 [](auto& reader_dims_pair) {
745 reader_dims_pair.second.clear();
746 });
747}
748
750{
751 if (new_layout == m_structure.memory_layout) {
752 return;
753 }
754
756 m_structure.memory_layout = new_layout;
758 return;
759 }
760
762 m_structure.memory_layout = new_layout;
764 return;
765 }
766
767 auto current_span = convert_variant<double>(m_data[0]);
768 std::vector<double> current_data(current_span.begin(), current_span.end());
769
770 auto channels = deinterleave_channels<double>(
771 std::span<const double>(current_data.data(), current_data.size()),
773
774 std::vector<double> reorganized_data;
775 if (new_layout == MemoryLayout::ROW_MAJOR) {
776 reorganized_data = interleave_channels(channels);
777 } else {
778 reorganized_data.reserve(current_data.size());
779 for (const auto& channel : channels) {
780 reorganized_data.insert(reorganized_data.end(), channel.begin(), channel.end());
781 }
782 }
783
784 m_data[0] = DataVariant(std::move(reorganized_data));
785
787 m_double_extraction_dirty.store(true, std::memory_order_release);
788
789 m_structure.memory_layout = new_layout;
791}
792
793std::span<const double> SoundStreamContainer::get_data_as_double() const
794{
795 if (!m_double_extraction_dirty.load(std::memory_order_acquire)) {
796 return { m_cached_ext_buffer };
797 }
798
800 if (m_data.empty())
801 return {};
802
803 std::shared_lock data_lock(m_data_mutex);
804 auto span = convert_variant<double>(m_data[0]);
805 return { span.data(), span.size() };
806 }
807
808 const auto& spans = get_span_cache();
809
810 auto channels = spans
811 | std::views::transform([](const auto& span) {
812 return std::vector<double>(span.begin(), span.end());
813 })
814 | std::ranges::to<std::vector>();
815
817 m_double_extraction_dirty.store(false, std::memory_order_release);
818
819 return { m_cached_ext_buffer };
820}
821
823{
824 if (channel >= m_data.size()) {
825 error<std::out_of_range>(
828 std::source_location::current(),
829 "Channel index {} out of range (max {})",
830 channel, m_data.size() - 1);
831 }
833}
834
836{
837 std::vector<DataAccess> result;
838 result.reserve(m_data.size());
839
840 for (auto& i : m_data) {
841 result.emplace_back(i, m_structure.dimensions, m_structure.modality);
842 }
843 return result;
844}
845
846const std::vector<std::span<double>>& SoundStreamContainer::get_span_cache() const
847{
848 if (!m_span_cache_dirty.load(std::memory_order_acquire) && m_span_cache.has_value()) {
849 return *m_span_cache;
850 }
851
852 std::unique_lock data_lock(m_data_mutex);
853
854 if (!m_span_cache_dirty.load(std::memory_order_acquire) && m_span_cache.has_value()) {
855 return *m_span_cache;
856 }
857
858 auto spans = m_data
859 | std::views::transform([](auto& variant) {
860 return convert_variant<double>(const_cast<DataVariant&>(variant));
861 })
862 | std::ranges::to<std::vector>();
863
864 m_span_cache = std::move(spans);
865 m_span_cache_dirty.store(false, std::memory_order_release);
866
867 return *m_span_cache;
868}
869
871{
872 m_span_cache_dirty.store(true, std::memory_order_release);
873}
874
875}
#define MF_WARN(comp, ctx,...)
Type-erased accessor for NDData with semantic view construction.
void reset_read_position() override
Reset read position to the beginning of the stream.
std::atomic< ProcessingState > m_processing_state
std::span< const double > get_data_as_double() const
Get the audio data as a specific type.
std::optional< std::vector< std::span< double > > > m_span_cache
void set_memory_layout(MemoryLayout layout) override
Set the memory layout for this container.
std::unordered_map< std::string, RegionGroup > m_region_groups
double position_to_time(uint64_t position) const override
Convert from position units (e.g., frame/sample index) to time (seconds).
Region get_loop_region() const override
Get the current loop region.
std::unordered_map< uint32_t, int > m_active_readers
void unload_region(const Region &region) override
Unload a region from memory.
std::unordered_set< uint32_t > m_consumed_dimensions
uint32_t register_dimension_reader(uint32_t dimension_index) override
Register a reader for a specific dimension.
bool has_active_readers() const override
Check if any dimensions currently have active readers.
void set_looping(bool enable) override
Enable or disable looping behavior for the stream.
uint64_t get_total_elements() const override
Get the total number of elements in the container.
std::vector< std::atomic< uint64_t > > m_read_position
const std::vector< std::span< double > > & get_span_cache() const
Get the cached spans for each channel, recomputing if dirty.
bool is_region_loaded(const Region &region) const override
Check if a region is loaded in memory.
bool all_dimensions_consumed() const override
Check if all active dimensions have been consumed in this cycle.
uint64_t peek_sequential(std::span< double > output, uint64_t count, uint64_t offset=0) const override
Peek at data without advancing the read position.
std::vector< DataVariant > get_region_data(const Region &region) const override
Get data for a specific region.
uint64_t time_to_position(double time) const override
Convert from time (seconds) to position units (e.g., frame/sample index).
const void * get_raw_data() const override
Get a raw pointer to the underlying data storage.
void advance_read_position(const std::vector< uint64_t > &frames) override
Advance the read position by a specified amount.
void unregister_dimension_reader(uint32_t dimension_index) override
Unregister a reader for a specific dimension.
void reorganize_data_layout(MemoryLayout new_layout)
void update_processing_state(ProcessingState new_state) override
Update the processing state of the container.
void mark_ready_for_processing(bool ready) override
Mark the container as ready or not ready for processing.
const RegionGroup & get_region_group(const std::string &name) const override
Get a region group by name.
void set_read_position(const std::vector< uint64_t > &position) override
Set the current read position in the primary temporal dimension per channel.
std::vector< uint64_t > linear_index_to_coordinates(uint64_t linear_index) const override
Convert linear index to coordinates based on current memory layout.
void notify_state_change(ProcessingState new_state)
bool has_data() const override
Check if the container currently holds any data.
const std::vector< uint64_t > & get_read_position() const override
Get the current read position.
std::span< const double > get_frame(uint64_t frame_index) const override
Get a single frame of data efficiently.
void set_default_processor(const std::shared_ptr< DataProcessor > &processor) override
Set the default data processor for this container.
void load_region(const Region &region) override
Load a region into memory.
void add_region_group(const RegionGroup &group) override
Add a named group of regions to the container.
ProcessingState get_processing_state() const override
Get the current processing state of the container.
void create_default_processor() override
Create and configure a default processor for this container.
void invalidate_span_cache()
Invalidate the span cache when data or layout changes.
bool is_ready_for_processing() const override
Check if the container is ready for processing.
uint64_t read_sequential(std::span< double > output, uint64_t count) override
Read data sequentially from the current position.
DataAccess channel_data(size_t channel) override
Get channel data with semantic interpretation.
void get_frames(std::span< double > output, uint64_t start_frame, uint64_t num_frames) const override
Get multiple frames efficiently.
std::vector< DataDimension > get_dimensions() const override
Get the dimensions describing the structure of the data.
std::unordered_map< std::string, RegionGroup > get_all_region_groups() const override
Get all region groups in the container.
bool is_at_end() const override
Check if read position has reached the end of the stream.
std::vector< DataAccess > all_channel_data() override
Get all channel data as accessors.
std::vector< DataVariant > get_segments_data(const std::vector< RegionSegment > &segment) const override
Get data for multiple region segments efficiently.
uint64_t get_num_frames() const override
Get the number of frames in the primary (temporal) dimension.
std::unordered_map< uint32_t, std::unordered_set< uint32_t > > m_reader_consumed_dimensions
std::shared_ptr< DataProcessor > m_default_processor
void remove_region_group(const std::string &name) override
Remove a region group by name.
void lock() override
Acquire a lock for thread-safe access.
void set_value_at(const std::vector< uint64_t > &coordinates, double value) override
Set a single value at the specified coordinates.
std::vector< DataVariant > get_region_group_data(const RegionGroup &group) const override
Get data for multiple regions efficiently.
void clear() override
Clear all data in the container.
void process_default() override
Process the container's data using the default processor.
void set_region_data(const Region &region, const std::vector< DataVariant > &data) override
Set data for a specific region.
void update_read_position_for_channel(size_t channel, uint64_t frame) override
Update the read position for a specific channel.
uint64_t coordinates_to_linear_index(const std::vector< uint64_t > &coordinates) const override
Convert coordinates to linear index based on current memory layout.
SoundStreamContainer(uint32_t sample_rate=48000, uint32_t num_channels=2, uint64_t initial_capacity=0, bool circular_mode=false)
Construct a SoundStreamContainer with specified parameters.
void set_loop_region(const Region &region) override
Set the loop region using a Region.
std::vector< uint64_t > get_remaining_frames() const override
Get the number of remaining frames from the current position, per channel.
void mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id) override
Mark a dimension as consumed for the current processing cycle.
std::function< void(std::shared_ptr< SignalSourceContainer >, ProcessingState)> m_state_callback
bool is_ready() const override
Check if the stream is ready for reading.
std::unordered_map< uint32_t, uint32_t > m_dimension_to_next_reader_id
double get_value_at(const std::vector< uint64_t > &coordinates) const override
Get a single value at the specified coordinates.
std::shared_ptr< DataProcessor > get_default_processor() const override
Get the current default data processor.
uint64_t get_frame_size() const override
Get the number of elements that constitute one "frame".
@ ContainerProcessing
Container operations (Kakshya - file/stream/region processing)
@ Runtime
General runtime operations (default fallback)
@ Kakshya
Containers [SignalSource, Stream, File], Regions, DataProcessors.
ProcessingState
Represents the current processing lifecycle state of a container.
@ READY
Container has data loaded and is ready for processing.
@ IDLE
Container is inactive with no data or not ready for processing.
@ PROCESSING
Container is actively being processed.
@ PROCESSED
Container has completed processing and results are available.
uint64_t coordinates_to_linear(const std::vector< uint64_t > &coords, const std::vector< DataDimension > &dimensions)
Convert N-dimensional coordinates to a linear index for interleaved data.
Definition CoordUtils.cpp:6
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
Definition NDData.hpp:73
DataModality
Data modality types for cross-modal analysis.
Definition NDData.hpp:78
@ AUDIO_MULTICHANNEL
Multi-channel audio.
std::vector< uint64_t > advance_position(const std::vector< uint64_t > &current_positions, uint64_t frames_to_advance, const ContainerDataStructure &structure, bool looping_enabled, const Region &loop_region)
Advance current positions by a number of frames, with optional looping.
std::vector< uint64_t > linear_to_coordinates(uint64_t index, const std::vector< DataDimension > &dimensions)
Convert a linear index to N-dimensional coordinates for interleaved data.
MemoryLayout
Memory layout for multi-dimensional data.
Definition NDData.hpp:36
@ ROW_MAJOR
C/C++ style (last dimension varies fastest)
double position_to_time(uint64_t position, double sample_rate)
Convert position (samples/frames) to time (seconds) given a sample rate.
OrganizationStrategy
Data organization strategy for multi-channel/multi-frame data.
Definition NDData.hpp:46
@ PLANAR
Separate DataVariant per logical unit (LLL...RRR for stereo)
@ INTERLEAVED
Single DataVariant with interleaved data (LRLRLR for stereo)
std::vector< T > interleave_channels(const std::vector< std::vector< T > > &channels)
Interleave multiple channels of data into a single vector.
uint64_t time_to_position(double time, double sample_rate)
Convert time (seconds) to position (samples/frames) given a sample rate.
std::vector< uint64_t > wrap_position_with_loop(const std::vector< uint64_t > &positions, const Region &loop_region, bool looping_enabled)
Wrap a position within loop boundaries if looping is enabled.
static uint64_t get_total_elements(const std::vector< DataDimension > &dimensions)
Get total elements across all dimensions.
Container structure for consistent dimension ordering.
static std::vector< DataDimension > create_dimensions(DataModality modality, const std::vector< uint64_t > &shape, MemoryLayout layout=MemoryLayout::ROW_MAJOR)
Create dimension descriptors for a data modality.
Definition NDData.cpp:107
std::string name
Descriptive name of the group.
Organizes related signal regions into a categorized collection.
static Region time_span(uint64_t start_frame, uint64_t end_frame, const std::string &label="", const std::any &extra_data={})
Create a Region representing a time span (e.g., a segment of frames).
Definition Region.hpp:135
std::vector< uint64_t > end_coordinates
Ending frame index (inclusive)
Definition Region.hpp:72
std::vector< uint64_t > start_coordinates
Starting frame index (inclusive)
Definition Region.hpp:69
Represents a point or span in N-dimensional space.
Definition Region.hpp:67