Extract a single sample from a node with proper snapshot management.
127{
128 if (!node) {
129 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
130 "extract_single_sample: null node");
131 return 0.0;
132 }
133
134 static std::atomic<uint64_t> s_context_counter { 1 };
135 uint64_t my_context_id = s_context_counter.fetch_add(1, std::memory_order_relaxed);
136
137 const auto state = node->m_state.load(std::memory_order_acquire);
138
139 if (state == Nodes::NodeState::INACTIVE && !node->is_buffer_processed()) {
140 double value = node->process_sample(0.F);
141 node->mark_buffer_processed();
142 return value;
143 }
144
145 bool claimed = node->try_claim_snapshot_context(my_context_id);
146
147 if (claimed) {
148 try {
149 node->save_state();
150 double value = node->process_sample(0.F);
151 node->restore_state();
152
153 if (node->is_buffer_processed()) {
154 node->request_buffer_reset();
155 }
156
157 node->release_snapshot_context(my_context_id);
158 return value;
159
160 } catch (const std::exception& e) {
161 node->release_snapshot_context(my_context_id);
162 MF_RT_ERROR(Journal::Component::Buffers, Journal::Context::BufferProcessing,
163 "Error processing node: {}", e.what());
164 return 0.0;
165 }
166 } else {
167 uint64_t active_context = node->get_active_snapshot_context();
168
170 return 0.0;
171 }
172
173 node->save_state();
174 double value = node->process_sample(0.F);
175 node->restore_state();
176
177 if (node->is_buffer_processed()) {
178 node->request_buffer_reset();
179 }
180
181 return value;
182 }
183}
#define MF_RT_ERROR(comp, ctx,...)
bool wait_for_snapshot_completion(const std::shared_ptr< Nodes::Node > &node, uint64_t active_context_id, int max_spins)
Wait for an active snapshot context to complete using exponential backoff.