MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
AudioSubsystem.cpp
Go to the documentation of this file.
1#include "AudioSubsystem.hpp"
2
5
7
8namespace MayaFlux::Core {
9
11 : m_stream_info(stream_info)
12 , m_audiobackend(AudioBackendFactory::create_backend(stream_info.backend))
13 , m_audio_device(m_audiobackend->create_device_manager())
14 , m_subsystem_tokens {
15 .Buffer = MayaFlux::Buffers::ProcessingToken::AUDIO_BACKEND,
16 .Node = MayaFlux::Nodes::ProcessingToken::AUDIO_RATE,
17 .Task = MayaFlux::Vruta::ProcessingToken::SAMPLE_ACCURATE
18 }
19{
20}
21
23{
24 m_handle = &handle;
25
26 m_audio_stream = m_audiobackend->create_stream(
27 m_audio_device->get_default_output_device(),
28 m_audio_device->get_default_input_device(),
30 this);
31
33 m_notify_running.store(true, std::memory_order_release);
34
35#ifdef MAYAFLUX_PLATFORM_MACOS
36 m_observers_ptr.store(new ObserverMap(), std::memory_order_release);
37#endif
38
39 m_notify_thread = std::thread(&AudioSubsystem::notify_loop, this);
40 m_is_ready = true;
41}
42
44{
45 if (!m_is_ready || !m_audio_stream) {
46 error<std::runtime_error>(
49 std::source_location::current(),
50 "AudioSubsystem not initialized");
51 }
52
53 m_audio_stream->set_process_callback(
54 [this](void* output_buffer, void* input_buffer, unsigned int num_frames) -> int {
55 auto input_ptr = static_cast<double*>(input_buffer);
56 auto output_ptr = static_cast<double*>(output_buffer);
57
58 if (input_ptr && output_ptr) {
59 return this->process_audio(input_ptr, output_ptr, num_frames);
60 }
61
62 if (output_ptr) {
63 return this->process_output(output_ptr, num_frames);
64 }
65
66 if (input_ptr) {
67 return this->process_input(input_ptr, num_frames);
68 }
69 return 0;
70 });
71}
72
73int AudioSubsystem::process_output(double* output_buffer, unsigned int num_frames)
74{
75 m_callback_active.fetch_add(1, std::memory_order_acquire);
76
77 if (output_buffer == nullptr) {
79 "No output available");
80 m_callback_active.fetch_sub(1, std::memory_order_release);
81 return 1;
82 }
83
84 if (!m_is_running.load(std::memory_order_acquire)) {
85 if (output_buffer) {
86 auto total_samples = static_cast<uint32_t>(num_frames * m_stream_info.output.channels);
87 std::memset(output_buffer, 0, total_samples * sizeof(double));
88 }
89 m_callback_active.fetch_sub(1, std::memory_order_release);
90 return 0;
91 }
92
93 if (m_handle == nullptr) {
95 "Invalid processing handle");
96 m_callback_active.fetch_sub(1, std::memory_order_release);
97 return 1;
98 }
99
100 try {
101 uint32_t num_channels = m_stream_info.output.channels;
102 size_t total_samples = static_cast<size_t>(num_frames) * num_channels;
103 std::span<double> output_span(output_buffer, total_samples);
104
107
108 std::vector<std::span<const double>> buffer_data(num_channels);
109 std::vector<std::vector<std::vector<double>>> all_network_outputs(num_channels);
110 bool has_underrun = false;
111
113
114 for (uint32_t channel = 0; channel < num_channels; channel++) {
115 m_handle->buffers.process_channel(channel, num_frames);
116 all_network_outputs[channel] = m_handle->nodes.process_audio_networks(num_frames, channel);
117
118 auto channel_data = m_handle->buffers.read_channel_data(channel);
119
120 if (channel_data.size() < num_frames) {
122 "Channel buffer underrun");
123 has_underrun = true;
124
125 buffer_data[channel] = std::span<const double>();
126 } else {
127 buffer_data[channel] = channel_data;
128 }
129 }
130
131 for (size_t i = 0; i < num_frames; ++i) {
132
134
135 for (size_t j = 0; j < num_channels; ++j) {
136 double buffer_sample = 0.0;
137 if (!buffer_data[j].empty() && i < buffer_data[j].size()) {
138 buffer_sample = buffer_data[j][i];
139 }
140
141 double sample = m_handle->nodes.process_sample(j) + buffer_sample;
142
143 for (const auto& network_buffer : all_network_outputs[j]) {
144 if (i < network_buffer.size()) {
145 sample += network_buffer[i];
146 }
147 }
148
149 size_t index = i * num_channels + j;
150 output_span[index] = std::clamp(sample, -1., 1.);
151 }
152 }
153
156
157 if (m_snapshot_copy.size() != total_samples)
158 m_snapshot_copy.resize(total_samples);
159
160 std::memcpy(m_snapshot_copy.data(), output_buffer, total_samples * sizeof(double));
161 m_snapshot_size.store(static_cast<uint32_t>(total_samples), std::memory_order_release);
162 m_snapshot_ptr.store(m_snapshot_copy.data(), std::memory_order_release);
163 m_snapshot_generation.fetch_add(1, std::memory_order_release);
164 m_snapshot_generation.notify_all();
165
166 m_callback_active.fetch_sub(1, std::memory_order_release);
167 return has_underrun ? 1 : 0;
168
169 } catch (const std::exception& e) {
171 "Exception during audio output processing: {}", e.what());
172
173 if (output_buffer) {
174 auto total_samples = static_cast<uint32_t>(num_frames * m_stream_info.output.channels);
175 std::memset(output_buffer, 0, total_samples * sizeof(double));
176 }
177
178 m_callback_active.fetch_sub(1, std::memory_order_release);
179 return 1;
180 }
181}
182
183int AudioSubsystem::process_input(double* input_buffer, unsigned int num_frames)
184{
185 m_callback_active.fetch_add(1, std::memory_order_acquire);
186 if (m_handle == nullptr) {
188 "Invalid processing handle");
189 m_callback_active.fetch_sub(1, std::memory_order_release);
190 return 1;
191 }
192
193 if (!m_is_running.load(std::memory_order_acquire)) {
194 if (input_buffer) {
195 auto total_samples = static_cast<uint32_t>(num_frames * m_stream_info.input.channels);
196 std::memset(input_buffer, 0, total_samples * sizeof(double));
197 }
198 m_callback_active.fetch_sub(1, std::memory_order_release);
199 return 0;
200 }
201
202 m_handle->buffers.process_input(input_buffer, m_stream_info.input.channels, num_frames);
203
204 m_callback_active.fetch_sub(1, std::memory_order_release);
205 return 0;
206}
207
208int AudioSubsystem::process_audio(double* input_buffer, double* output_buffer, unsigned int num_frames)
209{
210 for (const auto& [name, hook] : m_handle->pre_process_hooks) {
211 hook(num_frames);
212 }
213
214 process_input(input_buffer, num_frames);
215 process_output(output_buffer, num_frames);
216
217 for (const auto& [name, hook] : m_handle->post_process_hooks) {
218 hook(num_frames);
219 }
220
221 return 0;
222}
223
225{
226 auto svc = std::make_shared<Registry::Service::AudioBackendService>();
227
228 svc->get_output_snapshot = [this]() -> std::span<const double> {
229 const double* ptr = m_snapshot_ptr.load(std::memory_order_acquire);
230 uint32_t sz = m_snapshot_size.load(std::memory_order_acquire);
231 if (!ptr || sz == 0)
232 return {};
233 return { ptr, sz };
234 };
235
236 svc->register_output_observer = [this](
237 std::function<void(const double*, uint32_t)> cb) -> uint32_t {
238 uint32_t id = m_next_observer_id.fetch_add(1, std::memory_order_relaxed);
239#ifdef MAYAFLUX_PLATFORM_MACOS
240 const ObserverMap* current = m_observers_ptr.load(std::memory_order_acquire);
241 const ObserverMap* next;
242 do {
243 auto* copy = new ObserverMap(*current);
244 (*copy)[id] = cb;
245 next = copy;
246 } while (!m_observers_ptr.compare_exchange_weak(
247 current, next,
248 std::memory_order_release, std::memory_order_acquire));
249 retire_observers(current);
250#else
251 auto current = m_observers.load(std::memory_order_acquire);
252 std::shared_ptr<ObserverMap> next;
253 do {
254 next = std::make_shared<ObserverMap>(*current);
255 (*next)[id] = cb;
256 } while (!m_observers.compare_exchange_weak(
257 current, next,
258 std::memory_order_release, std::memory_order_acquire));
259#endif
260 return id;
261 };
262
263 svc->unregister_output_observer = [this](uint32_t id) {
264#ifdef MAYAFLUX_PLATFORM_MACOS
265 const ObserverMap* current = m_observers_ptr.load(std::memory_order_acquire);
266 const ObserverMap* next;
267 do {
268 auto* copy = new ObserverMap(*current);
269 copy->erase(id);
270 next = copy;
271 } while (!m_observers_ptr.compare_exchange_weak(
272 current, next,
273 std::memory_order_release, std::memory_order_acquire));
274 retire_observers(current);
275#else
276 auto current = m_observers.load(std::memory_order_acquire);
277 std::shared_ptr<ObserverMap> next;
278 do {
279 next = std::make_shared<ObserverMap>(*current);
280 next->erase(id);
281 } while (!m_observers.compare_exchange_weak(
282 current, next,
283 std::memory_order_release, std::memory_order_acquire));
284#endif
285 };
286
290 [svc]() -> void* { return svc.get(); });
291}
292
294{
295 uint64_t last_gen = 0;
296
297 while (m_notify_running.load(std::memory_order_acquire)) {
298 m_snapshot_generation.wait(last_gen, std::memory_order_relaxed);
299
300 if (!m_notify_running.load(std::memory_order_acquire))
301 break;
302
303 last_gen = m_snapshot_generation.load(std::memory_order_acquire);
304
305 const double* ptr = m_snapshot_ptr.load(std::memory_order_acquire);
306 uint32_t sz = m_snapshot_size.load(std::memory_order_acquire);
307
308 if (!ptr || sz == 0)
309 continue;
310
311#ifdef MAYAFLUX_PLATFORM_MACOS
312 auto [obs, slot] = acquire_observers();
313 if (obs) {
314 for (auto& [id, cb] : *obs)
315 cb(ptr, sz);
316 }
317 release_observers(slot);
318#else
319 auto obs = m_observers.load(std::memory_order_acquire);
320 for (auto& [id, cb] : *obs)
321 cb(ptr, sz);
322#endif
323 }
324}
325
326#ifdef MAYAFLUX_PLATFORM_MACOS
327std::pair<const AudioSubsystem::ObserverMap*, size_t>
328AudioSubsystem::acquire_observers() const
329{
330 size_t slot = m_obs_hazard_counter.fetch_add(1, std::memory_order_relaxed)
331 % MAX_OBSERVER_READERS;
332 const ObserverMap* current;
333 do {
334 current = m_observers_ptr.load(std::memory_order_acquire);
335 m_obs_hazard_ptrs[slot].store(current, std::memory_order_release);
336 } while (current != m_observers_ptr.load(std::memory_order_acquire));
337 return { current, slot };
338}
339
340void AudioSubsystem::release_observers(size_t slot) const
341{
342 m_obs_hazard_ptrs[slot].store(nullptr, std::memory_order_release);
343}
344
345void AudioSubsystem::retire_observers(const ObserverMap* old)
346{
347 m_obs_retired.push_back(old);
348 auto it = m_obs_retired.begin();
349 while (it != m_obs_retired.end()) {
350 bool referenced = false;
351 for (size_t i = 0; i < MAX_OBSERVER_READERS; ++i) {
352 if (m_obs_hazard_ptrs[i].load(std::memory_order_acquire) == *it) {
353 referenced = true;
354 break;
355 }
356 }
357 if (!referenced) {
358 delete *it;
359 it = m_obs_retired.erase(it);
360 } else {
361 ++it;
362 }
363 }
364}
365#endif
366
368{
369 if (!m_is_ready || !m_audio_stream) {
370 error<std::runtime_error>(
373 std::source_location::current(),
374 "Cannot start AudioSubsystem: not initialized");
375 }
376
377 m_audio_stream->open();
378 m_audio_stream->start();
379 m_is_running.store(true);
380}
381
383{
384 if (!m_is_running.load()) {
385 return;
386 }
387
389 "Stopping AudioSubsystem...");
390
391 m_is_running.store(false, std::memory_order_release);
392
393 if (m_audio_stream && m_audio_stream->is_running()) {
394 m_audio_stream->stop();
395 }
396
397 if (m_callback_active.load() > 0) {
399 "Stopped while {} callback(s) active", m_callback_active.load());
400 }
401
402 m_notify_running.store(false, std::memory_order_release);
403 m_snapshot_generation.fetch_add(1, std::memory_order_release);
404 m_snapshot_generation.notify_all();
405
406 if (m_notify_thread.joinable())
407 m_notify_thread.join();
408
410 "AudioSubsystem stopped");
411}
412
414{
415 if (m_audio_stream && m_is_running.load()) {
416 m_audio_stream->pause();
417 m_is_paused = true;
418 }
419}
420
422{
424 m_audio_stream->resume();
425 m_is_paused = false;
426 }
427}
428
430{
431 while (!m_is_running.load(std::memory_order_acquire))
432 std::this_thread::yield();
433}
434
436{
437 stop();
438
439 if (m_audio_stream) {
440 m_audio_stream->close();
441 }
442 m_audio_stream.reset();
443 m_audio_device.reset();
444 m_audiobackend->cleanup();
445 m_audiobackend.reset();
446
449
451
452#ifdef MAYAFLUX_PLATFORM_MACOS
453 delete m_observers_ptr.exchange(nullptr, std::memory_order_acq_rel);
454#endif
455
456 m_is_ready = false;
457}
458
459}
#define MF_INFO(comp, ctx,...)
#define MF_RT_WARN(comp, ctx,...)
#define MF_RT_ERROR(comp, ctx,...)
glm::vec2 current
const uint8_t * ptr
Factory pattern implementation for audio backend instantiation.
void initialize(SubsystemProcessingHandle &handle) override
Initialize audio processing with provided handle.
bool m_is_ready
Subsystem ready state.
void stop() override
Stop audio processing and streaming.
void shutdown() override
Shutdown and cleanup audio resources.
void wait_until_running() override
Block until the subsystem's processing loop is confirmed live.
std::atomic< uint64_t > m_snapshot_generation
std::unique_ptr< IAudioBackend > m_audiobackend
Audio backend implementation.
int process_output(double *output_buffer, unsigned int num_frames)
Processes output data for audio interface.
std::atomic< int > m_callback_active
Active callback counter.
int process_audio(double *input_buffer, double *output_buffer, unsigned int num_frames)
Processes both input and output data in full-duplex mode.
void pause() override
Pause audio processing without stopping the stream.
std::atomic< bool > m_is_running
Subsystem running state.
void start() override
Start audio processing and streaming.
GlobalStreamInfo m_stream_info
Audio stream configuration.
std::atomic< std::shared_ptr< ObserverMap > > m_observers
SubsystemProcessingHandle * m_handle
Reference to processing handle.
bool m_is_paused
Subsystem paused state.
std::atomic< uint32_t > m_snapshot_size
int process_input(double *input_buffer, unsigned int num_frames)
Processes input data from audio interface.
std::atomic< const double * > m_snapshot_ptr
std::atomic< bool > m_notify_running
std::unordered_map< uint32_t, std::function< void(const double *, uint32_t)> > ObserverMap
std::vector< double > m_snapshot_copy
Owned copy of the last output cycle; decouples notification-thread reads from the callback write buff...
std::shared_ptr< Registry::Service::AudioBackendService > m_audio_backend_service
AudioSubsystem(GlobalStreamInfo &stream_info)
Constructs AudioSubsystem with stream configuration.
void resume() override
Resume audio processing after pause.
std::unique_ptr< AudioDevice > m_audio_device
Audio device manager.
std::unique_ptr< AudioStream > m_audio_stream
Audio stream manager.
void register_callbacks() override
Register audio backend callbacks for real-time processing.
std::atomic< uint32_t > m_next_observer_id
std::span< const double > read_channel_data(uint32_t channel) const
Get read-only access to channel data.
void process_channel(uint32_t channel, uint32_t processing_units)
Process specific channel.
void process_input(double *input_data, uint32_t num_channels, uint32_t num_frames)
@brienf Process Input from backend into buffer manager
std::vector< std::vector< double > > process_audio_networks(uint32_t num_samples, uint32_t channel=0)
std::map< std::string, ProcessHook > post_process_hooks
NodeProcessingHandle nodes
Node processing interface.
std::map< std::string, ProcessHook > pre_process_hooks
BufferProcessingHandle buffers
Buffer processing interface.
Unified interface combining buffer and node processing for subsystems.
void process_buffer_cycle()
Process all tasks scheduled for current buffer cycle.
void process(uint64_t processing_units)
Process all tasks in token domain.
void register_service(ServiceFactory factory)
Register a backend service capability.
static BackendRegistry & instance()
Get the global registry instance.
void unregister_service()
Unregister a service.
@ AudioSubsystem
Audio subsystem operations (backend, device, stream management)
@ AudioCallback
Audio callback thread - strictest real-time requirements.
@ Core
Core engine, backend, subsystems.
Main namespace for the Maya Flux audio engine.
Definition Runtime.cpp:12
uint32_t channels
Number of discrete channels in this set.
ChannelConfig input
Configuration for input signal channels (disabled by default)
ChannelConfig output
Configuration for output signal channels.
Comprehensive configuration for digital audio stream processing.
Backend audio subsystem service interface.