6#include <pipewire/pipewire.h>
7#include <spa/param/audio/format-utils.h>
8#include <spa/pod/builder.h>
9#include <spa/utils/result.h>
23 : m_loop(pw_main_loop_new(nullptr))
25 pw_init(
nullptr,
nullptr);
28 error(C, X, std::source_location::current(),
"pw_main_loop_new failed");
30 m_context = pw_context_new(pw_main_loop_get_loop(m_loop),
nullptr, 0);
32 pw_main_loop_destroy(m_loop);
33 error(C, X, std::source_location::current(),
"pw_context_new failed");
36 m_core = pw_context_connect(m_context,
nullptr, 0);
38 pw_context_destroy(m_context);
39 pw_main_loop_destroy(m_loop);
40 error(C, X, std::source_location::current(),
41 "pw_context_connect failed - is PipeWire running?");
44 MF_INFO(C, X,
"PipeWire backend initialised (version: {})", pw_get_library_version());
47PipewireBackend::~PipewireBackend()
52std::unique_ptr<AudioDevice> PipewireBackend::create_device_manager()
54 auto dev = std::make_unique<PipewireDevice>(m_loop, m_context, m_core);
55 m_device_manager = dev.get();
59std::unique_ptr<AudioStream> PipewireBackend::create_stream(
60 unsigned int output_node_id,
61 unsigned int input_node_id,
62 GlobalStreamInfo& stream_info,
65 auto out_id =
static_cast<uint32_t
>(output_node_id);
66 auto in_id =
static_cast<uint32_t
>(input_node_id);
68 if (m_device_manager) {
69 auto* pw_dev =
static_cast<PipewireDevice*
>(m_device_manager);
70 if (!stream_info.output.device_name.empty())
71 out_id = pw_dev->find_output_node_id(stream_info.output.device_name);
72 if (stream_info.input.enabled && !stream_info.input.device_name.empty())
73 in_id = pw_dev->find_input_node_id(stream_info.input.device_name);
76 return std::make_unique<PipewireStream>(
82std::string PipewireBackend::get_version_string()
const
84 return pw_get_library_version();
87void PipewireBackend::cleanup()
90 pw_core_disconnect(m_core);
94 pw_context_destroy(m_context);
98 pw_main_loop_destroy(m_loop);
111 pw_main_loop* loop {};
112 pw_registry* registry {};
113 spa_hook registry_listener {};
114 spa_hook core_listener {};
115 std::vector<PipewireDevice::NodeEntry>* nodes {};
125 const struct spa_dict* props)
127 auto* st =
static_cast<EnumState*
>(userdata);
128 if (!props || std::strcmp(type, PW_TYPE_INTERFACE_Node) != 0)
131 const char* media_class = spa_dict_lookup(props, PW_KEY_MEDIA_CLASS);
135 const bool is_sink = std::strcmp(media_class,
"Audio/Sink") == 0;
136 const bool is_source = std::strcmp(media_class,
"Audio/Source") == 0;
137 if (!is_sink && !is_source)
140 PipewireDevice::NodeEntry e;
142 e.is_output = is_sink;
144 const char* name = spa_dict_lookup(props, PW_KEY_NODE_DESCRIPTION);
146 name = spa_dict_lookup(props, PW_KEY_NODE_NAME);
151 const char* prio = spa_dict_lookup(props,
"priority.session");
152 e.priority_session = prio ?
static_cast<uint32_t
>(std::atoi(prio)) : 0;
155 e.info.output_channels = 2;
157 e.info.input_channels = 2;
160 st->nodes->push_back(e);
163 void on_global_remove(
void*, uint32_t) { }
165 void on_core_done(
void* userdata, uint32_t
id,
int)
167 if (
id != PW_ID_CORE)
169 pw_main_loop_quit(
static_cast<EnumState*
>(userdata)->loop);
172 const pw_core_events k_core_events = {
173 .version = PW_VERSION_CORE_EVENTS,
174 .done = on_core_done,
177 const pw_registry_events k_registry_events = {
178 .version = PW_VERSION_REGISTRY_EVENTS,
180 .global_remove = on_global_remove,
185PipewireDevice::PipewireDevice(pw_main_loop* loop, pw_context* ctx, pw_core* core)
187 enumerate(loop, ctx, core);
190void PipewireDevice::enumerate(pw_main_loop* loop, pw_context*, pw_core* core)
196 st.registry = pw_core_get_registry(core, PW_VERSION_REGISTRY, 0);
198 error(C, X, std::source_location::current(),
"pw_core_get_registry failed");
200 spa_zero(st.registry_listener);
201 pw_registry_add_listener(st.registry, &st.registry_listener, &k_registry_events, &st);
203 spa_zero(st.core_listener);
204 pw_core_add_listener(core, &st.core_listener, &k_core_events, &st);
206 st.pending = pw_core_sync(core, PW_ID_CORE, 0);
207 pw_main_loop_run(loop);
209 spa_hook_remove(&st.core_listener);
210 spa_hook_remove(&st.registry_listener);
211 pw_proxy_destroy(
reinterpret_cast<pw_proxy*
>(st.registry));
213 uint32_t best_out = 0, best_in = 0;
214 for (
const auto& e : m_nodes) {
216 if (m_default_output_id == 0 || e.priority_session > best_out) {
217 m_default_output_id = e.id;
218 best_out = e.priority_session;
221 if (m_default_input_id == 0 || e.priority_session > best_in) {
222 m_default_input_id = e.id;
223 best_in = e.priority_session;
228 MF_INFO(C, X,
"PipeWire enumeration: {} sink(s), {} source(s)",
229 std::count_if(m_nodes.begin(), m_nodes.end(), [](
const auto& e) { return e.is_output; }),
230 std::count_if(m_nodes.begin(), m_nodes.end(), [](
const auto& e) { return !e.is_output; }));
232 for (
const auto& e : m_nodes)
233 MF_INFO(
C,
X,
" id={} '{}' output={} priority={}", e.id, e.info.name, e.is_output, e.priority_session);
235 MF_INFO(C, X,
"default output={} input={}", m_default_output_id, m_default_input_id);
238std::vector<DeviceInfo> PipewireDevice::get_output_devices()
const
240 std::vector<DeviceInfo> out;
241 for (
const auto& e : m_nodes) {
243 out.push_back(e.info);
248std::vector<DeviceInfo> PipewireDevice::get_input_devices()
const
250 std::vector<DeviceInfo> out;
251 for (
const auto& e : m_nodes) {
253 out.push_back(e.info);
258unsigned int PipewireDevice::get_default_output_device()
const
260 return static_cast<unsigned int>(m_default_output_id);
263unsigned int PipewireDevice::get_default_input_device()
const
265 return static_cast<unsigned int>(m_default_input_id);
268uint32_t PipewireDevice::find_output_node_id(std::string_view name)
const
270 for (
const auto& e : m_nodes) {
271 if (e.is_output && e.info.name == name)
275 return m_default_output_id;
278uint32_t PipewireDevice::find_input_node_id(std::string_view name)
const
280 for (
const auto& e : m_nodes) {
281 if (!e.is_output && e.info.name == name)
285 return m_default_input_id;
292PipewireStream::PipewireStream(
293 uint32_t output_node_id,
294 uint32_t input_node_id,
295 GlobalStreamInfo& stream_info,
297 : m_output_node_id(output_node_id)
298 , m_input_node_id(input_node_id)
299 , m_stream_info(stream_info)
300 , m_user_data(user_data)
304PipewireStream::~PipewireStream()
312pw_time PipewireStream::get_stream_time()
const
316 pw_stream_get_time_n(m_output_stream, &t,
sizeof(t));
320void PipewireStream::set_process_callback(std::function<
int(
void*,
void*,
unsigned int)> cb)
322 m_process_callback = std::move(cb);
325void PipewireStream::on_input_process(
void* userdata)
327 auto* self =
static_cast<PipewireStream*
>(userdata);
329 pw_buffer* pb = pw_stream_dequeue_buffer(self->m_input_stream);
333 spa_meta_header*
h =
static_cast<spa_meta_header*
>(
334 spa_buffer_find_meta_data(pb->buffer, SPA_META_Header,
sizeof(*
h)));
336 if (
h && (
h->flags & SPA_META_HEADER_FLAG_CORRUPTED)) {
337 MF_RT_WARN(C, X,
"xrun detected on input buffer");
340 spa_buffer* sb = pb->buffer;
341 const uint32_t ch = self->m_stream_info.input.channels;
342 const uint32_t frames = self->m_negotiated_frames.load(std::memory_order_relaxed);
343 const size_t n_samples =
static_cast<size_t>(frames) * ch;
345 if (sb->datas[0].data && n_samples > 0) {
346 const auto src =
static_cast<const double*
>(sb->datas[0].data);
347 std::memcpy(self->m_input_staging.data(), src, n_samples *
sizeof(
double));
349 std::ranges::fill(self->m_input_staging, 0.0);
352 pw_stream_queue_buffer(self->m_input_stream, pb);
353 self->m_input_ready.store(
true, std::memory_order_release);
356void PipewireStream::build_output_format_params(
357 uint8_t* buf, uint32_t buf_size,
358 const struct spa_pod** params, uint32_t& n_params)
360 spa_pod_builder
b = SPA_POD_BUILDER_INIT(buf, buf_size);
361 struct spa_audio_info_raw raw = SPA_AUDIO_INFO_RAW_INIT(
362 .format = SPA_AUDIO_FORMAT_F64,
363 .rate = m_stream_info.sample_rate,
364 .channels = m_stream_info.output.channels);
365 params[0] =
static_cast<const struct spa_pod*
>(
366 spa_format_audio_raw_build(&
b, SPA_PARAM_EnumFormat, &raw));
370void PipewireStream::build_input_format_params(
371 uint8_t* buf, uint32_t buf_size,
372 const struct spa_pod** params, uint32_t& n_params)
374 spa_pod_builder
b = SPA_POD_BUILDER_INIT(buf, buf_size);
375 struct spa_audio_info_raw raw = SPA_AUDIO_INFO_RAW_INIT(
376 .format = SPA_AUDIO_FORMAT_F64,
377 .rate = m_stream_info.sample_rate,
378 .channels = m_stream_info.input.channels);
379 params[0] =
static_cast<const struct spa_pod*
>(
380 spa_format_audio_raw_build(&
b, SPA_PARAM_EnumFormat, &raw));
384void PipewireStream::on_output_process(
void* userdata)
386 auto* self =
static_cast<PipewireStream*
>(userdata);
387 if (!self->m_process_callback)
390 pw_buffer* pb = pw_stream_dequeue_buffer(self->m_output_stream);
392 MF_RT_WARN(C, X,
"on_process: dequeue returned null");
396 spa_buffer* sb = pb->buffer;
397 void* data = sb->datas[0].data;
398 const uint32_t ch = self->m_stream_info.output.channels;
400 uint32_t frames = self->m_rate_match
401 ? self->m_rate_match->size
402 : self->m_negotiated_frames.load(std::memory_order_relaxed);
404 spa_meta_header*
h =
static_cast<spa_meta_header*
>(
405 spa_buffer_find_meta_data(pb->buffer, SPA_META_Header,
sizeof(*
h)));
407 const bool xrun =
h && (
h->flags & SPA_META_HEADER_FLAG_CORRUPTED);
410 if (self->m_stream_info.handle_xruns) {
411 std::memset(data, 0, frames * ch *
sizeof(
double));
412 pw_stream_queue_buffer(self->m_output_stream, pb);
418 frames = sb->datas[0].chunk->size / (ch *
sizeof(double));
420 if (frames == 0 || !data) {
421 MF_RT_WARN(C, X,
"on_process: zero frames or null data, skipping");
422 pw_stream_queue_buffer(self->m_output_stream, pb);
426 sb->datas[0].chunk->offset = 0;
427 sb->datas[0].chunk->stride =
static_cast<int32_t
>(ch *
sizeof(double));
428 sb->datas[0].chunk->size =
static_cast<size_t>(frames * ch) *
sizeof(
double);
430 void* input_ptr =
nullptr;
431 if (self->m_stream_info.input.enabled && self->m_input_ready.load(std::memory_order_acquire)) {
432 input_ptr = self->m_input_staging.data();
433 self->m_input_ready.store(
false, std::memory_order_release);
436 self->m_process_callback(data, input_ptr, frames);
438 pw_stream_queue_buffer(self->m_output_stream, pb);
441void PipewireStream::on_param_changed(
void* userdata, uint32_t
id,
const struct spa_pod* param)
443 auto* self =
static_cast<PipewireStream*
>(userdata);
444 if (!param ||
id != SPA_PARAM_Format)
447 struct spa_audio_info info {};
448 if (spa_format_audio_raw_parse(param, &info.info.raw) < 0) {
449 MF_WARN(C, X,
"on_param_changed: failed to parse audio format");
453 self->m_negotiated_frames.store(self->m_stream_info.buffer_size, std::memory_order_relaxed);
455 MF_INFO(C, X,
"format negotiated: F64 {} ch @ {} Hz",
456 info.info.raw.channels, info.info.raw.rate);
459 spa_pod_builder
b = SPA_POD_BUILDER_INIT(buf,
sizeof(buf));
460 const uint32_t ch = self->m_stream_info.output.channels;
462 const auto* bufparam =
static_cast<const struct spa_pod*
>(
463 spa_pod_builder_add_object(&
b,
464 SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
465 SPA_PARAM_BUFFERS_blocks, SPA_POD_Int(1),
466 SPA_PARAM_BUFFERS_size, SPA_POD_Int(self->m_stream_info.buffer_size * ch *
sizeof(
double)),
467 SPA_PARAM_BUFFERS_stride, SPA_POD_Int(ch *
sizeof(
double))));
469 pw_stream_update_params(self->m_output_stream, &bufparam, 1);
472void PipewireStream::on_state_changed(
474 enum pw_stream_state,
475 enum pw_stream_state new_state,
476 const char* error_str)
478 auto* self =
static_cast<PipewireStream*
>(userdata);
481 MF_WARN(C, X,
"stream error: {}", error_str);
484 case PW_STREAM_STATE_STREAMING:
485 self->m_is_running.store(
true, std::memory_order_release);
486 MF_INFO(C, X,
"stream streaming");
488 case PW_STREAM_STATE_PAUSED:
489 self->m_is_running.store(
false, std::memory_order_release);
490 MF_INFO(C, X,
"stream paused");
492 case PW_STREAM_STATE_ERROR:
493 self->m_is_running.store(
false, std::memory_order_release);
494 MF_WARN(C, X,
"stream error state");
501void PipewireStream::on_io_changed(
void* userdata, uint32_t
id,
void* area, uint32_t)
503 auto* self =
static_cast<PipewireStream*
>(userdata);
504 if (
id == SPA_IO_RateMatch && area) {
505 self->m_rate_match =
static_cast<spa_io_rate_match*
>(area);
509void PipewireStream::open()
511 if (m_is_open.load())
514 m_thread_loop = pw_thread_loop_new(
"mayaflux-audio",
nullptr);
516 error(C, X, std::source_location::current(),
"pw_thread_loop_new failed");
518 pw_loop* loop = pw_thread_loop_get_loop(m_thread_loop);
520 auto read_opt = [&](
const char* key) -> std::optional<std::string> {
521 auto it = m_stream_info.backend_options.find(key);
522 if (it == m_stream_info.backend_options.end())
525 return std::any_cast<std::string>(it->second);
526 }
catch (
const std::bad_any_cast&) {
531 auto read_bool = [&](
const char* key) -> std::optional<bool> {
532 auto it = m_stream_info.backend_options.find(key);
533 if (it == m_stream_info.backend_options.end())
536 return std::any_cast<bool>(it->second);
537 }
catch (
const std::bad_any_cast&) {
542 const bool force_quantum = (m_stream_info.priority == GlobalStreamInfo::StreamPriority::REALTIME) && !read_bool(
"pipewire.disable_force_quantum").value_or(
false);
544 const std::string latency_str = std::to_string(m_stream_info.buffer_size) +
"/" + std::to_string(m_stream_info.sample_rate);
546 const std::string node_name = read_opt(
"pipewire.node.name").value_or(
"mayaflux-output");
547 const std::string node_desc = read_opt(
"pipewire.node.description").value_or(
"MayaFlux Audio");
548 const std::string media_role = read_opt(
"pipewire.role").value_or(
"Production");
549 const std::string pid_str = std::to_string(getpid());
551 pw_properties* props = pw_properties_new(
552 PW_KEY_MEDIA_TYPE,
"Audio",
553 PW_KEY_MEDIA_CATEGORY,
"Playback",
554 PW_KEY_MEDIA_ROLE, media_role.c_str(),
555 PW_KEY_MEDIA_CLASS,
"Stream/Output/Audio",
556 PW_KEY_APP_NAME,
"MayaFlux",
557 PW_KEY_APP_PROCESS_ID, pid_str.c_str(),
558 PW_KEY_APP_PROCESS_BINARY,
"mayaflux",
559 PW_KEY_NODE_NAME, node_name.c_str(),
560 PW_KEY_NODE_DESCRIPTION, node_desc.c_str(),
561 PW_KEY_NODE_LATENCY, latency_str.c_str(),
565 pw_properties_set(props,
"node.force-quantum",
566 std::to_string(m_stream_info.buffer_size).c_str());
569 m_output_stream_events = pw_stream_events {
570 .version = PW_VERSION_STREAM_EVENTS,
571 .state_changed = on_state_changed,
572 .io_changed = on_io_changed,
573 .param_changed = on_param_changed,
574 .process = on_output_process,
577 pw_thread_loop_lock(m_thread_loop);
579 m_output_stream = pw_stream_new_simple(
580 loop,
"mayaflux-output", props, &m_output_stream_events,
this);
582 if (!m_output_stream) {
583 pw_thread_loop_unlock(m_thread_loop);
584 pw_thread_loop_destroy(m_thread_loop);
585 m_thread_loop =
nullptr;
586 error(C, X, std::source_location::current(),
"pw_stream_new_simple failed (output)");
590 uint32_t n_params = 0;
591 const struct spa_pod* params[1];
592 build_output_format_params(buf,
sizeof(buf), params, n_params);
594 const uint32_t flags = PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS;
596 if (
auto v = read_opt(
"pipewire.node.name"))
597 pw_properties_set(props, PW_KEY_NODE_NAME, v->c_str());
599 if (
auto v = read_opt(
"pipewire.role"))
600 pw_properties_set(props, PW_KEY_MEDIA_ROLE, v->c_str());
602 if (
auto v = read_opt(
"pipewire.target_object"))
603 pw_properties_set(props, PW_KEY_TARGET_OBJECT, v->c_str());
605 int ret = pw_stream_connect(
609 static_cast<pw_stream_flags
>(flags),
613 pw_thread_loop_unlock(m_thread_loop);
614 pw_stream_destroy(m_output_stream);
615 m_output_stream =
nullptr;
616 pw_thread_loop_destroy(m_thread_loop);
617 m_thread_loop =
nullptr;
618 error(C, X, std::source_location::current(),
619 "pw_stream_connect failed (output): {}", spa_strerror(ret));
622 if (m_stream_info.input.enabled && m_stream_info.input.channels > 0) {
623 const size_t staging_samples =
static_cast<size_t>(m_stream_info.buffer_size) * m_stream_info.input.channels;
624 m_input_staging.assign(staging_samples, 0.0);
626 pw_properties* in_props = pw_properties_new(
627 PW_KEY_MEDIA_TYPE,
"Audio",
628 PW_KEY_MEDIA_CATEGORY,
"Capture",
629 PW_KEY_MEDIA_ROLE,
"Production",
630 PW_KEY_MEDIA_CLASS,
"Stream/Input/Audio",
631 PW_KEY_APP_NAME,
"MayaFlux",
632 PW_KEY_NODE_NAME,
"mayaflux-input",
633 PW_KEY_NODE_LATENCY, latency_str.c_str(),
636 m_input_stream_events = pw_stream_events {
637 .version = PW_VERSION_STREAM_EVENTS,
638 .process = on_input_process,
641 m_input_stream = pw_stream_new_simple(
642 loop,
"mayaflux-input", in_props, &m_input_stream_events,
this);
644 if (!m_input_stream) {
645 pw_thread_loop_unlock(m_thread_loop);
646 pw_stream_destroy(m_output_stream);
647 m_output_stream =
nullptr;
648 pw_thread_loop_destroy(m_thread_loop);
649 m_thread_loop =
nullptr;
650 error(C, X, std::source_location::current(),
"pw_stream_new_simple failed (input)");
653 uint8_t in_buf[1024];
654 uint32_t in_n_params = 0;
655 const struct spa_pod* in_params[1];
656 build_input_format_params(in_buf,
sizeof(in_buf), in_params, in_n_params);
658 ret = pw_stream_connect(
661 m_input_node_id != 0 ? m_input_node_id : PW_ID_ANY,
662 static_cast<pw_stream_flags>(flags),
663 in_params, in_n_params);
666 pw_thread_loop_unlock(m_thread_loop);
667 pw_stream_destroy(m_input_stream);
668 m_input_stream =
nullptr;
669 pw_stream_destroy(m_output_stream);
670 m_output_stream =
nullptr;
671 pw_thread_loop_destroy(m_thread_loop);
672 m_thread_loop =
nullptr;
673 error(C, X, std::source_location::current(),
674 "pw_stream_connect failed (input): {}", spa_strerror(ret));
678 pw_thread_loop_unlock(m_thread_loop);
680 m_is_open.store(
true, std::memory_order_release);
682 MF_INFO(C, X,
"stream opened (quantum: {}/{}, force: {})",
683 m_stream_info.buffer_size, m_stream_info.sample_rate, force_quantum);
686void PipewireStream::start()
688 if (!m_is_open.load())
689 error(C, X, std::source_location::current(),
"start() called before open()");
690 if (m_is_running.load())
693 int ret = pw_thread_loop_start(m_thread_loop);
695 error(C, X, std::source_location::current(),
696 "pw_thread_loop_start failed: {}", spa_strerror(ret));
699 pw_thread_loop_lock(m_thread_loop);
700 pw_stream_set_active(m_output_stream,
true);
702 if (m_input_stream) {
703 pw_stream_set_active(m_input_stream,
true);
705 pw_thread_loop_unlock(m_thread_loop);
708void PipewireStream::stop()
710 if (!m_is_running.load())
713 pw_thread_loop_stop(m_thread_loop);
714 m_is_running.store(
false, std::memory_order_release);
717void PipewireStream::pause()
719 if (!m_is_running.load() || m_is_paused.load())
722 pw_thread_loop_stop(m_thread_loop);
723 m_is_paused.store(
true, std::memory_order_release);
724 m_is_running.store(
false, std::memory_order_release);
727void PipewireStream::resume()
729 if (!m_is_paused.load())
732 int ret = pw_thread_loop_start(m_thread_loop);
734 error(C, X, std::source_location::current(),
735 "pw_thread_loop_start failed on resume: {}", spa_strerror(ret));
738 pw_thread_loop_lock(m_thread_loop);
739 pw_stream_set_active(m_output_stream,
true);
741 pw_stream_set_active(m_input_stream,
true);
742 pw_thread_loop_unlock(m_thread_loop);
744 m_is_paused.store(
false, std::memory_order_release);
745 m_is_running.store(
true, std::memory_order_release);
748void PipewireStream::close()
750 if (!m_is_open.load())
753 if (m_is_running.load() || m_is_paused.load())
754 pw_thread_loop_stop(m_thread_loop);
756 m_is_running.store(
false, std::memory_order_release);
757 m_is_paused.store(
false, std::memory_order_release);
759 if (m_input_stream) {
760 pw_stream_destroy(m_input_stream);
761 m_input_stream =
nullptr;
763 if (m_output_stream) {
764 pw_stream_destroy(m_output_stream);
765 m_output_stream =
nullptr;
768 pw_thread_loop_destroy(m_thread_loop);
769 m_thread_loop =
nullptr;
772 m_is_open.store(
false, std::memory_order_release);
775bool PipewireStream::is_running()
const {
return m_is_running.load(std::memory_order_acquire); }
776bool PipewireStream::is_open()
const {
return m_is_open.load(std::memory_order_acquire); }
#define MF_INFO(comp, ctx,...)
#define MF_RT_WARN(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
@ AudioBackend
Audio processing backend (Pipewire, wasapi, coreaudio)
void error(Component component, Context context, std::source_location location, std::string_view message)
Log an error message and optionally throw an exception.
@ Core
Core engine, backend, subsystems.
void stop()
Stop all Portal::Graphics operations.