MayaFlux 0.3.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferUtils.cpp
Go to the documentation of this file.
1#include "BufferUtils.hpp"
2
5
7
8namespace MayaFlux::Buffers {
9
11{
12 bool preferred_sample = preferred & SAMPLE_RATE;
13 bool preferred_frame = preferred & FRAME_RATE;
14 bool current_sample = current & SAMPLE_RATE;
15 bool current_frame = current & FRAME_RATE;
16
17 // If preferred is FRAME_RATE, only FRAME_RATE is compatible
18 if (preferred_frame && !current_frame)
19 return false;
20 // If preferred is SAMPLE_RATE, FRAME_RATE can be compatible (sample can "delay" to frame)
21 if (preferred_sample && current_frame)
22 return true;
23 // If both are SAMPLE_RATE or both are FRAME_RATE, compatible
24 if ((preferred_sample && current_sample) || (preferred_frame && current_frame))
25 return true;
26
27 // Device compatibility: SAMPLE_RATE can't run on GPU, but FRAME_RATE can run on CPU
28 bool preferred_cpu = preferred & CPU_PROCESS;
29 bool preferred_gpu = preferred & GPU_PROCESS;
30 bool current_cpu = current & CPU_PROCESS;
31 bool current_gpu = current & GPU_PROCESS;
32
33 if (preferred_sample && current_gpu)
34 return false; // Can't run sample rate on GPU
35 if (preferred_gpu && current_cpu)
36 return false; // If preferred is GPU, but current is CPU, not compatible
37 // If preferred is CPU, but current is GPU, allow only for FRAME_RATE
38 if (preferred_cpu && current_gpu && !current_frame)
39 return false;
40
41 // Sequential/Parallel compatibility: allow if rates align
42 bool preferred_seq = preferred & SEQUENTIAL;
43 bool preferred_par = preferred & PARALLEL;
44 bool current_seq = current & SEQUENTIAL;
45 bool current_par = current & PARALLEL;
46
47 if ((preferred_seq && current_par) || (preferred_par && current_seq)) {
48 // Allow if rates align (already checked above)
49 if ((preferred_sample && current_sample) || (preferred_frame && current_frame)) {
50 return true;
51 }
52 // If preferred is SAMPLE_RATE and current is FRAME_RATE, already handled above
53 // Otherwise, not compatible
54 return false;
55 }
56
57 // If all checks pass, compatible
58 return true;
59}
60
// validate_token: rejects a ProcessingToken that combines mutually
// exclusive bits. Each violation is reported through the Journal
// error<std::invalid_argument> helper, which presumably throws
// std::invalid_argument — confirm against the helper's definition.
// NOTE(review): the leading argument line(s) of the first error<> call,
// and one argument line of the later calls, are missing from this
// extraction — confirm against the repository source before editing.
62{
 // A token processes either at audio sample rate or at video frame rate.
63 if ((token & SAMPLE_RATE) && (token & FRAME_RATE)) {
64 error<std::invalid_argument>(
67 std::source_location::current(),
68 "SAMPLE_RATE and FRAME_RATE are mutually exclusive.");
69 }
70
 // A token targets either CPU threads or GPU hardware, not both.
71 if ((token & CPU_PROCESS) && (token & GPU_PROCESS)) {
72 error<std::invalid_argument>(Journal::Component::Buffers,
74 std::source_location::current(),
75 "CPU_PROCESS and GPU_PROCESS are mutually exclusive.");
76 }
77
 // Scheduling mode is likewise exclusive: sequential or parallel.
78 if ((token & SEQUENTIAL) && (token & PARALLEL)) {
79 error<std::invalid_argument>(Journal::Component::Buffers,
81 std::source_location::current(),
82 "SEQUENTIAL and PARALLEL are mutually exclusive.");
83 }
84}
85
86ProcessingToken get_optimal_token(const std::string& buffer_type, uint32_t system_capabilities)
87{
88 if (buffer_type == "audio") {
89 return (system_capabilities & 0x1) ? AUDIO_PARALLEL : AUDIO_BACKEND;
90 }
91
92 if (buffer_type == "video" || buffer_type == "texture") {
93 return GRAPHICS_BACKEND;
94 }
95 return AUDIO_BACKEND;
96}
97
// Spin-waits until `node` leaves the snapshot context identified by
// `active_context_id`, giving up after `max_spins` iterations.
// Backoff: for the first 10 spins, busy-pause 2^spin_count times via
// MF_PAUSE_INSTRUCTION (exponential backoff); after that, yield the thread
// to the OS scheduler each iteration.
// Returns true when the context completed, false on timeout.
// NOTE(review): the warning-macro invocation line (original line 117)
// before the timeout message is missing from this extraction — confirm
// against the repository source.
99 const std::shared_ptr<Nodes::Node>& node,
100 uint64_t active_context_id,
101 int max_spins)
102{
103 int spin_count = 0;
104
105 while (node->is_in_snapshot_context(active_context_id) && spin_count < max_spins) {
106 if (spin_count < 10) {
 // Doubling busy-wait: 1, 2, 4, ... 512 pause instructions.
107 for (int i = 0; i < (1 << spin_count); ++i) {
108 MF_PAUSE_INSTRUCTION();
109 }
110 } else {
 // Past the busy-wait budget: hand the core back to the scheduler.
111 std::this_thread::yield();
112 }
113 ++spin_count;
114 }
115
 // Timed out: report a possible deadlock / overlong snapshot and fail.
116 if (spin_count >= max_spins) {
118 "Timeout waiting for node snapshot to complete. "
119 "Possible deadlock or very long processing time.");
120 return false;
121 }
122
123 return true;
124}
125
// Extracts one sample from `node` without disturbing its engine-driven
// state: the node's state is saved, a sample is produced with input 0.0f,
// then the state is restored. Snapshot contexts serialize concurrent
// extractions. Returns 0.0 for a null node, on processing error, or on
// snapshot-wait timeout.
// NOTE(review): the error-macro opening lines (original lines 129, 162)
// are missing from this extraction — confirm against the repo.
126double extract_single_sample(const std::shared_ptr<Nodes::Node>& node)
127{
128 if (!node) {
130 "extract_single_sample: null node");
131 return 0.0;
132 }
133
 // Process-wide counter hands each caller a unique snapshot-context id.
