MayaFlux 0.2.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferUtils.cpp
Go to the documentation of this file.
1#include "BufferUtils.hpp"
2
4
6
7namespace MayaFlux::Buffers {
8
10{
11 bool preferred_sample = preferred & SAMPLE_RATE;
12 bool preferred_frame = preferred & FRAME_RATE;
13 bool current_sample = current & SAMPLE_RATE;
14 bool current_frame = current & FRAME_RATE;
15
16 // If preferred is FRAME_RATE, only FRAME_RATE is compatible
17 if (preferred_frame && !current_frame)
18 return false;
19 // If preferred is SAMPLE_RATE, FRAME_RATE can be compatible (sample can "delay" to frame)
20 if (preferred_sample && current_frame)
21 return true;
22 // If both are SAMPLE_RATE or both are FRAME_RATE, compatible
23 if ((preferred_sample && current_sample) || (preferred_frame && current_frame))
24 return true;
25
26 // Device compatibility: SAMPLE_RATE can't run on GPU, but FRAME_RATE can run on CPU
27 bool preferred_cpu = preferred & CPU_PROCESS;
28 bool preferred_gpu = preferred & GPU_PPOCESS;
29 bool current_cpu = current & CPU_PROCESS;
30 bool current_gpu = current & GPU_PPOCESS;
31
32 if (preferred_sample && current_gpu)
33 return false; // Can't run sample rate on GPU
34 if (preferred_gpu && current_cpu)
35 return false; // If preferred is GPU, but current is CPU, not compatible
36 // If preferred is CPU, but current is GPU, allow only for FRAME_RATE
37 if (preferred_cpu && current_gpu && !current_frame)
38 return false;
39
40 // Sequential/Parallel compatibility: allow if rates align
41 bool preferred_seq = preferred & SEQUENTIAL;
42 bool preferred_par = preferred & PARALLEL;
43 bool current_seq = current & SEQUENTIAL;
44 bool current_par = current & PARALLEL;
45
46 if ((preferred_seq && current_par) || (preferred_par && current_seq)) {
47 // Allow if rates align (already checked above)
48 if ((preferred_sample && current_sample) || (preferred_frame && current_frame)) {
49 return true;
50 }
51 // If preferred is SAMPLE_RATE and current is FRAME_RATE, already handled above
52 // Otherwise, not compatible
53 return false;
54 }
55
56 // If all checks pass, compatible
57 return true;
58}
59
{
    // Reject tokens carrying both rate flags: a buffer processes at either
    // audio sample rate or video frame rate, never both at once.
    if ((token & SAMPLE_RATE) && (token & FRAME_RATE)) {
        error<std::invalid_argument>(
            // NOTE(review): journal component/context arguments appear elided
            // in this rendered view — confirm against the full source.
            std::source_location::current(),
            "SAMPLE_RATE and FRAME_RATE are mutually exclusive.");
    }

    // Reject tokens requesting both execution devices.
    if ((token & CPU_PROCESS) && (token & GPU_PPOCESS)) {
        error<std::invalid_argument>(Journal::Component::Buffers,
            // NOTE(review): one call argument appears elided here in this view.
            std::source_location::current(),
            "CPU_PROCESS and GPU_PROCESS are mutually exclusive.");
    }

    // Reject tokens requesting both scheduling modes.
    if ((token & SEQUENTIAL) && (token & PARALLEL)) {
        error<std::invalid_argument>(Journal::Component::Buffers,
            // NOTE(review): one call argument appears elided here in this view.
            std::source_location::current(),
            "SEQUENTIAL and PARALLEL are mutually exclusive.");
    }
}
84
85ProcessingToken get_optimal_token(const std::string& buffer_type, uint32_t system_capabilities)
86{
87 if (buffer_type == "audio") {
88 return (system_capabilities & 0x1) ? AUDIO_PARALLEL : AUDIO_BACKEND;
89 }
90
91 if (buffer_type == "video" || buffer_type == "texture") {
92 return GRAPHICS_BACKEND;
93 }
94 return AUDIO_BACKEND;
95}
96
    const std::shared_ptr<Nodes::Node>& node,
    uint64_t active_context_id,
    int max_spins)
{
    // Spin until the node leaves the given snapshot context or the spin
    // budget runs out. Returns true on completion, false on timeout.
    int spin_count = 0;

    while (node->is_in_snapshot_context(active_context_id) && spin_count < max_spins) {
        if (spin_count < 10) {
            // Exponential backoff: 2^spin_count pause instructions per round
            // for the first 10 rounds keeps the wait cheap under contention.
            for (int i = 0; i < (1 << spin_count); ++i) {
                MF_PAUSE_INSTRUCTION();
            }
        } else {
            // Past the backoff budget, yield the time slice instead of
            // burning CPU in a hot spin.
            std::this_thread::yield();
        }
        ++spin_count;
    }

    if (spin_count >= max_spins) {
        // NOTE(review): the logging-macro opener (MF_RT_ERROR with component/
        // context args) is elided in this rendered view — confirm against the
        // full source.
        "Timeout waiting for node snapshot to complete. "
        "Possible deadlock or very long processing time.");
        return false;
    }

    return true;
}
124
double extract_single_sample(const std::shared_ptr<Nodes::Node>& node)
{
    // Null guard: report and return silence.
    if (!node) {
        // NOTE(review): the logging-macro opener is elided in this view.
        "extract_single_sample: null node");
        return 0.0;
    }

    // Process-wide monotonic counter gives each extraction a unique snapshot
    // context id; relaxed order suffices since only uniqueness matters.
    static std::atomic<uint64_t> s_context_counter { 1 };
    uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);

    const auto state = node->m_state.load(std::memory_order_acquire);

    // Fast path: an inactive, not-yet-processed node can be sampled directly
    // without snapshot bookkeeping.
    if (state == Utils::NodeState::INACTIVE && !node->is_buffer_processed()) {
        double value = node->process_sample(0.F);
        node->mark_buffer_processed();
        return value;
    }

    // Try to become the exclusive snapshot holder for this node.
    bool claimed = node->try_claim_snapshot_context(my_context_id);

    if (claimed) {
        try {
            // Save state, pull one sample, then restore so the engine's view
            // of the node is left unchanged.
            node->save_state();
            double value = node->process_sample(0.F);
            node->restore_state();

            if (node->is_buffer_processed()) {
                node->request_buffer_reset();
            }

            node->release_snapshot_context(my_context_id);
            return value;

        } catch (const std::exception& e) {
            // Release the context before reporting, or waiters spin forever.
            node->release_snapshot_context(my_context_id);
            // NOTE(review): the logging-macro opener is elided in this view.
            "Error processing node: {}", e.what());
            return 0.0;
        }
    } else {
        // Another thread holds the snapshot; wait for it to finish.
        uint64_t active_context = node->get_active_snapshot_context();

        if (!wait_for_snapshot_completion(node, active_context)) {
            return 0.0;
        }

        // NOTE(review): this path mutates the node without claiming a context
        // and without the try/catch used on the claimed path — confirm this
        // is intentional and safe against a new claimant racing in.
        node->save_state();
        double value = node->process_sample(0.F);
        node->restore_state();

        if (node->is_buffer_processed()) {
            node->request_buffer_reset();
        }

        return value;
    }
}
183
std::vector<double> extract_multiple_samples(
    const std::shared_ptr<Nodes::Node>& node,
    size_t num_samples)
{
    // Zero-initialized output: a null node yields silence of the right length.
    std::vector<double> output(num_samples);

    if (!node) {
        // NOTE(review): the logging-macro opener is elided in this view.
        "extract_multiple_samples: null node");
        return output;
    }

    // Unique snapshot context id per extraction (see extract_single_sample).
    static std::atomic<uint64_t> s_context_counter { 1 };
    uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);

    const auto state = node->m_state.load(std::memory_order_acquire);

    // Fast path: inactive node
    if (state == Utils::NodeState::INACTIVE && !node->is_buffer_processed()) {
        for (size_t i = 0; i < num_samples; i++) {
            output[i] = node->process_sample(0.F);
        }
        node->mark_buffer_processed();
        return output;
    }

    // Claim exclusive snapshot access; on contention fall through to waiting.
    bool claimed = node->try_claim_snapshot_context(my_context_id);

    if (claimed) {
        try {
            // Snapshot state, fill the whole buffer, then restore.
            node->save_state();

            for (size_t i = 0; i < num_samples; i++) {
                output[i] = node->process_sample(0.F);
            }

            node->restore_state();

            if (node->is_buffer_processed()) {
                node->request_buffer_reset();
            }

            node->release_snapshot_context(my_context_id);

        } catch (const std::exception& e) {
            // Release the context first, then report and signal failure by
            // returning an empty vector.
            node->release_snapshot_context(my_context_id);
            // NOTE(review): the logging-macro opener is elided in this view.
            "Error processing node: {}", e.what());
            output.clear();
        }
    } else {
        uint64_t active_context = node->get_active_snapshot_context();

        // Timeout waiting for the active snapshot -> empty vector.
        if (!wait_for_snapshot_completion(node, active_context)) {
            output.clear();
            return output;
        }

        // NOTE(review): this path mutates the node without claiming a context
        // and without try/catch — confirm intentional (see extract_single_sample).
        node->save_state();
        for (size_t i = 0; i < num_samples; i++) {
            output[i] = node->process_sample(0.F);
        }
        node->restore_state();

        if (node->is_buffer_processed()) {
            node->request_buffer_reset();
        }
    }

    return output;
}
255
    const std::shared_ptr<Nodes::Node>& node,
    std::span<double> buffer,
    double mix)
{
    // Additively mixes the node's output into `buffer`, scaled by `mix`.
    // Null node: report and leave the buffer untouched.
    if (!node) {
        // NOTE(review): the logging-macro opener is elided in this view.
        "apply_to_buffer: null node");
        return;
    }

    // Unique snapshot context id per call (see extract_single_sample).
    static std::atomic<uint64_t> s_context_counter { 1 };
    uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);

    const auto state = node->m_state.load(std::memory_order_acquire);

    // Fast path: an inactive, not-yet-processed node needs no snapshot.
    if (state == Utils::NodeState::INACTIVE && !node->is_buffer_processed()) {
        for (double& sample : buffer) {
            sample += node->process_sample(0.F) * mix;
        }
        node->mark_buffer_processed();
        return;
    }

    // Claim exclusive snapshot access; on contention fall through to waiting.
    bool claimed = node->try_claim_snapshot_context(my_context_id);

    if (claimed) {
        try {
            // Snapshot state, mix into the whole buffer, then restore.
            node->save_state();

            for (double& sample : buffer) {
                sample += node->process_sample(0.F) * mix;
            }

            node->restore_state();

            if (node->is_buffer_processed()) {
                node->request_buffer_reset();
            }

            node->release_snapshot_context(my_context_id);

        } catch (const std::exception& e) {
            // Release the context first; the buffer keeps whatever was mixed
            // in before the failure.
            node->release_snapshot_context(my_context_id);
            // NOTE(review): the logging-macro opener is elided in this view.
            "Error processing node: {}", e.what());
        }
    } else {
        uint64_t active_context = node->get_active_snapshot_context();

        // Timeout waiting for the active snapshot: buffer left unchanged.
        if (!wait_for_snapshot_completion(node, active_context)) {
            return;
        }

        // NOTE(review): this path mutates the node without claiming a context
        // and without try/catch — confirm intentional (see extract_single_sample).
        node->save_state();
        for (double& sample : buffer) {
            sample += node->process_sample(0.F) * mix;
        }
        node->restore_state();

        if (node->is_buffer_processed()) {
            node->request_buffer_reset();
        }
    }
}
321
322} // namespace MayaFlux::Buffers
#define MF_RT_ERROR(comp, ctx,...)
static MayaFlux::Nodes::ProcessingToken token
Definition Timers.cpp:8
std::vector< double > extract_multiple_samples(const std::shared_ptr< Nodes::Node > &node, size_t num_samples)
Extract multiple samples from a node into a vector.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
@ SAMPLE_RATE
Processes data at audio sample rate with buffer-sized chunks.
@ CPU_PROCESS
Executes processing operations on CPU threads.
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ PARALLEL
Processes operations in parallel when possible.
@ SEQUENTIAL
Processes operations sequentially, one after another.
@ GRAPHICS_BACKEND
Standard graphics processing backend configuration.
@ FRAME_RATE
Processes data at video frame rate.
@ GPU_PPOCESS
Executes processing operations on GPU hardware.
@ AUDIO_PARALLEL
High-performance audio processing with GPU acceleration.
void validate_token(ProcessingToken token)
Validates that a processing token has a valid, non-conflicting configuration.
double extract_single_sample(const std::shared_ptr< Nodes::Node > &node)
Extract a single sample from a node with proper snapshot management.
ProcessingToken get_optimal_token(const std::string &buffer_type, uint32_t system_capabilities)
Gets the optimal processing token for a given buffer type and system configuration.
void update_buffer_with_node_data(const std::shared_ptr< Nodes::Node > &node, std::span< double > buffer, double mix)
Apply node output to an existing buffer with mixing.
bool wait_for_snapshot_completion(const std::shared_ptr< Nodes::Node > &node, uint64_t active_context_id, int max_spins)
Wait for an active snapshot context to complete using exponential backoff.
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, Managers, processors and processing chains.
@ INACTIVE
Engine is not processing this node.
Definition Utils.hpp:29
std::vector< double > mix(const std::vector< std::vector< double > > &streams)
Mix multiple data streams with equal weighting.
Definition Yantra.cpp:1019