MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
DynamicSoundStream.cpp
Go to the documentation of this file.
3
5
6namespace MayaFlux::Kakshya {
7
8DynamicSoundStream::DynamicSoundStream(uint32_t sample_rate, uint32_t num_channels)
9 : SoundStreamContainer(sample_rate, num_channels)
10 , m_auto_resize(true)
11{
12}
13
14uint64_t DynamicSoundStream::validate(std::vector<std::span<const double>>& data, uint64_t start_frame)
15{
16 if (data.empty() || data[0].empty()) {
17 return 0;
18 }
19
20 uint64_t num_frames {};
22 num_frames = data[0].size() / get_num_channels();
23
24 } else {
25 if (data.size() < get_num_channels()) {
26 MF_ERROR(Journal::Component::Kakshya, Journal::Context::ContainerProcessing, "Insufficient channel data for planar organization: expected {}, got {}", get_num_channels(), data.size());
27
28 return 0;
29 }
30 num_frames = data[0].size();
31
32 if (!std::ranges::all_of(data | std::views::drop(1) | std::views::take(get_num_channels() - 1),
33 [num_frames](const auto& span) { return span.size() == num_frames; })) {
34 MF_ERROR(Journal::Component::Kakshya, Journal::Context::ContainerProcessing, "Mismatched frame counts across channels in planar organization. Expected {} frames, but found a channel with a different frame count.", num_frames);
35 return 0;
36 }
37 }
38
39 if (num_frames == 0) {
40 MF_WARN(Journal::Component::Kakshya, Journal::Context::ContainerProcessing, "Attempting to write with insufficient data for complete frame. Data frames: {}, Channels: {}. Returning without writing.", num_frames, get_num_channels());
41 return 0;
42 }
43
44 if (uint64_t required_end_frame = start_frame + num_frames; m_auto_resize) {
45 if (required_end_frame > get_num_frames()) {
46 expand_to(required_end_frame);
47 }
48 } else {
49 uint64_t available_frames = (start_frame < get_num_frames()) ? (get_num_frames() - start_frame) : 0;
50
51 if (available_frames == 0) {
52 return 0;
53 }
54
55 if (num_frames > available_frames) {
56 num_frames = available_frames;
57 }
58
59 if (required_end_frame > m_num_frames) {
60 m_num_frames = required_end_frame;
62 }
63 }
64 return num_frames;
65}
66
67uint64_t DynamicSoundStream::validate_single_channel(std::span<const double> data, uint64_t start_frame, uint32_t channel)
68{
69 if (data.empty()) {
70 return 0;
71 }
72
73 if (channel >= get_num_channels()) {
74 MF_ERROR(Journal::Component::Kakshya, Journal::Context::ContainerProcessing, "Channel index {} exceeds available channels ({})", channel, get_num_channels());
75 return 0;
76 }
77
78 uint64_t num_frames = data.size();
79 uint64_t required_end_frame = start_frame + num_frames;
80
81 if (m_auto_resize) {
82 if (required_end_frame > get_num_frames()) {
83 expand_to(required_end_frame);
84 }
85 } else {
86 uint64_t available_frames = (start_frame < get_num_frames()) ? (get_num_frames() - start_frame) : 0;
87
88 if (available_frames == 0) {
89 return 0;
90 }
91
92 if (num_frames > available_frames) {
93 num_frames = available_frames;
94 }
95
96 if (required_end_frame > m_num_frames) {
97 m_num_frames = required_end_frame;
99 }
100 }
101
102 return num_frames;
103}
104
105uint64_t DynamicSoundStream::write_frames(std::vector<std::span<const double>> data, uint64_t start_frame)
106{
107 auto num_frames = validate(data, start_frame);
108
109 if (!num_frames)
110 return 0;
111
112 if (m_is_circular && start_frame + num_frames > m_circular_capacity) {
113 uint64_t frames_to_end = m_circular_capacity - start_frame;
114 uint64_t frames_from_start = num_frames - frames_to_end;
115
116 if (frames_to_end > 0) {
117 std::vector<std::span<const double>> first_part;
118 first_part.reserve(data.size());
119 for (const auto& span : data) {
120 first_part.emplace_back(span.subspan(0, frames_to_end));
121 }
122 write_frames(first_part, start_frame);
123 }
124
125 if (frames_from_start > 0) {
126 std::vector<std::span<const double>> second_part;
127 second_part.reserve(data.size());
128 for (const auto& span : data) {
129 second_part.emplace_back(span.subspan(frames_to_end, frames_from_start));
130 }
131 write_frames(second_part, 0);
132 }
133
134 return num_frames;
135 }
136
137 Region write_region {
138 { start_frame, 0 },
139 { start_frame + num_frames - 1, get_num_channels() - 1 }
140 };
141
142 std::vector<DataVariant> data_variants;
143
145 uint64_t samples_to_write = num_frames * get_num_channels();
146 data_variants.emplace_back(
147 std::vector<double>(data[0].begin(), data[0].begin() + samples_to_write));
148 } else {
149 data_variants = data
150 | std::views::take(get_num_channels())
151 | std::views::transform([num_frames](const auto& span) -> DataVariant {
152 return DataVariant(std::vector<double>(span.begin(), span.begin() + num_frames));
153 })
154 | std::ranges::to<std::vector>();
155 }
156
157 set_region_data(write_region, data_variants);
158
159 if (m_is_circular) {
161 }
162
163 return num_frames;
164}
165
166uint64_t DynamicSoundStream::write_frames(std::span<const double> data, uint64_t start_frame, uint32_t channel)
167{
168 auto num_frames = validate_single_channel(data, start_frame, channel);
169
170 if (!num_frames)
171 return 0;
172
173 if (m_is_circular && start_frame + num_frames > m_circular_capacity) {
174 uint64_t frames_to_end = m_circular_capacity - start_frame;
175 uint64_t frames_from_start = num_frames - frames_to_end;
176
177 if (frames_to_end > 0) {
178 write_frames(data.subspan(0, frames_to_end), start_frame, channel);
179 }
180
181 if (frames_from_start > 0) {
182 write_frames(data.subspan(frames_to_end, frames_from_start), 0, channel);
183 }
184
185 return num_frames;
186 }
187
189 std::unique_lock lock(m_data_mutex);
190
191 if (m_data.empty()) {
192 expand_to(start_frame + num_frames);
193 }
194
195 auto& interleaved_data = std::get<std::vector<double>>(m_data[0]);
196 uint32_t num_channels = get_num_channels();
197
198 for (uint64_t frame = 0; frame < num_frames; ++frame) {
199 uint64_t interleaved_index = (start_frame + frame) * num_channels + channel;
200 if (interleaved_index < interleaved_data.size()) {
201 interleaved_data[interleaved_index] = data[frame];
202 }
203 }
204 } else {
205 std::unique_lock lock(m_data_mutex);
206
207 if (channel >= m_data.size()) {
208 return 0;
209 }
210
211 auto dest_span = convert_variant<double>(m_data[channel]);
212
213 if (start_frame + num_frames > dest_span.size()) {
214 std::vector<double> current_data(dest_span.begin(), dest_span.end());
215 current_data.resize(start_frame + num_frames, 0.0);
216 m_data[channel] = DataVariant(std::move(current_data));
217 dest_span = convert_variant<double>(m_data[channel]);
218 }
219
220 std::copy(data.begin(), data.begin() + num_frames,
221 dest_span.begin() + start_frame);
222 }
223
224 if (m_is_circular) {
226 }
227
229 m_double_extraction_dirty.store(true, std::memory_order_release);
230
231 return num_frames;
232}
233
234std::span<const double> DynamicSoundStream::get_channel_frames(uint32_t channel, uint64_t start_frame, uint64_t num_frames) const
235{
236 if (channel >= get_num_channels()) {
237 return {};
238 }
239
241 MF_WARN(Journal::Component::Kakshya, Journal::Context::ContainerProcessing, "Direct span access not supported for interleaved data. Use get_frames() or Kakshya::extract_channel_data() instead.");
242 return {};
243 }
244
245 std::shared_lock lock(m_data_mutex);
246
247 if (channel >= m_data.size()) {
248 return {};
249 }
250
251 const auto& channel_data = std::get<std::vector<double>>(m_data[channel]);
252
253 if (start_frame >= channel_data.size()) {
254 return {};
255 }
256
257 uint64_t available_frames = channel_data.size() - start_frame;
258 uint64_t actual_frames = std::min(num_frames, available_frames);
259
260 return { channel_data.data() + start_frame, actual_frames };
261}
262
263void DynamicSoundStream::get_channel_frames(std::span<double> output, uint32_t channel, uint64_t start_frame) const
264{
265 if (channel >= get_num_channels() || output.empty()) {
266 return;
267 }
268
269 uint64_t num_frames = output.size();
270
272 std::shared_lock lock(m_data_mutex);
273
274 if (m_data.empty()) {
275 std::ranges::fill(output, 0.0);
276 return;
277 }
278
279 const auto& interleaved_data = std::get<std::vector<double>>(m_data[0]);
280 uint32_t num_channels = get_num_channels();
281
282 for (uint64_t frame = 0; frame < num_frames; ++frame) {
283 uint64_t interleaved_index = (start_frame + frame) * num_channels + channel;
284 if (interleaved_index < interleaved_data.size()) {
285 output[frame] = interleaved_data[interleaved_index];
286 } else {
287 output[frame] = 0.0;
288 }
289 }
290 } else {
291 std::shared_lock lock(m_data_mutex);
292
293 if (channel >= m_data.size()) {
294 std::ranges::fill(output, 0.0);
295 return;
296 }
297
298 const auto& channel_data = std::get<std::vector<double>>(m_data[channel]);
299
300 for (uint64_t frame = 0; frame < num_frames; ++frame) {
301 uint64_t data_index = start_frame + frame;
302 if (data_index < channel_data.size()) {
303 output[frame] = channel_data[data_index];
304 } else {
305 output[frame] = 0.0;
306 }
307 }
308 }
309}
310
311void DynamicSoundStream::ensure_capacity(uint64_t required_frames)
312{
313 if (uint64_t current_frames = get_total_elements() / get_num_channels();
314 required_frames > current_frames) {
315 expand_to(required_frames);
316 }
317}
318
320{
321 ensure_capacity(capacity);
322
323 Region circular_region {
324 { 0, 0 },
325 { capacity - 1, get_num_channels() - 1 }
326 };
327
328 set_loop_region(circular_region);
329 set_looping(true);
330
331 m_circular_capacity = capacity;
332 m_is_circular = true;
333}
334
336{
337 for (uint32_t i = 0; i < m_dynamic_slots.size(); ++i) {
338 if (!m_dynamic_slots[i]) {
339 m_dynamic_slots[i] = true;
340 m_dynamic_data[i].clear();
341 return i;
342 }
343 }
344
345 m_dynamic_data.emplace_back();
346 m_dynamic_slots.push_back(true);
347 return static_cast<uint32_t>(m_dynamic_slots.size() - 1);
348}
349
351{
352 if (index < m_dynamic_slots.size()) {
353 m_dynamic_slots[index] = false;
354 m_dynamic_data[index].clear();
355 }
356}
357
358std::vector<DataVariant>& DynamicSoundStream::get_dynamic_data(uint32_t index)
359{
360 return m_dynamic_data.at(index);
361}
362
363const std::vector<DataVariant>& DynamicSoundStream::get_dynamic_data(uint32_t index) const
364{
365 return m_dynamic_data.at(index);
366}
367
374
375void DynamicSoundStream::set_all_data(const std::vector<DataVariant>& data)
376{
377 std::unique_lock lock(m_data_mutex);
378 m_data.resize(data.size());
379
380 std::ranges::for_each(std::views::zip(data, m_data),
381 [](auto&& pair) {
382 auto&& [source, dest] = pair;
383 safe_copy_data_variant(source, dest);
384 });
385
386 m_num_frames = std::visit([](const auto& vec) {
387 return static_cast<uint64_t>(vec.size());
388 },
389 m_data[0]);
390
393 }
394
397}
398
400{
401 set_all_data(std::vector<DataVariant> { data });
402}
403
404void DynamicSoundStream::expand_to(uint64_t target_frames)
405{
406 uint64_t current_frames = get_total_elements() / get_num_channels();
407 uint64_t new_capacity = std::max(target_frames, current_frames * 2);
408
409 std::vector<DataVariant> new_data = create_expanded_data(new_capacity);
410 set_all_data(new_data);
411}
412
413std::vector<DataVariant> DynamicSoundStream::create_expanded_data(uint64_t new_frame_count)
414{
416 std::vector<DataVariant> expanded_data(1);
417
418 if (m_data.empty()) {
419 expanded_data[0] = DataVariant(std::vector<double>(new_frame_count * get_num_channels(), 0.0));
420 } else {
421 std::vector<double> current_data;
422 extract_from_variant(m_data[0], current_data);
423
424 std::vector<double> expanded_buffer(new_frame_count * get_num_channels(), 0.0);
425
426 std::ranges::copy_n(current_data.begin(),
427 std::min<size_t>(current_data.size(), expanded_buffer.size()),
428 expanded_buffer.begin());
429
430 expanded_data[0] = DataVariant(std::move(expanded_buffer));
431 }
432 return expanded_data;
433 }
434
435 return std::views::iota(0U, get_num_channels())
436 | std::views::transform([this, new_frame_count](uint32_t ch) -> DataVariant {
437 if (ch < m_data.size()) {
438 std::vector<double> current_channel_data;
439 extract_from_variant(m_data[ch], current_channel_data);
440
441 std::vector<double> expanded_channel(new_frame_count, 0.0);
442 std::ranges::copy_n(current_channel_data.begin(),
443 std::min<size_t>(current_channel_data.size(), expanded_channel.size()),
444 expanded_channel.begin());
445
446 return { std::move(expanded_channel) };
447 }
448
449 return { std::vector<double>(new_frame_count, 0.0) };
450 })
451 | std::ranges::to<std::vector>();
452}
453
454}
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
uint32_t allocate_dynamic_slot()
Allocate an independent processed data slot.
uint64_t validate_single_channel(std::span< const double > data, uint64_t start_frame=0, uint32_t channel=0)
bool m_auto_resize
Enable automatic capacity expansion.
DynamicSoundStream(uint32_t sample_rate=48000, uint32_t num_channels=2)
Construct a DynamicSoundStream with specified audio parameters.
std::vector< DataVariant > create_expanded_data(uint64_t new_frame_count)
bool m_is_circular
True when operating in circular buffer mode.
void enable_circular_buffer(uint64_t capacity)
Enable circular buffer mode with fixed capacity.
void ensure_capacity(uint64_t required_frames)
Pre-allocate capacity for the specified number of frames.
void set_all_data(const DataVariant &new_data)
void disable_circular_buffer()
Disable circular buffer mode and return to linear operation.
std::span< const double > get_channel_frames(uint32_t channel, uint64_t start_frame, uint64_t num_frames) const
Get the fixed capacity of the circular buffer if enabled.
uint64_t validate(std::vector< std::span< const double > > &data, uint64_t start_frame=0)
uint64_t write_frames(std::span< const double > data, uint64_t start_frame=0, uint32_t channel=0)
Write audio frame data to the container with automatic capacity management.
void release_dynamic_slot(uint32_t index)
Release a previously allocated dynamic slot, clearing its data.
std::vector< std::vector< DataVariant > > m_dynamic_data
uint64_t m_circular_capacity
Fixed capacity for circular mode.
std::vector< DataVariant > & get_dynamic_data(uint32_t index)
Access a dynamic processed data slot by index.
void set_looping(bool enable) override
Enable or disable looping behavior for the stream.
uint64_t get_total_elements() const override
Get the total number of elements in the container.
void update_processing_state(ProcessingState new_state) override
Update the processing state of the container.
void invalidate_span_cache()
Invalidate the span cache when data or layout changes.
DataAccess channel_data(size_t channel) override
Get channel data with semantic interpretation.
uint64_t get_num_frames() const override
Get the number of frames in the primary (temporal) dimension.
void lock() override
Acquire a lock for thread-safe access.
void set_region_data(const Region &region, const std::vector< DataVariant > &data) override
Set data for a specific region.
void set_loop_region(const Region &region) override
Set the loop region using a Region.
Concrete base implementation for streaming audio containers.
@ ContainerProcessing
Container operations (Kakshya - file/stream/region processing)
@ Kakshya
Containers[Signalsource, Stream, File], Regions, DataProcessors.
@ READY
Container has data loaded and is ready for processing.
std::span< T > extract_from_variant(const DataVariant &variant, std::vector< T > &storage, ComplexConversionStrategy strategy=ComplexConversionStrategy::MAGNITUDE)
Get typed span from DataVariant using concepts.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
Definition NDData.hpp:76
@ INTERLEAVED
Single DataVariant with interleaved data (LRLRLR for stereo)
void safe_copy_data_variant(const DataVariant &input, DataVariant &output)
Safely copy data from a DataVariant to another DataVariant, handling type conversion.
Definition DataUtils.cpp:34
Represents a point or span in N-dimensional space.
Definition Region.hpp:73