134 static std::atomic<uint64_t> s_context_counter { 1 };
135 uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);
136
137 const auto state = node->m_state.load(std::memory_order_acquire);
138
 // Fast path: an inactive, not-yet-processed node needs no save/restore.
139 if (state == Nodes::NodeState::INACTIVE && !node->is_buffer_processed()) {
140 double value = node->process_sample(0.F);
141 node->mark_buffer_processed();
142 return value;
143 }
144
145 bool claimed = node->try_claim_snapshot_context(my_context_id);
146
147 if (claimed) {
148 try {
 // We own the snapshot context: sample under save/restore.
149 node->save_state();
150 double value = node->process_sample(0.F);
151 node->restore_state();
152
153 if (node->is_buffer_processed()) {
154 node->request_buffer_reset();
155 }
156
157 node->release_snapshot_context(my_context_id);
158 return value;
159
160 } catch (const std::exception& e) {
 // NOTE(review): the context is released but restore_state() is not
 // called on this path — verify that is intentional.
161 node->release_snapshot_context(my_context_id);
163 "Error processing node: {}", e.what());
164 return 0.0;
165 }
166 } else {
 // Another caller holds the context: wait for it to finish, then
 // sample without claiming a context of our own.
167 uint64_t active_context = node->get_active_snapshot_context();
168
169 if (!wait_for_snapshot_completion(node, active_context)) {
170 return 0.0;
171 }
172
 // NOTE(review): this path runs save/restore without owning a snapshot
 // context, so a new claimant could race it — confirm acceptable.
173 node->save_state();
174 double value = node->process_sample(0.F);
175 node->restore_state();
176
177 if (node->is_buffer_processed()) {
178 node->request_buffer_reset();
179 }
180
181 return value;
182 }
183}
184
// Extracts `num_samples` consecutive samples from `node` into a vector,
// using the same snapshot save/restore protocol as extract_single_sample.
// Returns a zero-filled vector for a null node, and an emptied (cleared)
// vector on processing error or snapshot-wait timeout.
// NOTE(review): the error-macro opening lines (original lines 192, 231)
// are missing from this extraction — confirm against the repo.
185std::vector<double> extract_multiple_samples(
186 const std::shared_ptr<Nodes::Node>& node,
187 size_t num_samples)
188{
189 std::vector<double> output(num_samples);
190
191 if (!node) {
193 "extract_multiple_samples: null node");
194 return output;
195 }
196
 // Process-wide counter hands each caller a unique snapshot-context id.
