MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
ContainerBuffer.cpp
Go to the documentation of this file.
1#include "ContainerBuffer.hpp"
2
4
6
7namespace MayaFlux::Buffers {
8
9ContainerToBufferAdapter::ContainerToBufferAdapter(std::shared_ptr<Kakshya::StreamContainer> container)
10 : m_container(container)
11{
12 if (container) {
13 auto structure = container->get_structure();
14 m_num_channels = structure.get_channel_count();
15
16 container->register_state_change_callback(
17 [this](auto c, auto s) {
18 this->on_container_state_change(c, s);
19 });
20 }
21}
22
23void ContainerToBufferAdapter::processing_function(std::shared_ptr<Buffer> buffer)
24{
25 if (!m_container || !buffer) {
26 return;
27 }
28
29 if (m_container->is_at_end()) {
30 buffer->mark_for_removal();
31 return;
32 }
33
34 try {
35 auto state = m_container->get_processing_state();
36
38 if (m_update_flags) {
39 buffer->mark_for_removal();
40 }
41 return;
42 }
43
46 if (m_container->try_acquire_processing_token(m_source_channel)) {
47 m_container->process_default();
48 }
49 }
50 }
51
52 auto audio_buffer = std::dynamic_pointer_cast<AudioBuffer>(buffer);
53 auto& buffer_data = audio_buffer->get_data();
54 uint32_t buffer_size = audio_buffer->get_num_samples();
55
56 auto read_positions = m_container->get_read_position();
57 uint64_t current_pos = (m_source_channel < read_positions.size())
58 ? read_positions[m_source_channel]
59 : 0;
60
61 if (buffer_data.size() != buffer_size) {
62 buffer_data.resize(buffer_size);
63 }
64
65 extract_channel_data(buffer_data);
66
67 if (m_auto_advance) {
68 m_container->update_read_position_for_channel(m_source_channel, current_pos + buffer_size);
69 }
70
71 if (m_update_flags) {
72 buffer->mark_for_processing(true);
73 }
74
75 m_container->mark_dimension_consumed(m_source_channel, m_reader_id);
76
77 if (m_container->all_dimensions_consumed()) {
78 m_container->update_processing_state(Kakshya::ProcessingState::READY);
79 std::dynamic_pointer_cast<Kakshya::SoundFileContainer>(m_container)->clear_all_consumption();
80 m_container->reset_processing_token();
81 }
82
83 } catch (const std::exception& e) {
84 std::cerr << "Error in ContainerToBufferAdapter::process: " << e.what() << '\n';
85 }
86}
87
89{
// Presumably the body of ContainerToBufferAdapter::extract_channel_data(output);
// the signature line is not visible in this listing.
// Fills `output` with samples for m_source_channel taken from the container's
// processed data; every early-exit path and every unfilled tail is zero-filled.

// No container bound: output is all zeros.
90 if (!m_container) {
91 std::ranges::fill(output, 0.0);
92 return;
93 }
94
// Only containers that cast to Kakshya::SoundStreamContainer are read from;
// any other container type produces zeros.
95 auto sound_container = std::dynamic_pointer_cast<Kakshya::SoundStreamContainer>(m_container);
96 if (!sound_container) {
97 std::ranges::fill(output, 0.0);
98 return;
99 }
100
// Nothing processed yet: output is all zeros.
101 auto& processed_data = sound_container->get_processed_data();
102 if (processed_data.empty()) {
103 std::ranges::fill(output, 0.0);
104 return;
105 }
106
107 auto structure = sound_container->get_structure();
108
109 if (structure.organization == Kakshya::OrganizationStrategy::INTERLEAVED) {
// Interleaved path: only processed_data[0] is read; sample i of this channel
// is taken from index i * num_channels + m_source_channel.
// temp_storage is scratch handed to extract_from_variant
// (presumably used when the variant is not already double — TODO confirm).
110 thread_local std::vector<double> temp_storage;
111 auto data_span = Kakshya::extract_from_variant<double>(processed_data[0], temp_storage);
112
// NOTE(review): if get_channel_count() can return 0, the division below is a
// divide-by-zero — confirm the container guarantees at least one channel.
113 auto num_channels = structure.get_channel_count();
114 auto samples_to_copy = std::min(static_cast<size_t>(output.size()),
115 static_cast<size_t>(data_span.size() / num_channels));
116
117 for (auto i : std::views::iota(0UZ, samples_to_copy)) {
118 auto interleaved_idx = i * num_channels + m_source_channel;
// The bounds check covers m_source_channel >= num_channels, where the computed
// index can run past the end of data_span; those samples become 0.0.
119 output[i] = (interleaved_idx < data_span.size()) ? data_span[interleaved_idx] : 0.0;
120 }
121
// Zero the remainder of output that the source could not cover.
122 if (samples_to_copy < output.size()) {
123 std::ranges::fill(output | std::views::drop(samples_to_copy), 0.0);
124 }
125
126 } else {
// Non-interleaved path: processed_data is indexed directly by m_source_channel.
127 if (m_source_channel >= processed_data.size()) {
128 std::ranges::fill(output, 0.0);
129 return;
130 }
131
132 thread_local std::vector<double> temp_storage;
133 auto channel_data_span = Kakshya::extract_from_variant<double>(processed_data[m_source_channel], temp_storage);
134
// Copy as many samples as both sides can hold, then zero the rest of output.
135 auto samples_to_copy = std::min(output.size(), channel_data_span.size());
136 std::ranges::copy_n(channel_data_span.begin(), samples_to_copy, output.begin());
137
138 if (samples_to_copy < output.size()) {
139 std::ranges::fill(output | std::views::drop(samples_to_copy), 0.0);
140 }
141 }
142}
143
144void ContainerToBufferAdapter::on_attach(std::shared_ptr<Buffer> buffer)
145{
146 if (!m_container || !buffer) {
147 return;
148 }
149
150 m_reader_id = m_container->register_dimension_reader(m_source_channel);
151
152 if (!m_container->is_ready_for_processing()) {
153 throw std::runtime_error("Container not ready for processing");
154 }
155
156 try {
157 auto& buffer_data = std::dynamic_pointer_cast<AudioBuffer>(buffer)->get_data();
158 uint32_t num_samples = std::dynamic_pointer_cast<AudioBuffer>(buffer)->get_num_samples();
159
160 extract_channel_data(buffer_data);
161
162 if (m_update_flags) {
163 buffer->mark_for_processing(true);
164 }
165
166 } catch (const std::exception& e) {
167 std::cerr << "Error pre-filling buffer: " << e.what() << '\n';
168 }
169}
170
171void ContainerToBufferAdapter::on_detach(std::shared_ptr<Buffer> buffer)
172{
173 if (m_container) {
174 m_container->unregister_state_change_callback();
175 m_container->unregister_dimension_reader(m_source_channel);
176 }
177}
178
180{
// Presumably the body of ContainerToBufferAdapter::set_source_channel(channel_index);
// the signature line is not visible in this listing.
// Rejects indices at or beyond the cached channel count (m_num_channels) with
// std::out_of_range; otherwise stores the index in m_source_channel.
181 if (channel_index >= m_num_channels) {
182 throw std::out_of_range("Channel index exceeds container channel count");
183 }
184 m_source_channel = channel_index;
185}
186
187void ContainerToBufferAdapter::set_container(std::shared_ptr<Kakshya::StreamContainer> container)
188{
189 if (m_container) {
190 m_container->unregister_state_change_callback();
191 }
192
193 m_container = container;
194
195 if (container) {
196 auto structure = container->get_structure();
197 m_num_channels = structure.get_channel_count();
198
199 container->register_state_change_callback(
200 [this](std::shared_ptr<Kakshya::SignalSourceContainer> c, Kakshya::ProcessingState s) {
201 this->on_container_state_change(c, s);
202 });
203 }
204}
205
207 std::shared_ptr<Kakshya::SignalSourceContainer> container,
209{
210 switch (state) {
212 break;
213
215 std::cerr << "Container entered error state" << '\n';
216 break;
217
218 default:
219 break;
220 }
221}
222
223ContainerBuffer::ContainerBuffer(uint32_t channel_id, uint32_t num_samples,
224 std::shared_ptr<Kakshya::StreamContainer> container,
225 uint32_t source_channel)
226 : AudioBuffer(channel_id, num_samples)
227 , m_container(container)
228 , m_source_channel(source_channel)
229{
230 if (!m_container) {
231 throw std::invalid_argument("ContainerBuffer: container must not be null");
232 }
233
234 m_pending_adapter = std::make_shared<ContainerToBufferAdapter>(m_container);
235 std::dynamic_pointer_cast<ContainerToBufferAdapter>(m_pending_adapter)->set_source_channel(m_source_channel);
236
238}
239
248
249void ContainerBuffer::set_container(std::shared_ptr<Kakshya::StreamContainer> container)
250{
251 m_container = container;
252
253 if (auto adapter = std::dynamic_pointer_cast<ContainerToBufferAdapter>(m_default_processor)) {
254 adapter->set_container(container);
255 }
256
258}
259
261{
// Presumably the body of ContainerBuffer::setup_zero_copy_if_possible();
// the signature line is not visible in this listing.
// As written, every path leaves m_zero_copy_mode set to false.
262 // Check if we can use zero-copy mode
263 // This would be possible if:
264 // 1. Container data is contiguous doubles
265 // 2. Channel is deinterleaved (column-major for audio)
266 // 3. Buffer size matches container frame size
267
268 if (!m_container) {
269 m_zero_copy_mode = false;
270 return;
271 }
272
// NOTE(review): `dimensions` and `layout` are fetched but never read below;
// they look like placeholders for the zero-copy check described above.
273 auto dimensions = m_container->get_dimensions();
274 auto layout = m_container->get_memory_layout();
275
276 // For now, disable zero-copy until we have direct memory access APIs
277 m_zero_copy_mode = false;
278
279 // TODO: Implement zero-copy when container provides direct memory access
280}
281
282std::shared_ptr<BufferProcessor> ContainerBuffer::create_default_processor()
283{
284 if (m_pending_adapter) {
285 return m_pending_adapter;
286 }
287
288 auto adapter = std::make_shared<ContainerToBufferAdapter>(m_container);
289 adapter->set_source_channel(m_source_channel);
290 return adapter;
291}
292
293} // namespace MayaFlux::Buffers
std::shared_ptr< BufferProcessor > m_default_processor
Default audio transformation processor for this buffer.
virtual void set_default_processor(std::shared_ptr< BufferProcessor > processor) override
Sets the default audio transformation processor for this buffer.
virtual void enforce_default_processing(bool should_process) override
Controls whether the audio buffer should use default processing.
Concrete audio implementation of the Buffer interface for double-precision audio data.
std::shared_ptr< Kakshya::StreamContainer > m_container
ContainerBuffer(uint32_t channel_id, uint32_t num_samples, std::shared_ptr< Kakshya::StreamContainer > container, uint32_t source_channel=0)
Construct a ContainerBuffer for a specific channel and container.
std::shared_ptr< BufferProcessor > m_pending_adapter
void initialize()
Initialize the buffer after construction.
std::shared_ptr< BufferProcessor > create_default_processor() override
Create the default processor (ContainerToBufferAdapter) for this buffer.
void setup_zero_copy_if_possible()
Attempt to enable zero-copy operation if container layout allows.
void set_container(std::shared_ptr< Kakshya::StreamContainer > container)
Update the container reference.
void on_container_state_change(std::shared_ptr< Kakshya::SignalSourceContainer > container, Kakshya::ProcessingState state)
Respond to container state changes (e.g., READY, PROCESSED, NEEDS_REMOVAL).
ContainerToBufferAdapter(std::shared_ptr< Kakshya::StreamContainer > container)
void on_attach(std::shared_ptr< Buffer > buffer) override
Attach the adapter to an AudioBuffer.
void processing_function(std::shared_ptr< Buffer > buffer) override
Extracts and processes data from the container into the target AudioBuffer.
void set_container(std::shared_ptr< Kakshya::StreamContainer > container)
Set the container to adapt.
void extract_channel_data(std::span< double > output)
Extract channel data from the container into the output buffer.
void set_source_channel(uint32_t channel_index)
Set which channel dimension to extract from the container.
void on_detach(std::shared_ptr< Buffer > buffer) override
Detach the adapter from its AudioBuffer.
std::shared_ptr< Kakshya::StreamContainer > m_container
ProcessingState
Represents the current processing lifecycle state of a container.
@ READY
Container has data loaded and is ready for processing.
@ NEEDS_REMOVAL
Container is marked for removal from the system.
@ ERROR
Container is in an error state and cannot proceed.
@ PROCESSED
Container has completed processing and results are available.
@ INTERLEAVED
Single DataVariant with interleaved data (LRLRLR for stereo)