Apply a node's output to an existing buffer, adding each processed sample scaled by the mix factor.
260{
261 if (!node) {
262 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
263 "apply_to_buffer: null node");
264 return;
265 }
266
267 static std::atomic<uint64_t> s_context_counter { 1 };
268 uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);
269
270 const auto state = node->m_state.load(std::memory_order_acquire);
271
272 if (state == Utils::NodeState::INACTIVE && !node->is_buffer_processed()) {
273 for (double& sample : buffer) {
274 sample += node->process_sample(0.F) * mix;
275 }
276 node->mark_buffer_processed();
277 return;
278 }
279
280 bool claimed = node->try_claim_snapshot_context(my_context_id);
281
282 if (claimed) {
283 try {
284 node->save_state();
285
286 for (double& sample : buffer) {
287 sample += node->process_sample(0.F) *
mix;
288 }
289
290 node->restore_state();
291
292 if (node->is_buffer_processed()) {
293 node->request_buffer_reset();
294 }
295
296 node->release_snapshot_context(my_context_id);
297
298 } catch (const std::exception& e) {
299 node->release_snapshot_context(my_context_id);
300 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
301 "Error processing node: {}", e.what());
302 }
303 } else {
304 uint64_t active_context = node->get_active_snapshot_context();
305
307 return;
308 }
309
310 node->save_state();
311 for (double& sample : buffer) {
312 sample += node->process_sample(0.F) *
mix;
313 }
314 node->restore_state();
315
316 if (node->is_buffer_processed()) {
317 node->request_buffer_reset();
318 }
319 }
320}
#define MF_RT_ERROR(comp, ctx,...)
bool wait_for_snapshot_completion(const std::shared_ptr< Nodes::Node > &node, uint64_t active_context_id, int max_spins)
Wait for an active snapshot context to complete using exponential backoff.
std::vector< double > mix(const std::vector< std::vector< double > > &streams)
Mix multiple data streams with equal weighting.