MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
PipewireBackend.cpp
Go to the documentation of this file.
1#include "PipewireBackend.hpp"
3
4#ifdef PIPEWIRE_BACKEND
5
6#include <pipewire/pipewire.h>
7#include <spa/param/audio/format-utils.h>
8#include <spa/pod/builder.h>
9#include <spa/utils/result.h>
10
11namespace MayaFlux::Core {
12
13namespace {
14 constexpr auto C = Journal::Component::Core;
15 constexpr auto X = Journal::Context::AudioBackend;
16}
17
18// ---------------------------------------------------------------------------
19// PipewireBackend
20// ---------------------------------------------------------------------------
21
23 : m_loop(pw_main_loop_new(nullptr))
24{
25 pw_init(nullptr, nullptr);
26
27 if (!m_loop)
28 error(C, X, std::source_location::current(), "pw_main_loop_new failed");
29
30 m_context = pw_context_new(pw_main_loop_get_loop(m_loop), nullptr, 0);
31 if (!m_context) {
32 pw_main_loop_destroy(m_loop);
33 error(C, X, std::source_location::current(), "pw_context_new failed");
34 }
35
36 m_core = pw_context_connect(m_context, nullptr, 0);
37 if (!m_core) {
38 pw_context_destroy(m_context);
39 pw_main_loop_destroy(m_loop);
40 error(C, X, std::source_location::current(),
41 "pw_context_connect failed - is PipeWire running?");
42 }
43
44 MF_INFO(C, X, "PipeWire backend initialised (version: {})", pw_get_library_version());
45}
46
47PipewireBackend::~PipewireBackend()
48{
49 cleanup();
50}
51
52std::unique_ptr<AudioDevice> PipewireBackend::create_device_manager()
53{
54 auto dev = std::make_unique<PipewireDevice>(m_loop, m_context, m_core);
55 m_device_manager = dev.get();
56 return dev;
57}
58
59std::unique_ptr<AudioStream> PipewireBackend::create_stream(
60 unsigned int output_node_id,
61 unsigned int input_node_id,
62 GlobalStreamInfo& stream_info,
63 void* user_data)
64{
65 auto out_id = static_cast<uint32_t>(output_node_id);
66 auto in_id = static_cast<uint32_t>(input_node_id);
67
68 if (m_device_manager) {
69 auto* pw_dev = static_cast<PipewireDevice*>(m_device_manager);
70 if (!stream_info.output.device_name.empty())
71 out_id = pw_dev->find_output_node_id(stream_info.output.device_name);
72 if (stream_info.input.enabled && !stream_info.input.device_name.empty())
73 in_id = pw_dev->find_input_node_id(stream_info.input.device_name);
74 }
75
76 return std::make_unique<PipewireStream>(
77 out_id, in_id,
78 stream_info,
79 user_data);
80}
81
82std::string PipewireBackend::get_version_string() const
83{
84 return pw_get_library_version();
85}
86
87void PipewireBackend::cleanup()
88{
89 if (m_core) {
90 pw_core_disconnect(m_core);
91 m_core = nullptr;
92 }
93 if (m_context) {
94 pw_context_destroy(m_context);
95 m_context = nullptr;
96 }
97 if (m_loop) {
98 pw_main_loop_destroy(m_loop);
99 m_loop = nullptr;
100 }
101 pw_deinit();
102}
103
104// ---------------------------------------------------------------------------
105// PipewireDevice
106// ---------------------------------------------------------------------------
107
108namespace {
109
110 struct EnumState {
111 pw_main_loop* loop {};
112 pw_registry* registry {};
113 spa_hook registry_listener {};
114 spa_hook core_listener {};
115 std::vector<PipewireDevice::NodeEntry>* nodes {};
116 int pending = 0;
117 };
118
119 void on_global(
120 void* userdata,
121 uint32_t id,
122 uint32_t,
123 const char* type,
124 uint32_t,
125 const struct spa_dict* props)
126 {
127 auto* st = static_cast<EnumState*>(userdata);
128 if (!props || std::strcmp(type, PW_TYPE_INTERFACE_Node) != 0)
129 return;
130
131 const char* media_class = spa_dict_lookup(props, PW_KEY_MEDIA_CLASS);
132 if (!media_class)
133 return;
134
135 const bool is_sink = std::strcmp(media_class, "Audio/Sink") == 0;
136 const bool is_source = std::strcmp(media_class, "Audio/Source") == 0;
137 if (!is_sink && !is_source)
138 return;
139
140 PipewireDevice::NodeEntry e;
141 e.id = id;
142 e.is_output = is_sink;
143
144 const char* name = spa_dict_lookup(props, PW_KEY_NODE_DESCRIPTION);
145 if (!name)
146 name = spa_dict_lookup(props, PW_KEY_NODE_NAME);
147 if (!name)
148 name = "(unknown)";
149 e.info.name = name;
150
151 const char* prio = spa_dict_lookup(props, "priority.session");
152 e.priority_session = prio ? static_cast<uint32_t>(std::atoi(prio)) : 0;
153
154 if (is_sink) {
155 e.info.output_channels = 2;
156 } else {
157 e.info.input_channels = 2;
158 }
159
160 st->nodes->push_back(e);
161 }
162
163 void on_global_remove(void*, uint32_t) { }
164
165 void on_core_done(void* userdata, uint32_t id, int)
166 {
167 if (id != PW_ID_CORE)
168 return;
169 pw_main_loop_quit(static_cast<EnumState*>(userdata)->loop);
170 }
171
172 const pw_core_events k_core_events = {
173 .version = PW_VERSION_CORE_EVENTS,
174 .done = on_core_done,
175 };
176
177 const pw_registry_events k_registry_events = {
178 .version = PW_VERSION_REGISTRY_EVENTS,
179 .global = on_global,
180 .global_remove = on_global_remove,
181 };
182
183} // namespace
184
185PipewireDevice::PipewireDevice(pw_main_loop* loop, pw_context* ctx, pw_core* core)
186{
187 enumerate(loop, ctx, core);
188}
189
190void PipewireDevice::enumerate(pw_main_loop* loop, pw_context*, pw_core* core)
191{
192 EnumState st;
193 st.loop = loop;
194 st.nodes = &m_nodes;
195
196 st.registry = pw_core_get_registry(core, PW_VERSION_REGISTRY, 0);
197 if (!st.registry)
198 error(C, X, std::source_location::current(), "pw_core_get_registry failed");
199
200 spa_zero(st.registry_listener);
201 pw_registry_add_listener(st.registry, &st.registry_listener, &k_registry_events, &st);
202
203 spa_zero(st.core_listener);
204 pw_core_add_listener(core, &st.core_listener, &k_core_events, &st);
205
206 st.pending = pw_core_sync(core, PW_ID_CORE, 0);
207 pw_main_loop_run(loop);
208
209 spa_hook_remove(&st.core_listener);
210 spa_hook_remove(&st.registry_listener);
211 pw_proxy_destroy(reinterpret_cast<pw_proxy*>(st.registry));
212
213 uint32_t best_out = 0, best_in = 0;
214 for (const auto& e : m_nodes) {
215 if (e.is_output) {
216 if (m_default_output_id == 0 || e.priority_session > best_out) {
217 m_default_output_id = e.id;
218 best_out = e.priority_session;
219 }
220 } else {
221 if (m_default_input_id == 0 || e.priority_session > best_in) {
222 m_default_input_id = e.id;
223 best_in = e.priority_session;
224 }
225 }
226 }
227
228 MF_INFO(C, X, "PipeWire enumeration: {} sink(s), {} source(s)",
229 std::count_if(m_nodes.begin(), m_nodes.end(), [](const auto& e) { return e.is_output; }),
230 std::count_if(m_nodes.begin(), m_nodes.end(), [](const auto& e) { return !e.is_output; }));
231
232 for (const auto& e : m_nodes)
233 MF_INFO(C, X, " id={} '{}' output={} priority={}", e.id, e.info.name, e.is_output, e.priority_session);
234
235 MF_INFO(C, X, "default output={} input={}", m_default_output_id, m_default_input_id);
236}
237
238std::vector<DeviceInfo> PipewireDevice::get_output_devices() const
239{
240 std::vector<DeviceInfo> out;
241 for (const auto& e : m_nodes) {
242 if (e.is_output)
243 out.push_back(e.info);
244 }
245 return out;
246}
247
248std::vector<DeviceInfo> PipewireDevice::get_input_devices() const
249{
250 std::vector<DeviceInfo> out;
251 for (const auto& e : m_nodes) {
252 if (!e.is_output)
253 out.push_back(e.info);
254 }
255 return out;
256}
257
258unsigned int PipewireDevice::get_default_output_device() const
259{
260 return static_cast<unsigned int>(m_default_output_id);
261}
262
263unsigned int PipewireDevice::get_default_input_device() const
264{
265 return static_cast<unsigned int>(m_default_input_id);
266}
267
268uint32_t PipewireDevice::find_output_node_id(std::string_view name) const
269{
270 for (const auto& e : m_nodes) {
271 if (e.is_output && e.info.name == name)
272 return e.id;
273 }
274
275 return m_default_output_id;
276}
277
278uint32_t PipewireDevice::find_input_node_id(std::string_view name) const
279{
280 for (const auto& e : m_nodes) {
281 if (!e.is_output && e.info.name == name)
282 return e.id;
283 }
284
285 return m_default_input_id;
286}
287
288// ---------------------------------------------------------------------------
289// PipewireStream
290// ---------------------------------------------------------------------------
291
292PipewireStream::PipewireStream(
293 uint32_t output_node_id,
294 uint32_t input_node_id,
295 GlobalStreamInfo& stream_info,
296 void* user_data)
297 : m_output_node_id(output_node_id)
298 , m_input_node_id(input_node_id)
299 , m_stream_info(stream_info)
300 , m_user_data(user_data)
301{
302}
303
304PipewireStream::~PipewireStream()
305{
306 if (is_running())
307 stop();
308 if (is_open())
309 close();
310}
311
312pw_time PipewireStream::get_stream_time() const
313{
314 pw_time t {};
315 if (m_output_stream)
316 pw_stream_get_time_n(m_output_stream, &t, sizeof(t));
317 return t;
318}
319
320void PipewireStream::set_process_callback(std::function<int(void*, void*, unsigned int)> cb)
321{
322 m_process_callback = std::move(cb);
323}
324
325void PipewireStream::on_input_process(void* userdata)
326{
327 auto* self = static_cast<PipewireStream*>(userdata);
328
329 pw_buffer* pb = pw_stream_dequeue_buffer(self->m_input_stream);
330 if (!pb)
331 return;
332
333 spa_meta_header* h = static_cast<spa_meta_header*>(
334 spa_buffer_find_meta_data(pb->buffer, SPA_META_Header, sizeof(*h)));
335
336 if (h && (h->flags & SPA_META_HEADER_FLAG_CORRUPTED)) {
337 MF_RT_WARN(C, X, "xrun detected on input buffer");
338 }
339
340 spa_buffer* sb = pb->buffer;
341 const uint32_t ch = self->m_stream_info.input.channels;
342 const uint32_t frames = self->m_negotiated_frames.load(std::memory_order_relaxed);
343 const size_t n_samples = static_cast<size_t>(frames) * ch;
344
345 if (sb->datas[0].data && n_samples > 0) {
346 const auto src = static_cast<const double*>(sb->datas[0].data);
347 std::memcpy(self->m_input_staging.data(), src, n_samples * sizeof(double));
348 } else {
349 std::ranges::fill(self->m_input_staging, 0.0);
350 }
351
352 pw_stream_queue_buffer(self->m_input_stream, pb);
353 self->m_input_ready.store(true, std::memory_order_release);
354}
355
356void PipewireStream::build_output_format_params(
357 uint8_t* buf, uint32_t buf_size,
358 const struct spa_pod** params, uint32_t& n_params)
359{
360 spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, buf_size);
361 struct spa_audio_info_raw raw = SPA_AUDIO_INFO_RAW_INIT(
362 .format = SPA_AUDIO_FORMAT_F64,
363 .rate = m_stream_info.sample_rate,
364 .channels = m_stream_info.output.channels);
365 params[0] = static_cast<const struct spa_pod*>(
366 spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &raw));
367 n_params = 1;
368}
369
370void PipewireStream::build_input_format_params(
371 uint8_t* buf, uint32_t buf_size,
372 const struct spa_pod** params, uint32_t& n_params)
373{
374 spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, buf_size);
375 struct spa_audio_info_raw raw = SPA_AUDIO_INFO_RAW_INIT(
376 .format = SPA_AUDIO_FORMAT_F64,
377 .rate = m_stream_info.sample_rate,
378 .channels = m_stream_info.input.channels);
379 params[0] = static_cast<const struct spa_pod*>(
380 spa_format_audio_raw_build(&b, SPA_PARAM_EnumFormat, &raw));
381 n_params = 1;
382}
383
384void PipewireStream::on_output_process(void* userdata)
385{
386 auto* self = static_cast<PipewireStream*>(userdata);
387 if (!self->m_process_callback)
388 return;
389
390 pw_buffer* pb = pw_stream_dequeue_buffer(self->m_output_stream);
391 if (!pb) {
392 MF_RT_WARN(C, X, "on_process: dequeue returned null");
393 return;
394 }
395
396 spa_buffer* sb = pb->buffer;
397 void* data = sb->datas[0].data;
398 const uint32_t ch = self->m_stream_info.output.channels;
399
400 uint32_t frames = self->m_rate_match
401 ? self->m_rate_match->size
402 : self->m_negotiated_frames.load(std::memory_order_relaxed);
403
404 spa_meta_header* h = static_cast<spa_meta_header*>(
405 spa_buffer_find_meta_data(pb->buffer, SPA_META_Header, sizeof(*h)));
406
407 const bool xrun = h && (h->flags & SPA_META_HEADER_FLAG_CORRUPTED);
408 if (xrun) {
409 MF_RT_WARN(C, X, "output xrun");
410 if (self->m_stream_info.handle_xruns) {
411 std::memset(data, 0, frames * ch * sizeof(double));
412 pw_stream_queue_buffer(self->m_output_stream, pb);
413 return;
414 }
415 }
416
417 if (frames == 0)
418 frames = sb->datas[0].chunk->size / (ch * sizeof(double));
419
420 if (frames == 0 || !data) {
421 MF_RT_WARN(C, X, "on_process: zero frames or null data, skipping");
422 pw_stream_queue_buffer(self->m_output_stream, pb);
423 return;
424 }
425
426 sb->datas[0].chunk->offset = 0;
427 sb->datas[0].chunk->stride = static_cast<int32_t>(ch * sizeof(double));
428 sb->datas[0].chunk->size = static_cast<size_t>(frames * ch) * sizeof(double);
429
430 void* input_ptr = nullptr;
431 if (self->m_stream_info.input.enabled && self->m_input_ready.load(std::memory_order_acquire)) {
432 input_ptr = self->m_input_staging.data();
433 self->m_input_ready.store(false, std::memory_order_release);
434 }
435
436 self->m_process_callback(data, input_ptr, frames);
437
438 pw_stream_queue_buffer(self->m_output_stream, pb);
439}
440
441void PipewireStream::on_param_changed(void* userdata, uint32_t id, const struct spa_pod* param)
442{
443 auto* self = static_cast<PipewireStream*>(userdata);
444 if (!param || id != SPA_PARAM_Format)
445 return;
446
447 struct spa_audio_info info {};
448 if (spa_format_audio_raw_parse(param, &info.info.raw) < 0) {
449 MF_WARN(C, X, "on_param_changed: failed to parse audio format");
450 return;
451 }
452
453 self->m_negotiated_frames.store(self->m_stream_info.buffer_size, std::memory_order_relaxed);
454
455 MF_INFO(C, X, "format negotiated: F64 {} ch @ {} Hz",
456 info.info.raw.channels, info.info.raw.rate);
457
458 uint8_t buf[1024];
459 spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
460 const uint32_t ch = self->m_stream_info.output.channels;
461
462 const auto* bufparam = static_cast<const struct spa_pod*>(
463 spa_pod_builder_add_object(&b,
464 SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
465 SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
466 SPA_PARAM_BUFFERS_size, SPA_POD_Int(self->m_stream_info.buffer_size * ch * sizeof(double)),
467 SPA_PARAM_BUFFERS_stride, SPA_POD_Int(ch * sizeof(double))));
468
469 pw_stream_update_params(self->m_output_stream, &bufparam, 1);
470}
471
472void PipewireStream::on_state_changed(
473 void* userdata,
474 enum pw_stream_state,
475 enum pw_stream_state new_state,
476 const char* error_str)
477{
478 auto* self = static_cast<PipewireStream*>(userdata);
479
480 if (error_str)
481 MF_WARN(C, X, "stream error: {}", error_str);
482
483 switch (new_state) {
484 case PW_STREAM_STATE_STREAMING:
485 self->m_is_running.store(true, std::memory_order_release);
486 MF_INFO(C, X, "stream streaming");
487 break;
488 case PW_STREAM_STATE_PAUSED:
489 self->m_is_running.store(false, std::memory_order_release);
490 MF_INFO(C, X, "stream paused");
491 break;
492 case PW_STREAM_STATE_ERROR:
493 self->m_is_running.store(false, std::memory_order_release);
494 MF_WARN(C, X, "stream error state");
495 break;
496 default:
497 break;
498 }
499}
500
501void PipewireStream::on_io_changed(void* userdata, uint32_t id, void* area, uint32_t)
502{
503 auto* self = static_cast<PipewireStream*>(userdata);
504 if (id == SPA_IO_RateMatch && area) {
505 self->m_rate_match = static_cast<spa_io_rate_match*>(area);
506 }
507}
508
509void PipewireStream::open()
510{
511 if (m_is_open.load())
512 return;
513
514 m_thread_loop = pw_thread_loop_new("mayaflux-audio", nullptr);
515 if (!m_thread_loop)
516 error(C, X, std::source_location::current(), "pw_thread_loop_new failed");
517
518 pw_loop* loop = pw_thread_loop_get_loop(m_thread_loop);
519
520 auto read_opt = [&](const char* key) -> std::optional<std::string> {
521 auto it = m_stream_info.backend_options.find(key);
522 if (it == m_stream_info.backend_options.end())
523 return std::nullopt;
524 try {
525 return std::any_cast<std::string>(it->second);
526 } catch (const std::bad_any_cast&) {
527 return std::nullopt;
528 }
529 };
530
531 auto read_bool = [&](const char* key) -> std::optional<bool> {
532 auto it = m_stream_info.backend_options.find(key);
533 if (it == m_stream_info.backend_options.end())
534 return std::nullopt;
535 try {
536 return std::any_cast<bool>(it->second);
537 } catch (const std::bad_any_cast&) {
538 return std::nullopt;
539 }
540 };
541
542 const bool force_quantum = (m_stream_info.priority == GlobalStreamInfo::StreamPriority::REALTIME) && !read_bool("pipewire.disable_force_quantum").value_or(false);
543
544 const std::string latency_str = std::to_string(m_stream_info.buffer_size) + "/" + std::to_string(m_stream_info.sample_rate);
545
546 const std::string node_name = read_opt("pipewire.node.name").value_or("mayaflux-output");
547 const std::string node_desc = read_opt("pipewire.node.description").value_or("MayaFlux Audio");
548 const std::string media_role = read_opt("pipewire.role").value_or("Production");
549 const std::string pid_str = std::to_string(getpid());
550
551 pw_properties* props = pw_properties_new(
552 PW_KEY_MEDIA_TYPE, "Audio",
553 PW_KEY_MEDIA_CATEGORY, "Playback",
554 PW_KEY_MEDIA_ROLE, media_role.c_str(),
555 PW_KEY_MEDIA_CLASS, "Stream/Output/Audio",
556 PW_KEY_APP_NAME, "MayaFlux",
557 PW_KEY_APP_PROCESS_ID, pid_str.c_str(),
558 PW_KEY_APP_PROCESS_BINARY, "mayaflux",
559 PW_KEY_NODE_NAME, node_name.c_str(),
560 PW_KEY_NODE_DESCRIPTION, node_desc.c_str(),
561 PW_KEY_NODE_LATENCY, latency_str.c_str(),
562 nullptr);
563
564 if (force_quantum) {
565 pw_properties_set(props, "node.force-quantum",
566 std::to_string(m_stream_info.buffer_size).c_str());
567 }
568
569 m_output_stream_events = pw_stream_events {
570 .version = PW_VERSION_STREAM_EVENTS,
571 .state_changed = on_state_changed,
572 .io_changed = on_io_changed,
573 .param_changed = on_param_changed,
574 .process = on_output_process,
575 };
576
577 pw_thread_loop_lock(m_thread_loop);
578
579 m_output_stream = pw_stream_new_simple(
580 loop, "mayaflux-output", props, &m_output_stream_events, this);
581
582 if (!m_output_stream) {
583 pw_thread_loop_unlock(m_thread_loop);
584 pw_thread_loop_destroy(m_thread_loop);
585 m_thread_loop = nullptr;
586 error(C, X, std::source_location::current(), "pw_stream_new_simple failed (output)");
587 }
588
589 uint8_t buf[1024];
590 uint32_t n_params = 0;
591 const struct spa_pod* params[1];
592 build_output_format_params(buf, sizeof(buf), params, n_params);
593
594 const uint32_t flags = PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS;
595
596 if (auto v = read_opt("pipewire.node.name"))
597 pw_properties_set(props, PW_KEY_NODE_NAME, v->c_str());
598
599 if (auto v = read_opt("pipewire.role"))
600 pw_properties_set(props, PW_KEY_MEDIA_ROLE, v->c_str());
601
602 if (auto v = read_opt("pipewire.target_object"))
603 pw_properties_set(props, PW_KEY_TARGET_OBJECT, v->c_str());
604
605 int ret = pw_stream_connect(
606 m_output_stream,
607 PW_DIRECTION_OUTPUT,
608 PW_ID_ANY,
609 static_cast<pw_stream_flags>(flags),
610 params, n_params);
611
612 if (ret < 0) {
613 pw_thread_loop_unlock(m_thread_loop);
614 pw_stream_destroy(m_output_stream);
615 m_output_stream = nullptr;
616 pw_thread_loop_destroy(m_thread_loop);
617 m_thread_loop = nullptr;
618 error(C, X, std::source_location::current(),
619 "pw_stream_connect failed (output): {}", spa_strerror(ret));
620 }
621
622 if (m_stream_info.input.enabled && m_stream_info.input.channels > 0) {
623 const size_t staging_samples = static_cast<size_t>(m_stream_info.buffer_size) * m_stream_info.input.channels;
624 m_input_staging.assign(staging_samples, 0.0);
625
626 pw_properties* in_props = pw_properties_new(
627 PW_KEY_MEDIA_TYPE, "Audio",
628 PW_KEY_MEDIA_CATEGORY, "Capture",
629 PW_KEY_MEDIA_ROLE, "Production",
630 PW_KEY_MEDIA_CLASS, "Stream/Input/Audio",
631 PW_KEY_APP_NAME, "MayaFlux",
632 PW_KEY_NODE_NAME, "mayaflux-input",
633 PW_KEY_NODE_LATENCY, latency_str.c_str(),
634 nullptr);
635
636 m_input_stream_events = pw_stream_events {
637 .version = PW_VERSION_STREAM_EVENTS,
638 .process = on_input_process,
639 };
640
641 m_input_stream = pw_stream_new_simple(
642 loop, "mayaflux-input", in_props, &m_input_stream_events, this);
643
644 if (!m_input_stream) {
645 pw_thread_loop_unlock(m_thread_loop);
646 pw_stream_destroy(m_output_stream);
647 m_output_stream = nullptr;
648 pw_thread_loop_destroy(m_thread_loop);
649 m_thread_loop = nullptr;
650 error(C, X, std::source_location::current(), "pw_stream_new_simple failed (input)");
651 }
652
653 uint8_t in_buf[1024];
654 uint32_t in_n_params = 0;
655 const struct spa_pod* in_params[1];
656 build_input_format_params(in_buf, sizeof(in_buf), in_params, in_n_params);
657
658 ret = pw_stream_connect(
659 m_input_stream,
660 PW_DIRECTION_INPUT,
661 m_input_node_id != 0 ? m_input_node_id : PW_ID_ANY,
662 static_cast<pw_stream_flags>(flags),
663 in_params, in_n_params);
664
665 if (ret < 0) {
666 pw_thread_loop_unlock(m_thread_loop);
667 pw_stream_destroy(m_input_stream);
668 m_input_stream = nullptr;
669 pw_stream_destroy(m_output_stream);
670 m_output_stream = nullptr;
671 pw_thread_loop_destroy(m_thread_loop);
672 m_thread_loop = nullptr;
673 error(C, X, std::source_location::current(),
674 "pw_stream_connect failed (input): {}", spa_strerror(ret));
675 }
676 }
677
678 pw_thread_loop_unlock(m_thread_loop);
679
680 m_is_open.store(true, std::memory_order_release);
681
682 MF_INFO(C, X, "stream opened (quantum: {}/{}, force: {})",
683 m_stream_info.buffer_size, m_stream_info.sample_rate, force_quantum);
684}
685
686void PipewireStream::start()
687{
688 if (!m_is_open.load())
689 error(C, X, std::source_location::current(), "start() called before open()");
690 if (m_is_running.load())
691 return;
692
693 int ret = pw_thread_loop_start(m_thread_loop);
694 if (ret < 0) {
695 error(C, X, std::source_location::current(),
696 "pw_thread_loop_start failed: {}", spa_strerror(ret));
697 }
698
699 pw_thread_loop_lock(m_thread_loop);
700 pw_stream_set_active(m_output_stream, true);
701
702 if (m_input_stream) {
703 pw_stream_set_active(m_input_stream, true);
704 }
705 pw_thread_loop_unlock(m_thread_loop);
706}
707
708void PipewireStream::stop()
709{
710 if (!m_is_running.load())
711 return;
712
713 pw_thread_loop_stop(m_thread_loop);
714 m_is_running.store(false, std::memory_order_release);
715}
716
717void PipewireStream::pause()
718{
719 if (!m_is_running.load() || m_is_paused.load())
720 return;
721
722 pw_thread_loop_stop(m_thread_loop);
723 m_is_paused.store(true, std::memory_order_release);
724 m_is_running.store(false, std::memory_order_release);
725}
726
727void PipewireStream::resume()
728{
729 if (!m_is_paused.load())
730 return;
731
732 int ret = pw_thread_loop_start(m_thread_loop);
733 if (ret < 0) {
734 error(C, X, std::source_location::current(),
735 "pw_thread_loop_start failed on resume: {}", spa_strerror(ret));
736 }
737
738 pw_thread_loop_lock(m_thread_loop);
739 pw_stream_set_active(m_output_stream, true);
740 if (m_input_stream)
741 pw_stream_set_active(m_input_stream, true);
742 pw_thread_loop_unlock(m_thread_loop);
743
744 m_is_paused.store(false, std::memory_order_release);
745 m_is_running.store(true, std::memory_order_release);
746}
747
748void PipewireStream::close()
749{
750 if (!m_is_open.load())
751 return;
752
753 if (m_is_running.load() || m_is_paused.load())
754 pw_thread_loop_stop(m_thread_loop);
755
756 m_is_running.store(false, std::memory_order_release);
757 m_is_paused.store(false, std::memory_order_release);
758
759 if (m_input_stream) {
760 pw_stream_destroy(m_input_stream);
761 m_input_stream = nullptr;
762 }
763 if (m_output_stream) {
764 pw_stream_destroy(m_output_stream);
765 m_output_stream = nullptr;
766 }
767 if (m_thread_loop) {
768 pw_thread_loop_destroy(m_thread_loop);
769 m_thread_loop = nullptr;
770 }
771
772 m_is_open.store(false, std::memory_order_release);
773}
774
775bool PipewireStream::is_running() const { return m_is_running.load(std::memory_order_acquire); }
776bool PipewireStream::is_open() const { return m_is_open.load(std::memory_order_acquire); }
777
778} // namespace MayaFlux::Core
779
780#endif // PIPEWIRE_BACKEND
#define MF_INFO(comp, ctx,...)
#define MF_RT_WARN(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
uint32_t h
Definition InkPress.cpp:28
size_t b
@ AudioBackend
Audio processing backend (Pipewire, wasapi, coreaudio)
void error(Component component, Context context, std::source_location location, std::string_view message)
Log an error message and optionally throw an exception.
@ Core
Core engine, backend, subsystems.
void stop()
Stop all Portal::Graphics operations.
Definition Graphics.cpp:69