MayaFlux 0.3.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
RootNode.cpp
Go to the documentation of this file.
1#include "RootNode.hpp"
2
4
5namespace MayaFlux::Nodes {
6
7RootNode::RootNode(ProcessingToken token, uint32_t channel)
8 : m_is_processing(false)
9 , m_pending_count(0)
10 , m_channel(channel)
11 , m_skip_state_management(false)
12 , m_token(token)
13{
14}
15
// Adds `node` to this root node; a null pointer is ignored.
// NOTE(review): the embedded listing numbers skip 29-30 and 41, so this view
// may be missing statements from the original file.
16void RootNode::register_node(const std::shared_ptr<Node>& node)
17{
18 if (!node)
19 return;
20
// Queue path: claim the first slot whose `active` flag flips false -> true,
// record the node as an addition, bump the pending counter and return
// without touching m_Nodes.
21 for (auto& pending_op : m_pending_ops) {
22 bool expected = false;
23 if (pending_op.active.compare_exchange_strong(
24 expected, true,
25 std::memory_order_acquire,
26 std::memory_order_relaxed)) {
// NOTE(review): `active` is already true before these fields are written;
// confirm a reader that checks `active` cannot observe a half-filled slot.
27 pending_op.node = node;
28 pending_op.is_addition = true;
31 m_pending_count.fetch_add(1, std::memory_order_relaxed);
32 return;
33 }
34 }
35
// Direct path, reached only when no slot could be claimed: wait until
// m_is_processing is false, then edit m_Nodes in place.
36 while (m_is_processing.load(std::memory_order_acquire))
37 m_is_processing.wait(true, std::memory_order_acquire);
38
// Append only if the node is not already present, then add ACTIVE to its state.
39 if (m_Nodes.end() == std::ranges::find(m_Nodes, node)) {
40 m_Nodes.push_back(node);
42 atomic_add_flag(node->m_state, NodeState::ACTIVE);
43 }
44}
45
// Removes `node` from this root node; a null pointer is ignored.
// NOTE(review): the embedded listing numbers skip 51, so this view may be
// missing a statement from the original file.
46void RootNode::unregister_node(const std::shared_ptr<Node>& node)
47{
48 if (!node)
49 return;
50
52
// Queue path: claim the first slot whose `active` flag flips false -> true,
// record the node as a removal (is_addition = false), bump the pending
// counter and return without touching m_Nodes.
53 for (auto& pending_op : m_pending_ops) {
54 bool expected = false;
55 if (pending_op.active.compare_exchange_strong(
56 expected, true,
57 std::memory_order_acquire,
58 std::memory_order_relaxed)) {
59 pending_op.node = node;
60 pending_op.is_addition = false;
61 m_pending_count.fetch_add(1, std::memory_order_relaxed);
62 return;
63 }
64 }
65
// Direct path, reached only when no slot could be claimed: wait until
// m_is_processing is false, then edit m_Nodes in place.
66 while (m_is_processing.load(std::memory_order_acquire))
67 m_is_processing.wait(true, std::memory_order_acquire);
68
// Erase the first entry that points at the same object; at most one entry
// is removed because the loop breaks after the erase.
69 auto it = m_Nodes.begin();
70 while (it != m_Nodes.end()) {
71 if ((*it).get() == node.get()) {
72 it = m_Nodes.erase(it);
73 break;
74 }
75 ++it;
76 }
77
// The state reset below runs whether or not the node was found in m_Nodes.
78 node->reset_processed_state();
79
// Clear PENDING_REMOVAL and ACTIVE, set INACTIVE, and store the result.
// NOTE(review): the load and the later store are separate steps; confirm no
// other thread can change m_state in between.
80 uint32_t flag = node->m_state.load();
81 flag &= ~static_cast<uint32_t>(NodeState::PENDING_REMOVAL);
82 flag &= ~static_cast<uint32_t>(NodeState::ACTIVE);
83 flag |= static_cast<uint32_t>(NodeState::INACTIVE);
84 atomic_set_flag_strong(node->m_state, static_cast<NodeState>(flag));
85}
86
// Body of RootNode::preprocess() (the page index places its definition at
// RootNode.cpp:87). Returns true when a processing pass may proceed.
// NOTE(review): listing lines 87 (signature), 89 and 103 are absent from this
// view. As shown, the first `return true;` looks unconditional and the `if`
// at 102 has an empty body; both are presumably extraction gaps. Verify
// against the original file.
88{
90 return true;
91
92 bool expected = false;
// Refuse to start a pass once termination has been requested.
93 if (m_request_terminate.load(std::memory_order_acquire)) {
94 return false;
95 }
96
// Atomically switch m_is_processing from false to true; if it was already
// true the exchange fails and this call reports false.
97 if (!m_is_processing.compare_exchange_strong(expected, true,
98 std::memory_order_acquire, std::memory_order_relaxed)) {
99 return false;
100 }
101
// Branch taken when at least one queued operation is pending.
102 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
104 }
105
106 return true;
107}
108
// Body of RootNode::process_sample() (the page index places its definition at
// RootNode.cpp:109). Returns the sum of the per-node outputs, or 0. when
// preprocess() reports false.
// NOTE(review): listing lines 109 (signature) and 130 are absent from this view.
110{
111 if (!preprocess())
112 return 0.;
113
114 auto sample = 0.;
115
116 for (auto& node : m_Nodes) {
// Null entries are skipped.
117 if (!node)
118 continue;
119
120 uint32_t state = node->m_state.load();
121 double node_output = 0.0;
122
// A node without the PROCESSED flag is processed now; otherwise its last
// output is reused.
123 if (!(state & NodeState::PROCESSED)) {
// NOTE(review): this dynamic_pointer_cast runs for every unprocessed node on
// every call.
124 auto generator = std::dynamic_pointer_cast<Nodes::Generator::Generator>(node);
// A generator that reports should_mock_process() is still processed, but its
// result is discarded, so it contributes 0.0 to the sum.
125 if (generator && generator->should_mock_process()) {
126 generator->process_sample();
127 } else {
128 node_output = node->process_sample();
129 }
131 } else {
132 node_output = node->get_last_output();
133 }
134
// Scale by the routing amount stored for this root's channel.
135 if (node->needs_channel_routing()) {
136 node_output *= node->get_routing_state().amount[m_channel];
137 }
138
139 sample += node_output;
140 }
141
142 postprocess();
143
144 return sample;
145}
146
// Body of RootNode::process_frame() (the page index places its definition at
// RootNode.cpp:147). Processes every node that lacks the PROCESSED flag and
// discards the results; does nothing when preprocess() reports false.
// NOTE(review): listing lines 147 (signature) and 156 are absent from this view.
148{
149 if (!preprocess())
150 return;
151
152 for (auto& node : m_Nodes) {
// NOTE(review): `node` is dereferenced here without a null check.
153 uint32_t state = node->m_state.load();
154 if (!(state & NodeState::PROCESSED)) {
155 node->process_sample();
157 }
158 }
159
160 postprocess();
161}
162
// Body of RootNode::postprocess() (the page index places its definition at
// RootNode.cpp:163).
// NOTE(review): listing lines 163 (signature), 165, 173 and 180 are absent
// from this view. As shown, the leading `return;` looks unconditional and two
// `if` bodies are empty; these are presumably extraction gaps. Verify against
// the original file.
164{
166 return;
167
// Ask every node to reset on behalf of this root's channel.
168 for (auto& node : m_Nodes) {
169 node->request_reset_from_channel(m_channel);
170 }
171
// Branch taken when at least one queued operation is pending.
172 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
174 }
175
// Clear the processing flag and wake any thread blocked in
// m_is_processing.wait().
176 m_is_processing.store(false, std::memory_order_release);
177 m_is_processing.notify_all();
178
// Branch taken when termination has been requested.
179 if (m_request_terminate.load(std::memory_order_acquire)) {
181 }
182}
183
184std::vector<double> RootNode::process_batch(uint32_t num_samples)
185{
186 std::vector<double> output(num_samples);
187
188 for (unsigned int i = 0; i < num_samples; i++) {
189 output[i] = process_sample();
190 }
191 return output;
192}
193
// Runs the loop below num_frames times.
// NOTE(review): listing line 197 (the loop body) is absent from this view, so
// the per-iteration work is not visible here.
194void RootNode::process_batch_frame(uint32_t num_frames)
195{
196 for (uint32_t i = 0; i < num_frames; i++) {
198 }
199}
200
// Body of RootNode::process_pending_operations() (the page index places its
// definition at RootNode.cpp:201; that signature line is absent from this
// view). Applies every active queued slot to m_Nodes and then frees the slot.
202{
203 for (auto& pending_op : m_pending_ops) {
// Slots whose `active` flag is false are skipped.
204 if (!pending_op.active.load(std::memory_order_acquire))
205 continue;
206
207 auto& op = pending_op;
208
209 if (op.is_addition) {
// Addition: append only if absent, then clear INACTIVE and set ACTIVE.
210 if (m_Nodes.end() == std::ranges::find(m_Nodes, op.node)) {
211 m_Nodes.push_back(op.node);
212 uint32_t state = op.node->m_state.load();
213 state &= ~static_cast<uint32_t>(NodeState::INACTIVE);
214 state |= static_cast<uint32_t>(NodeState::ACTIVE);
215 atomic_set_flag_strong(op.node->m_state, static_cast<NodeState>(state));
216 }
217 } else {
// Removal: erase the first entry pointing at the same object.
218 auto it = m_Nodes.begin();
219 while (it != m_Nodes.end()) {
220 if ((*it).get() == op.node.get()) {
221 it = m_Nodes.erase(it);
222 break;
223 }
224 ++it;
225 }
// The state reset runs whether or not the node was found in m_Nodes:
// clear PENDING_REMOVAL and ACTIVE, set INACTIVE.
226 op.node->reset_processed_state();
227 uint32_t state = op.node->m_state.load();
228 state &= ~static_cast<uint32_t>(NodeState::PENDING_REMOVAL);
229 state &= ~static_cast<uint32_t>(NodeState::ACTIVE);
230 state |= static_cast<uint32_t>(NodeState::INACTIVE);
231 atomic_set_flag_strong(op.node->m_state, static_cast<NodeState>(state));
232 }
233
// Drop the slot's reference to the node, mark the slot free for reuse, and
// decrement the pending counter.
234 op.node.reset();
235 op.active.store(false, std::memory_order_release);
236 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
237 }
238}
239
// Body of RootNode::terminate_all_nodes() (the page index places its
// definition at RootNode.cpp:240).
// NOTE(review): listing lines 240 (signature) and 250 are absent from this view.
241{
// Flag termination so later preprocess() calls report false.
242 m_request_terminate.store(true, std::memory_order_release);
243
// NOTE(review): no notify_all() is visible after this store; confirm threads
// blocked in m_is_processing.wait() are woken elsewhere.
244 m_is_processing.store(false, std::memory_order_release);
245
// NOTE(review): unregister_node() erases from m_Nodes on its direct path, and
// `node` is a reference to the element being iterated; confirm that path
// cannot run during this loop.
246 for (auto& node : m_Nodes) {
247 unregister_node(node);
248 }
249
251}
252
253}
void process_pending_operations()
Processes any pending node registration/unregistration operations.
Definition RootNode.cpp:201
std::vector< std::shared_ptr< Node > > m_Nodes
Collection of nodes registered with this root node.
Definition RootNode.hpp:159
uint32_t m_channel
The processing channel index for this root node.
Definition RootNode.hpp:229
RootNode(ProcessingToken token=ProcessingToken::AUDIO_RATE, uint32_t channel=0)
Constructs a RootNode for a specific processing token and channel.
Definition RootNode.cpp:7
void unregister_node(const std::shared_ptr< Node > &node)
Removes a node from this root node.
Definition RootNode.cpp:46
void process_frame()
Processes a single frame from all registered nodes.
Definition RootNode.cpp:147
bool preprocess()
Checks if the root node can process pending operations.
Definition RootNode.cpp:87
void register_node(const std::shared_ptr< Node > &node)
Adds a node to this root node.
Definition RootNode.cpp:16
std::atomic< bool > m_is_processing
Flag indicating if the root node is currently processing nodes.
Definition RootNode.hpp:170
double process_sample()
Processes a single sample from all registered nodes.
Definition RootNode.cpp:109
bool m_skip_state_management
Flag indicating whether to skip preprocessing and postprocessing.
Definition RootNode.hpp:239
void process_batch_frame(uint32_t num_frames)
Processes multiple frames from all registered nodes.
Definition RootNode.cpp:194
struct MayaFlux::Nodes::RootNode::PendingOp m_pending_ops[2048]
std::atomic< uint32_t > m_pending_count
Counter tracking the number of pending operations.
Definition RootNode.hpp:209
std::atomic< bool > m_request_terminate
Flag to request termination of processing.
Definition RootNode.hpp:178
void postprocess()
Performs post-processing after all nodes have been processed.
Definition RootNode.cpp:163
std::vector< double > process_batch(uint32_t num_samples)
Processes all registered nodes and combines their outputs.
Definition RootNode.cpp:184
void terminate_all_nodes()
Terminates all nodes registered with this root node.
Definition RootNode.cpp:240
NodeState
Represents the processing state of a node in the audio graph.
Definition NodeSpec.hpp:43
@ PROCESSED
Node has been processed this cycle.
Definition NodeSpec.hpp:49
@ ACTIVE
Engine is processing this node.
Definition NodeSpec.hpp:45
@ INACTIVE
Engine is not processing this node.
Definition NodeSpec.hpp:44
@ PENDING_REMOVAL
Node is marked for removal.
Definition NodeSpec.hpp:46
ProcessingToken
Enumerates the different processing domains for nodes.
void atomic_add_flag(std::atomic< NodeState > &state, NodeState flag)
Atomically adds a flag to a node state.
Definition NodeUtils.cpp:94
void atomic_remove_flag(std::atomic< NodeState > &state, NodeState flag)
Atomically removes a flag from a node state.
void atomic_set_flag_strong(std::atomic< NodeState > &flag, const NodeState &desired)
Atomically sets a node state flag to a specific value.
Definition NodeUtils.cpp:88
Contains the node-based computational processing system components.
Definition Chronie.hpp:11