197 static std::atomic<uint64_t> s_context_counter { 1 };
198 uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);
199
200 const auto state = node->m_state.load(std::memory_order_acquire);
201
202 // Fast path: inactive node
203 if (state == Nodes::NodeState::INACTIVE && !node->is_buffer_processed()) {
204 for (size_t i = 0; i < num_samples; i++) {
205 output[i] = node->process_sample(0.F);
206 }
207 node->mark_buffer_processed();
208 return output;
209 }
210
211 bool claimed = node->try_claim_snapshot_context(my_context_id);
212
213 if (claimed) {
214 try {
 // We own the snapshot context: fill the buffer under save/restore.
215 node->save_state();
216
217 for (size_t i = 0; i < num_samples; i++) {
218 output[i] = node->process_sample(0.F);
219 }
220
221 node->restore_state();
222
223 if (node->is_buffer_processed()) {
224 node->request_buffer_reset();
225 }
226
227 node->release_snapshot_context(my_context_id);
228
229 } catch (const std::exception& e) {
 // NOTE(review): the context is released but restore_state() is not
 // called on this path — verify that is intentional.
230 node->release_snapshot_context(my_context_id);
232 "Error processing node: {}", e.what());
 // Error is signalled to the caller by returning an empty vector.
233 output.clear();
234 }
235 } else {
 // Another caller holds the context: wait for it to finish, then
 // sample without claiming a context of our own.
236 uint64_t active_context = node->get_active_snapshot_context();
237
238 if (!wait_for_snapshot_completion(node, active_context)) {
239 output.clear();
240 return output;
241 }
242
243 node->save_state();
244 for (size_t i = 0; i < num_samples; i++) {
245 output[i] = node->process_sample(0.F);
246 }
247 node->restore_state();
248
249 if (node->is_buffer_processed()) {
250 node->request_buffer_reset();
251 }
252 }
253
254 return output;
255}
256
// Mixes `node` output additively into `buffer`: each element receives
// `process_sample(0.0f) * mix`. Uses the same snapshot save/restore
// protocol as extract_single_sample. Returns silently (buffer untouched)
// for a null node or on snapshot-wait timeout.
// NOTE(review): the signature line and the error-macro opening lines
// (original lines 257, 263, 301) are missing from this extraction —
// confirm against the repository source. The logged message names the
// function "apply_to_buffer"; verify it matches the actual name.
258 const std::shared_ptr<Nodes::Node>& node,
259 std::span<double> buffer,
260 double mix)
261{
262 if (!node) {
264 "apply_to_buffer: null node");
265 return;
266 }
267
 // Process-wide counter hands each caller a unique snapshot-context id.
268 static std::atomic<uint64_t> s_context_counter { 1 };
269 uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);
270
271 const auto state = node->m_state.load(std::memory_order_acquire);
272
 // Fast path: an inactive, not-yet-processed node needs no save/restore.
273 if (state == Nodes::NodeState::INACTIVE && !node->is_buffer_processed()) {
274 for (double& sample : buffer) {
275 sample += node->process_sample(0.F) * mix;
276 }
277 node->mark_buffer_processed();
278 return;
279 }
280
281 bool claimed = node->try_claim_snapshot_context(my_context_id);
282
283 if (claimed) {
284 try {
 // We own the snapshot context: mix into the buffer under save/restore.
285 node->save_state();
286
287 for (double& sample : buffer) {
288 sample += node->process_sample(0.F) * mix;
289 }
290
291 node->restore_state();
292
293 if (node->is_buffer_processed()) {
294 node->request_buffer_reset();
295 }
296
297 node->release_snapshot_context(my_context_id);
298
299 } catch (const std::exception& e) {
 // NOTE(review): on exception the buffer may be partially updated and
 // restore_state() is not called — verify that is intentional.
300 node->release_snapshot_context(my_context_id);
302 "Error processing node: {}", e.what());
303 }
304 } else {
 // Another caller holds the context: wait for it to finish, then
 // mix without claiming a context of our own.
305 uint64_t active_context = node->get_active_snapshot_context();
306
307 if (!wait_for_snapshot_completion(node, active_context)) {
308 return;
309 }
310
311 node->save_state();
312 for (double& sample : buffer) {
313 sample += node->process_sample(0.F) * mix;
314 }
315 node->restore_state();
316
317 if (node->is_buffer_processed()) {
318 node->request_buffer_reset();
319 }
320 }
321}
322
// Extracts GPU-renderable geometry from `network` via its operator:
// the operator must be (dynamically cast to) a GraphicsOperator with
// non-empty vertex data. On any failure a value-initialized (empty)
// NetworkGpuData is returned; `name` is only used in log messages.
// NOTE(review): the signature line and the log-macro opening lines
// (original lines 323, 329, 336, 346) are missing from this extraction —
// confirm against the repository source.
324 const std::shared_ptr<Nodes::Network::NodeNetwork>& network,
325 std::string_view name)
326{
327 auto* op = network->get_operator();
328 if (!op) {
330 "Network '{}' has no operator", name);
331 return {};
332 }
333
 // Only GraphicsOperators produce vertex geometry; RTTI-check the type.
