Extract a single sample from a node with proper snapshot management.
126{
// Body of extract_single_sample (name taken from the log message below).
// Produces one sample from `node` by calling process_sample(0.F); returns
// 0.0 when the node is null, when processing throws a std::exception, or
// on the early-return path of the else branch.
// NOTE(review): this listing is missing source line 168 (see the else
// branch), so the brace structure shown here is not balanced as printed.

// Guard: a null node is reported and yields 0.0.
127 if (!node) {
128 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
129 "extract_single_sample: null node");
130 return 0.0;
131 }
132
// Function-local static counter shared by every call; fetch_add returns the
// previous value, so each call obtains its own id. Only the value is used
// (no data is published through it), hence the relaxed ordering.
133 static std::atomic<uint64_t> s_context_counter { 1 };
134 uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);
135
136 const auto state = node->m_state.load(std::memory_order_acquire);
137
// Path 1: node is INACTIVE and its buffer has not been processed yet.
// The sample is produced directly (no save_state/restore_state, no snapshot
// claim) and the node is marked as processed.
138 if (state == Utils::NodeState::INACTIVE && !node->is_buffer_processed()) {
139 double value = node->process_sample(0.F);
140 node->mark_buffer_processed();
141 return value;
142 }
143
// Otherwise try to claim the node's snapshot context under this call's id.
144 bool claimed = node->try_claim_snapshot_context(my_context_id);
145
// Path 2: claim succeeded. Process between save_state() and restore_state(),
// request a buffer reset if the node reports its buffer as processed, then
// release the claim before returning.
146 if (claimed) {
147 try {
148 node->save_state();
149 double value = node->process_sample(0.F);
150 node->restore_state();
151
152 if (node->is_buffer_processed()) {
153 node->request_buffer_reset();
154 }
155
156 node->release_snapshot_context(my_context_id);
157 return value;
158
// On std::exception the claim is released, the error is logged, and 0.0 is
// returned.
// NOTE(review): restore_state() is not called on this path, so a throw after
// save_state() leaves the saved state un-restored. Exceptions not derived
// from std::exception are not caught here and would leave the claim held.
159 } catch (const std::exception& e) {
160 node->release_snapshot_context(my_context_id);
161 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
162 "Error processing node: {}", e.what());
163 return 0.0;
164 }
// Path 3: claim failed, another context id is registered as active.
165 } else {
166 uint64_t active_context = node->get_active_snapshot_context();
167
// NOTE(review): source line 168 is absent from this listing. The unused
// `active_context` and the extra closing brace at 170 suggest a conditional
// opened here -- presumably a failed wait_for_snapshot_completion(node,
// active_context, ...) guarding this early return. Confirm against the
// original file.
169 return 0.0;
170 }
171
// Same save -> process -> restore -> optional reset sequence as Path 2, but
// without claiming/releasing a snapshot context and without a try/catch.
// Presumably reached only after the missing check at 168 succeeds -- TODO
// confirm.
172 node->save_state();
173 double value = node->process_sample(0.F);
174 node->restore_state();
175
176 if (node->is_buffer_processed()) {
177 node->request_buffer_reset();
178 }
179
180 return value;
181 }
182}
#define MF_RT_ERROR(comp, ctx,...)
bool wait_for_snapshot_completion(const std::shared_ptr< Nodes::Node > &node, uint64_t active_context_id, int max_spins)
Wait for an active snapshot context to complete using exponential backoff.