13 uint64_t initial_capacity,
bool circular_mode)
14 : m_sample_rate(sample_rate)
15 , m_num_channels(num_channels)
16 , m_num_frames(initial_capacity)
17 , m_circular_mode(circular_mode)
26 m_data = std::views::iota(0U, num_channels)
27 | std::views::transform([](
auto) {
return DataVariant(std::vector<double> {}); })
28 | std::ranges::to<std::vector>();
37 std::vector<uint64_t> shape;
94 std::span<const double> const_span(spans[0].data(), spans[0].size());
99 auto const_spans = spans | std::views::transform([](
const auto& span) {
100 return std::span<const double>(span.data(), span.size());
103 auto extracted_channels = extract_region_data<double>(
104 std::vector<std::span<const double>>(const_spans.begin(), const_spans.end()),
107 return extracted_channels
108 | std::views::transform([](
auto&& channel) {
109 return DataVariant(std::forward<
decltype(channel)>(channel));
111 | std::ranges::to<std::vector>();
117 if (
m_data.empty() || data.empty())
122 auto dest_span = convert_variant<double>(
m_data[0]);
123 auto src_span = convert_variant<double>(data[0]);
125 set_or_update_region_data<double>(
129 size_t channels_to_update = std::min(
m_data.size(), data.size());
132 for (
size_t i = 0; i < channels_to_update; ++i) {
133 auto dest_span = convert_variant<double>(
m_data[i]);
134 auto src_span = convert_variant<double>(data[i]);
136 set_or_update_region_data<double>(
152 auto const_spans = spans | std::views::transform([](
const auto& span) {
153 return std::span<const double>(span.data(), span.size());
156 auto extracted_channels = extract_group_data<double>(
157 std::vector<std::span<const double>>(const_spans.begin(), const_spans.end()),
160 return extracted_channels
161 | std::views::transform([](
auto&& channel) {
162 return DataVariant(std::forward<
decltype(channel)>(channel));
164 | std::ranges::to<std::vector>();
171 if (spans.empty() || segments.empty())
174 auto const_spans = spans | std::views::transform([](
const auto& span) {
175 return std::span<const double>(span.data(), span.size());
178 auto extracted_channels = extract_segments_data<double>(
180 std::vector<std::span<const double>>(const_spans.begin(), const_spans.end()),
183 return extracted_channels
184 | std::views::transform([](
auto&& channel) {
185 return DataVariant(std::forward<
decltype(channel)>(channel));
187 | std::ranges::to<std::vector>();
202 auto frame_span = extract_frame<double>(spans[0], frame_index,
m_num_channels);
203 return { frame_span.data(), frame_span.size() };
206 static thread_local std::vector<double> frame_buffer;
207 auto frame_span = extract_frame<double>(spans, frame_index, frame_buffer);
208 return { frame_span.data(), frame_span.size() };
214 std::ranges::fill(output, 0.0);
218 uint64_t frames_to_copy = std::min<size_t>(num_frames,
m_num_frames - start_frame);
219 uint64_t elements_to_copy = std::min(
221 static_cast<uint64_t
>(output.size()));
226 if (offset < interleaved_data.size()) {
227 uint64_t available = std::min<size_t>(elements_to_copy, interleaved_data.size() - offset);
228 std::copy_n(interleaved_data.begin() + offset, available, output.begin());
230 if (available < output.size()) {
231 std::fill(output.begin() + available, output.end(), 0.0);
234 std::ranges::fill(output, 0.0);
240 if (!
has_data() || coordinates.size() != 2) {
244 uint64_t frame = coordinates[0];
245 uint64_t channel = coordinates[1];
258 return (linear_index < spans[0].size()) ? spans[0][linear_index] : 0.0;
261 if (channel >= spans.size())
264 return (frame < spans[channel].size()) ? spans[channel][frame] : 0.0;
269 if (coordinates.size() != 2)
272 uint64_t frame = coordinates[0];
273 uint64_t channel = coordinates[1];
286 if (linear_index < spans[0].size()) {
287 const_cast<std::span<double>&
>(spans[0])[linear_index] = value;
291 if (channel >= spans.size())
294 if (frame < spans[channel].size()) {
295 const_cast<std::span<double>&
>(spans[channel])[frame] = value;
317 std::ranges::for_each(
m_data, [](
auto& vec) {
318 std::visit([](
auto& v) { v.clear(); }, vec);
322 std::visit([](
auto& v) { v.clear(); }, vec);
340 return span.empty() ? nullptr : span.data();
347 return std::ranges::any_of(
m_data, [](
const auto& variant) {
348 return std::visit([](
const auto& vec) {
return !vec.empty(); }, variant);
402 for (
size_t i = 0; i < wrapped_pos.size(); ++i) {
416 static thread_local std::vector<uint64_t> pos_cache;
438 for (
size_t i = 0; i < new_pos.size() && i <
m_read_position.size(); ++i) {
459 std::vector<uint64_t> start_pos;
468 m_read_position = std::vector<std::atomic<uint64_t>>(start_pos.size());
471 for (
size_t i = 0; i < start_pos.size(); ++i) {
504 bool outside_loop =
false;
534 for (
auto& frame : frames) {
535 frame = std::numeric_limits<uint64_t>::max();
540 for (
size_t i = 0; i < frames.size(); i++) {
552 std::vector<uint64_t> advance_amount(
m_num_channels, frames_to_advance);
561 if (interleaved_span.empty() || output.empty())
565 start_frame += offset;
566 uint64_t elements_to_read = std::min<uint64_t>(count,
static_cast<uint64_t
>(output.size()));
570 if (linear_start >= interleaved_span.size()) {
571 std::ranges::fill(output, 0.0);
574 auto view = interleaved_span
575 | std::views::drop(linear_start)
576 | std::views::take(elements_to_read);
578 auto copied = std::ranges::copy(view, output.begin());
579 std::ranges::fill(output.subspan(copied.out - output.begin()), 0.0);
580 return static_cast<uint64_t
>(copied.out - output.begin());
584 std::ranges::fill(output, 0.0);
590 uint64_t loop_length_frames = loop_end_frame - loop_start_frame + 1;
592 std::ranges::for_each(
593 std::views::iota(0UZ, elements_to_read),
599 uint64_t wrapped_frame = ((frame_pos - loop_start_frame) % loop_length_frames) + loop_start_frame;
600 uint64_t wrapped_element = wrapped_frame *
m_num_channels + channel_offset;
602 output[i] = (wrapped_element < interleaved_span.size()) ? interleaved_span[wrapped_element] : 0.0;
605 if (elements_to_read < output.size()) {
606 std::ranges::fill(output.subspan(elements_to_read), 0.0);
608 return elements_to_read;
615 if (old_state != new_state) {
650 auto processor = std::make_shared<ContiguousAccessProcessor>();
669 old_processor->on_detach(shared_from_this());
673 processor->on_attach(shared_from_this());
698 auto& [dim, count] = *it;
719 dimension_index, reader_id);
727 return std::ranges::all_of(
m_active_readers, [
this](
const auto& dim_reader_pair) {
728 const auto& [dim, expected_count] = dim_reader_pair;
731 [dim](
const auto& reader_dims_pair) {
732 return reader_dims_pair.second.contains(dim);
735 return actual_count >= expected_count;
744 [](
auto& reader_dims_pair) {
745 reader_dims_pair.second.clear();
767 auto current_span = convert_variant<double>(
m_data[0]);
768 std::vector<double> current_data(current_span.begin(), current_span.end());
770 auto channels = deinterleave_channels<double>(
771 std::span<const double>(current_data.data(), current_data.size()),
774 std::vector<double> reorganized_data;
778 reorganized_data.reserve(current_data.size());
779 for (
const auto& channel : channels) {
780 reorganized_data.insert(reorganized_data.end(), channel.begin(), channel.end());
804 auto span = convert_variant<double>(
m_data[0]);
805 return { span.data(), span.size() };
810 auto channels = spans
811 | std::views::transform([](
const auto& span) {
812 return std::vector<double>(span.begin(), span.end());
814 | std::ranges::to<std::vector>();
824 if (channel >=
m_data.size()) {
825 error<std::out_of_range>(
828 std::source_location::current(),
829 "Channel index {} out of range (max {})",
830 channel,
m_data.size() - 1);
837 std::vector<DataAccess> result;
838 result.reserve(
m_data.size());
859 | std::views::transform([](
auto& variant) {
860 return convert_variant<double>(
const_cast<DataVariant&
>(variant));
862 | std::ranges::to<std::vector>();
#define MF_WARN(comp, ctx,...)
Type-erased accessor for NDData with semantic view construction.
void reset_read_position() override
Reset read position to the beginning of the stream.
std::atomic< ProcessingState > m_processing_state
std::span< const double > get_data_as_double() const
Get the audio data as a specific type.
std::optional< std::vector< std::span< double > > > m_span_cache
void set_memory_layout(MemoryLayout layout) override
Set the memory layout for this container.
std::unordered_map< std::string, RegionGroup > m_region_groups
double position_to_time(uint64_t position) const override
Convert from position units (e.g., frame/sample index) to time (seconds).
Region get_loop_region() const override
Get the current loop region.
std::unordered_map< uint32_t, int > m_active_readers
void unload_region(const Region &region) override
Unload a region from memory.
std::unordered_set< uint32_t > m_consumed_dimensions
uint32_t register_dimension_reader(uint32_t dimension_index) override
Register a reader for a specific dimension.
bool has_active_readers() const override
Check if any dimensions currently have active readers.
void set_looping(bool enable) override
Enable or disable looping behavior for the stream.
uint64_t get_total_elements() const override
Get the total number of elements in the container.
std::vector< std::atomic< uint64_t > > m_read_position
const std::vector< std::span< double > > & get_span_cache() const
Get the cached spans for each channel, recomputing if dirty.
bool is_region_loaded(const Region &region) const override
Check if a region is loaded in memory.
bool all_dimensions_consumed() const override
Check if all active dimensions have been consumed in this cycle.
std::vector< double > m_cached_ext_buffer
uint64_t peek_sequential(std::span< double > output, uint64_t count, uint64_t offset=0) const override
Peek at data without advancing the read position.
std::vector< DataVariant > get_region_data(const Region &region) const override
Get data for a specific region.
std::atomic< bool > m_double_extraction_dirty
uint64_t time_to_position(double time) const override
Convert from time (seconds) to position units (e.g., frame/sample index).
const void * get_raw_data() const override
Get a raw pointer to the underlying data storage.
std::vector< DataVariant > m_processed_data
std::shared_mutex m_data_mutex
void advance_read_position(const std::vector< uint64_t > &frames) override
Advance the read position by a specified amount.
void unregister_dimension_reader(uint32_t dimension_index) override
Unregister a reader for a specific dimension.
void reorganize_data_layout(MemoryLayout new_layout)
void update_processing_state(ProcessingState new_state) override
Update the processing state of the container.
void mark_ready_for_processing(bool ready) override
Mark the container as ready or not ready for processing.
const RegionGroup & get_region_group(const std::string &name) const override
Get a region group by name.
std::vector< DataVariant > m_data
void set_read_position(const std::vector< uint64_t > &position) override
Set the current read position in the primary temporal dimension per channel.
std::vector< uint64_t > linear_index_to_coordinates(uint64_t linear_index) const override
Convert linear index to coordinates based on current memory layout.
void notify_state_change(ProcessingState new_state)
bool has_data() const override
Check if the container currently holds any data.
std::mutex m_reader_mutex
const std::vector< uint64_t > & get_read_position() const override
Get the current read position.
std::span< const double > get_frame(uint64_t frame_index) const override
Get a single frame of data efficiently.
void set_default_processor(const std::shared_ptr< DataProcessor > &processor) override
Set the default data processor for this container.
void load_region(const Region &region) override
Load a region into memory.
void add_region_group(const RegionGroup &group) override
Add a named group of regions to the container.
ProcessingState get_processing_state() const override
Get the current processing state of the container.
void create_default_processor() override
Create and configure a default processor for this container.
void invalidate_span_cache()
Invalidate the span cache when data or layout changes.
bool is_ready_for_processing() const override
Check if the container is ready for processing.
uint64_t m_circular_write_position
virtual void clear_all_consumption()
uint64_t read_sequential(std::span< double > output, uint64_t count) override
Read data sequentially from the current position.
DataAccess channel_data(size_t channel) override
Get channel data with semantic interpretation.
void get_frames(std::span< double > output, uint64_t start_frame, uint64_t num_frames) const override
Get multiple frames efficiently.
std::vector< DataDimension > get_dimensions() const override
Get the dimensions describing the structure of the data.
std::unordered_map< std::string, RegionGroup > get_all_region_groups() const override
Get all region groups in the container.
bool is_at_end() const override
Check if read position has reached the end of the stream.
std::vector< DataAccess > all_channel_data() override
Get all channel data as accessors.
std::vector< DataVariant > get_segments_data(const std::vector< RegionSegment > &segment) const override
Get data for multiple region segments efficiently.
uint64_t get_num_frames() const override
Get the number of frames in the primary (temporal) dimension.
std::unordered_map< uint32_t, std::unordered_set< uint32_t > > m_reader_consumed_dimensions
std::shared_ptr< DataProcessor > m_default_processor
void remove_region_group(const std::string &name) override
Remove a region group by name.
void lock() override
Acquire a lock for thread-safe access.
void set_value_at(const std::vector< uint64_t > &coordinates, double value) override
Set a single value at the specified coordinates.
std::vector< DataVariant > get_region_group_data(const RegionGroup &group) const override
Get data for multiple regions efficiently.
void clear() override
Clear all data in the container.
void process_default() override
Process the container's data using the default processor.
void set_region_data(const Region &region, const std::vector< DataVariant > &data) override
Set data for a specific region.
std::atomic< bool > m_span_cache_dirty
void update_read_position_for_channel(size_t channel, uint64_t frame) override
Update the read position for a specific channel.
uint64_t coordinates_to_linear_index(const std::vector< uint64_t > &coordinates) const override
Convert coordinates to linear index based on current memory layout.
SoundStreamContainer(uint32_t sample_rate=48000, uint32_t num_channels=2, uint64_t initial_capacity=0, bool circular_mode=false)
Construct a SoundStreamContainer with specified parameters.
void set_loop_region(const Region &region) override
Set the loop region using a Region.
std::vector< uint64_t > get_remaining_frames() const override
Get the number of remaining frames from the current position, per channel.
void mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id) override
Mark a dimension as consumed for the current processing cycle.
std::function< void(std::shared_ptr< SignalSourceContainer >, ProcessingState)> m_state_callback
ContainerDataStructure m_structure
bool is_ready() const override
Check if the stream is ready for reading.
std::unordered_map< uint32_t, uint32_t > m_dimension_to_next_reader_id
double get_value_at(const std::vector< uint64_t > &coordinates) const override
Get a single value at the specified coordinates.
std::shared_ptr< DataProcessor > get_default_processor() const override
Get the current default data processor.
uint64_t get_frame_size() const override
Get the number of elements that constitute one "frame".
@ ContainerProcessing
Container operations (Kakshya - file/stream/region processing)
@ Runtime
General runtime operations (default fallback)
@ Kakshya
Containers[Signalsource, Stream, File], Regions, DataProcessors.
ProcessingState
Represents the current processing lifecycle state of a container.
@ READY
Container has data loaded and is ready for processing.
@ IDLE
Container is inactive with no data or not ready for processing.
@ PROCESSING
Container is actively being processed.
@ PROCESSED
Container has completed processing and results are available.
uint64_t coordinates_to_linear(const std::vector< uint64_t > &coords, const std::vector< DataDimension > &dimensions)
Convert N-dimensional coordinates to a linear index for interleaved data.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
DataModality
Data modality types for cross-modal analysis.
@ AUDIO_MULTICHANNEL
Multi-channel audio.
@ AUDIO_1D
1D audio signal
std::vector< uint64_t > advance_position(const std::vector< uint64_t > &current_positions, uint64_t frames_to_advance, const ContainerDataStructure &structure, bool looping_enabled, const Region &loop_region)
Advance current positions by a number of frames, with optional looping.
std::vector< uint64_t > linear_to_coordinates(uint64_t index, const std::vector< DataDimension > &dimensions)
Convert a linear index to N-dimensional coordinates for interleaved data.
MemoryLayout
Memory layout for multi-dimensional data.
@ ROW_MAJOR
C/C++ style (last dimension varies fastest)
double position_to_time(uint64_t position, double sample_rate)
Convert position (samples/frames) to time (seconds) given a sample rate.
OrganizationStrategy
Data organization strategy for multi-channel/multi-frame data.
@ PLANAR
Separate DataVariant per logical unit (LLL...RRR for stereo)
@ INTERLEAVED
Single DataVariant with interleaved data (LRLRLR for stereo)
std::vector< T > interleave_channels(const std::vector< std::vector< T > > &channels)
Interleave multiple channels of data into a single vector.
uint64_t time_to_position(double time, double sample_rate)
Convert time (seconds) to position (samples/frames) given a sample rate.
std::vector< uint64_t > wrap_position_with_loop(const std::vector< uint64_t > &positions, const Region &loop_region, bool looping_enabled)
Wrap a position within loop boundaries if looping is enabled.
std::vector< DataDimension > dimensions
std::optional< size_t > channel_dims
OrganizationStrategy organization
std::optional< size_t > time_dims
static uint64_t get_total_elements(const std::vector< DataDimension > &dimensions)
Get total elements across all dimensions.
MemoryLayout memory_layout
Container structure for consistent dimension ordering.
static std::vector< DataDimension > create_dimensions(DataModality modality, const std::vector< uint64_t > &shape, MemoryLayout layout=MemoryLayout::ROW_MAJOR)
Create dimension descriptors for a data modality.
std::string name
Descriptive name of the group.
Organizes related signal regions into a categorized collection.
static Region time_span(uint64_t start_frame, uint64_t end_frame, const std::string &label="", const std::any &extra_data={})
Create a Region representing a time span (e.g., a segment of frames).
std::vector< uint64_t > end_coordinates
Ending frame index (inclusive)
std::vector< uint64_t > start_coordinates
Starting frame index (inclusive)
Represents a point or span in N-dimensional space.