334 auto* graphics_op = dynamic_cast<Nodes::Network::GraphicsOperator*>(op);
335 if (!graphics_op) {
337 "Network '{}' operator '{}' is not a GraphicsOperator",
338 name, op->get_type_name());
339 return {};
340 }
341
342 auto vertex_data = graphics_op->get_vertex_data();
343 size_t vertex_count = graphics_op->get_vertex_count();
344
 // Nothing to render this frame — not an error, just an empty result.
345 if (vertex_data.empty() || vertex_count == 0) {
347 "Network '{}' has no vertex data this frame", name);
348 return {};
349 }
350
 // Aggregate the three pieces via designated initializers (C++20).
351 return {
352 .vertex_data = vertex_data,
353 .vertex_count = vertex_count,
354 .layout = graphics_op->get_vertex_layout()
355 };
356}
357
// Returns a read-only view of `network`'s current audio buffer, or an
// empty span when the buffer is missing or empty. The span aliases
// storage owned by the network's buffer — presumably valid only until the
// next audio cycle; confirm the buffer's lifetime before caching it.
// NOTE(review): the log-macro opening line (original line 364) is missing
// from this extraction — confirm against the repository source.
358std::span<const double> extract_network_audio_data(
359 const std::shared_ptr<Nodes::Network::NodeNetwork>& network,
360 std::string_view name)
361{
362 auto buf = network->get_audio_buffer();
363 if (!buf || buf->empty()) {
365 "Network '{}' has no audio buffer this cycle", name);
366 return {};
367 }
 // Wrap the underlying contiguous storage without copying.
368 return { buf->data(), buf->size() };
369}
370
371} // namespace MayaFlux::Buffers
#define MF_RT_WARN(comp, ctx,...)
#define MF_RT_ERROR(comp, ctx,...)
#define MF_RT_TRACE(comp, ctx,...)
Operator that produces GPU-renderable geometry.
std::vector< double > extract_multiple_samples(const std::shared_ptr< Nodes::Node > &node, size_t num_samples)
Extract multiple samples from a node into a vector.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
@ GPU_PROCESS
Executes processing operations on GPU hardware.
@ SAMPLE_RATE
Processes data at audio sample rate with buffer-sized chunks.
@ CPU_PROCESS
Executes processing operations on CPU threads.
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ PARALLEL
Processes operations in parallel when possible.
@ SEQUENTIAL
Processes operations sequentially, one after another.
@ GRAPHICS_BACKEND
Standard graphics processing backend configuration.
@ FRAME_RATE
Processes data at video frame rate.
@ AUDIO_PARALLEL
High-performance audio processing with GPU acceleration.
void validate_token(ProcessingToken token)
Validates that a processing token has a valid, non-conflicting configuration.
double extract_single_sample(const std::shared_ptr< Nodes::Node > &node)
Extract a single sample from a node with proper snapshot management.
ProcessingToken get_optimal_token(const std::string &buffer_type, uint32_t system_capabilities)
Gets the optimal processing token for a given buffer type and system configuration.
void update_buffer_with_node_data(const std::shared_ptr< Nodes::Node > &node, std::span< double > buffer, double mix)
Apply node output to an existing buffer with mixing.
NetworkGpuData extract_network_gpu_data(const std::shared_ptr< Nodes::Network::NodeNetwork > &network, std::string_view name)
Extract GPU geometry data from a NodeNetwork via its GraphicsOperator.
std::span< const double > extract_network_audio_data(const std::shared_ptr< Nodes::Network::NodeNetwork > &network, std::string_view name)
Extract audio buffer data from a NodeNetwork.
bool wait_for_snapshot_completion(const std::shared_ptr< Nodes::Node > &node, uint64_t active_context_id, int max_spins)
Wait for an active snapshot context to complete using exponential backoff.
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, managers, processors, and processing chains.
@ INACTIVE
Engine is not processing this node.
Definition NodeSpec.hpp:44
std::vector< double > mix(const std::vector< std::vector< double > > &streams)
Mix multiple data streams with equal weighting.
Definition Yantra.cpp:1021
Result of extracting GPU-ready data from a NodeNetwork operator.