MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
SoundFileWriter.cpp
Go to the documentation of this file.
1#include "SoundFileWriter.hpp"
2
5
9extern "C" {
10#include <libavcodec/avcodec.h>
11}
12
13#include <chrono>
14#include <cstddef>
15
16namespace MayaFlux::IO {
17
18// =========================================================================
19// Constructor / destructor
20// =========================================================================
21
23 : m_queue(std::make_unique<Memory::LockFreeQueue<WorkItem, k_queue_capacity>>())
24{
25}
26
28{
29 if (m_open.load(std::memory_order_acquire) && !m_closing.exchange(true)) {
30 auto fut = close();
31 if (fut.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
33 "SoundFileWriter destructor timed out; worker detached, file may be incomplete");
34 m_worker.detach();
35 return;
36 }
37 }
38 if (m_worker.joinable())
39 m_worker.join();
40}
41
42// =========================================================================
43// Lifecycle
44// =========================================================================
45
46bool SoundFileWriter::open(const std::string& filepath,
47 uint32_t channels,
48 uint32_t sample_rate,
49 AVCodecID explicit_codec)
50{
51 if (m_open.load(std::memory_order_acquire)) {
52 set_error("open() called while already open");
53 return false;
54 }
55
56 m_channels = channels;
57 m_close_promise = std::promise<bool> {};
58 m_close_future = m_close_promise.get_future().share();
59 m_closing.store(false, std::memory_order_release);
60
61 m_worker = std::thread(&SoundFileWriter::worker_loop, this,
62 filepath, channels, sample_rate, explicit_codec);
63
64 constexpr int k_spin_ms = 500;
65 constexpr int k_sleep_us = 500;
66 for (int i = 0; i < (k_spin_ms * 1000 / k_sleep_us); ++i) {
67 if (m_open.load(std::memory_order_acquire))
68 return true;
69 std::this_thread::sleep_for(std::chrono::microseconds(k_sleep_us));
70 }
71
72 if (m_worker.joinable())
73 m_worker.join();
74 return false;
75}
76
77std::future<bool> SoundFileWriter::close()
78{
79 if (!m_closing.exchange(true)) {
80 post(CloseCmd {});
81 }
82 return std::async(std::launch::deferred,
83 [f = m_close_future]() { return f.get(); });
84}
85
86// =========================================================================
87// Write — interleaved doubles
88// =========================================================================
89
90void SoundFileWriter::write(std::span<const double> interleaved, uint32_t num_frames)
91{
92 if (!m_open.load(std::memory_order_acquire) || interleaved.empty())
93 return;
94
95 uint32_t frames = num_frames > 0
96 ? num_frames
97 : static_cast<uint32_t>(interleaved.size() / (m_channels > 0 ? m_channels : 1));
98
100 .samples = std::vector<double>(interleaved.begin(), interleaved.end()),
101 .num_frames = frames });
102}
103
104void SoundFileWriter::write(const std::vector<Kakshya::DataVariant>& planar)
105{
106 if (!m_open.load(std::memory_order_acquire) || planar.empty())
107 return;
108
109 const auto& ch0 = std::get<std::vector<double>>(planar[0]);
110 auto frames = static_cast<uint32_t>(ch0.size());
111 if (frames == 0)
112 return;
113
114 PlanarChunk chunk;
115 chunk.num_frames = frames;
116 chunk.channels.reserve(planar.size());
117 for (const auto& v : planar)
118 chunk.channels.push_back(std::get<std::vector<double>>(v));
119
120 post(std::move(chunk));
121}
122
123void SoundFileWriter::write(const std::shared_ptr<Buffers::AudioBuffer>& buffer)
124{
125 if (!m_open.load(std::memory_order_acquire) || !buffer)
126 return;
127
128 const auto& data = buffer->get_data();
129 if (data.empty())
130 return;
131
132 post(FrameChunk { .samples = data, .num_frames = static_cast<uint32_t>(data.size()) });
133}
134
135void SoundFileWriter::write(const std::shared_ptr<Kakshya::SoundStreamContainer>& container)
136{
137 if (!m_open.load(std::memory_order_acquire) || !container)
138 return;
139
140 const auto& data = container->get_data();
141 if (data.empty())
142 return;
143
144 write(data);
145}
146
147// =========================================================================
148// Error
149// =========================================================================
150
152{
153 std::lock_guard lock(m_error_mutex);
154 return m_last_error;
155}
156
157void SoundFileWriter::set_error(std::string msg)
158{
159 std::lock_guard lock(m_error_mutex);
160 m_last_error = std::move(msg);
161}
162
163// =========================================================================
164// Internal helpers
165// =========================================================================
166
168{
169 return m_queue->push(item);
170}
171
172// =========================================================================
173// Worker loop
174// =========================================================================
175
176void SoundFileWriter::worker_loop(const std::string& filepath,
177 uint32_t channels,
178 uint32_t sample_rate,
179 AVCodecID codec_id)
180{
183
184 auto fail = [&](std::string msg) {
185 set_error(std::move(msg));
186 m_open.store(false, std::memory_order_release);
187 m_close_promise.set_value(false);
188 };
189
190 if (!mux.open(filepath)) {
191 fail(mux.last_error());
192 return;
193 }
194
195 if (!enc.open(mux, sample_rate, channels, codec_id)) {
196 fail(enc.last_error());
197 return;
198 }
199
200 if (!mux.write_header()) {
201 fail(mux.last_error());
202 return;
203 }
204
205 m_open.store(true, std::memory_order_release);
206
207 bool ok = true;
208
209 while (true) {
210 auto item = m_queue->pop();
211 if (!item) {
212 std::this_thread::yield();
213 continue;
214 }
215
216 if (std::holds_alternative<FrameChunk>(*item)) {
217 auto& fc = std::get<FrameChunk>(*item);
218 if (!enc.encode_frames(std::span<const double>(fc.samples), fc.num_frames, mux)) {
219 set_error(enc.last_error());
220 ok = false;
221 }
222 } else if (std::holds_alternative<PlanarChunk>(*item)) {
223 auto& pc = std::get<PlanarChunk>(*item);
224 const auto ch = static_cast<uint32_t>(pc.channels.size());
225 std::vector<double> interleaved(static_cast<size_t>(pc.num_frames) * ch);
226
227 for (uint32_t f = 0; f < pc.num_frames; ++f) {
228 for (uint32_t c = 0; c < ch; ++c)
229 interleaved[(size_t)f * ch + c] = pc.channels[c][f];
230 }
231
232 if (!enc.encode_frames(interleaved, pc.num_frames, mux)) {
233 set_error(enc.last_error());
234 ok = false;
235 }
236 } else {
237 if (ok && !enc.drain(mux)) {
238 set_error(enc.last_error());
239 ok = false;
240 }
241 mux.close();
242 m_open.store(false, std::memory_order_release);
243 m_close_promise.set_value(ok);
244 return;
245 }
246 }
247}
248
249} // namespace MayaFlux::IO
#define MF_ERROR(comp, ctx,...)
const std::string & last_error() const
bool drain(FFmpegMuxContext &mux)
Flush the FIFO remainder and encoder internal delay to the mux.
bool open(FFmpegMuxContext &mux, uint32_t sample_rate, uint32_t channels, AVCodecID codec_id)
Open the encoder and register an audio stream in the mux context.
bool encode_frames(std::span< const double > interleaved, uint32_t num_frames, FFmpegMuxContext &mux)
Encode a block of interleaved double-precision PCM frames.
RAII owner of one audio stream's encoder, resampler, and sample FIFO.
bool open(const std::string &filepath, const std::string &explicit_format={})
Allocate an output context and open the avio layer for writing.
bool write_header()
Write the container header to the output file.
const std::string & last_error() const
void close()
Write the container trailer, flush avio, and release all resources.
RAII owner of a single AVFormatContext on the write path.
std::variant< FrameChunk, PlanarChunk, CloseCmd > WorkItem
bool post(const WorkItem &item)
std::future< bool > close()
Post a close command to the worker.
std::promise< bool > m_close_promise
bool open(const std::string &filepath, uint32_t channels, uint32_t sample_rate, AVCodecID explicit_codec)
Open the output file and spawn the worker thread.
void write(std::span< const double > interleaved, uint32_t num_frames=0)
Post interleaved double-precision frames to the work queue.
std::unique_ptr< Memory::LockFreeQueue< WorkItem, k_queue_capacity > > m_queue
void set_error(std::string msg)
std::shared_future< bool > m_close_future
void worker_loop(const std::string &filepath, uint32_t channels, uint32_t sample_rate, AVCodecID codec_id)
@ FileIO
Filesystem I/O operations.
@ IO
Networking, file handling, streaming.
std::vector< std::vector< double > > channels