Extract multiple samples from a node into a vector.
// Body of extract_multiple_samples (name taken from the log message below).
// Fills a vector with `num_samples` values obtained by calling
// node->process_sample(0.F) repeatedly and returns it.
//
// NOTE(review): the signature is not part of this listing; `node` and
// `num_samples` are presumably its parameters — confirm against the header.
//
// Return value as visible here:
//   - null node            -> vector of num_samples value-initialised doubles
//                             (the vector is NOT cleared on this path)
//   - exception while processing, or the early exit in the `else` branch
//                          -> empty vector (output.clear())
//   - otherwise            -> the num_samples processed values
188{
189 std::vector<double> output(num_samples);
190
// Null node: log and return the untouched, zero-filled vector.
191 if (!node) {
192 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
193 "extract_multiple_samples: null node");
194 return output;
195 }
196
// Each call draws the next value from a function-local static counter
// (starting at 1) and uses it as the id for claiming/releasing the node's
// snapshot context below.
197 static std::atomic<uint64_t> s_context_counter { 1 };
198 uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);
199
200 const auto state = node->m_state.load(std::memory_order_acquire);
201
202
// Path 1: node is INACTIVE and its buffer has not been processed yet.
// Samples are produced directly, with no save_state()/restore_state() around
// the loop, and the node is then marked as processed.
203 if (state == Nodes::NodeState::INACTIVE && !node->is_buffer_processed()) {
204 for (size_t i = 0; i < num_samples; i++) {
205 output[i] = node->process_sample(0.F);
206 }
207 node->mark_buffer_processed();
208 return output;
209 }
210
// Path 2: try to claim the node's snapshot context with this call's id.
211 bool claimed = node->try_claim_snapshot_context(my_context_id);
212
213 if (claimed) {
214 try {
// Snapshot the node, generate the samples, then put the node back.
215 node->save_state();
216
217 for (size_t i = 0; i < num_samples; i++) {
218 output[i] = node->process_sample(0.F);
219 }
220
221 node->restore_state();
222
// If the node reports its buffer as processed, ask for a reset.
223 if (node->is_buffer_processed()) {
224 node->request_buffer_reset();
225 }
226
227 node->release_snapshot_context(my_context_id);
228
229 } catch (const std::exception& e) {
// On failure the claim is released and an empty vector is returned.
// NOTE(review): restore_state() is not called on this path, and only
// std::exception-derived errors are caught here; any other exception type
// would leave this block without releasing the snapshot context.
230 node->release_snapshot_context(my_context_id);
231 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
232 "Error processing node: {}", e.what());
233 output.clear();
234 }
235 } else {
// Path 3: the claim failed; read the id of the context that currently
// holds the snapshot.
236 uint64_t active_context = node->get_active_snapshot_context();
237
// NOTE(review): listing line 238 is missing here (numbering jumps 237 -> 239).
// As shown, `active_context` is never used, the clear/return below is
// unconditional, and the closing braces at 241 and 252 do not balance.
// Presumably line 238 opened a conditional guarding this early exit (possibly
// a call to wait_for_snapshot_completion, which is declared elsewhere in this
// file) — TODO confirm against the original source before relying on it.
239 output.clear();
240 return output;
241 }
242
// Same snapshot/process/restore sequence as in the claimed branch, but
// without any claim or release of the snapshot context on this path.
243 node->save_state();
244 for (size_t i = 0; i < num_samples; i++) {
245 output[i] = node->process_sample(0.F);
246 }
247 node->restore_state();
248
249 if (node->is_buffer_processed()) {
250 node->request_buffer_reset();
251 }
252 }
253
254 return output;
255}
#define MF_RT_ERROR(comp, ctx,...)
bool wait_for_snapshot_completion(const std::shared_ptr< Nodes::Node > &node, uint64_t active_context_id, int max_spins)
Wait for an active snapshot context to complete using exponential backoff.