MayaFlux 0.2.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
RootNode.cpp
Go to the documentation of this file.
1#include "RootNode.hpp"
2
4
5namespace MayaFlux::Nodes {
6
// Member-initializer list and (empty) body of the RootNode constructor.
// NOTE(review): the signature line (listing line 7) is missing from this
// extract. The root starts with m_is_processing = false and
// m_pending_count = 0, stores the supplied channel and token, and sets
// m_skip_state_management to false.
8 : m_is_processing(false)
9 , m_pending_count(0)
10 , m_channel(channel)
11 , m_skip_state_management(false)
12 , m_token(token)
13{
14}
15
// Adds a node to this root node.
//
// While m_is_processing is set, m_Nodes is not modified directly:
//  - if the node is already in m_Nodes the function returns without queuing;
//  - otherwise the node is stored in the first free m_pending_ops slot and
//    m_pending_count is incremented;
//  - if no slot is free, the caller blocks until m_is_processing clears and
//    then falls through to the direct push_back below.
//
// NOTE(review): `node` is dereferenced without a null check, unlike
// unregister_node -- confirm callers never pass an empty pointer.
16void RootNode::register_node(const std::shared_ptr<Node>& node)
17{
18 if (m_is_processing.load(std::memory_order_acquire)) {
// Node is already registered: nothing is appended or queued on this path.
19 if (m_Nodes.end() != std::ranges::find(m_Nodes, node)) {
20 uint32_t state = node->m_state.load();
21 if (state & Utils::NodeState::INACTIVE) {
// NOTE(review): listing lines 22-23 (the body of this branch) are missing
// from this extract.
24 }
25 return;
26 }
27
// Claim the first free slot by atomically flipping `active` false -> true.
28 for (auto& m_pending_op : m_pending_ops) {
29 bool expected = false;
30 if (m_pending_op.active.compare_exchange_strong(
31 expected, true,
32 std::memory_order_acquire,
33 std::memory_order_relaxed)) {
34 m_pending_op.node = node;
// NOTE(review): listing lines 35-36 are missing from this extract.
37 m_pending_count.fetch_add(1, std::memory_order_relaxed);
38 return;
39 }
40 }
41
// No free slot: block until the processing flag is cleared
// (postprocess() stores false and calls notify_all()).
42 while (m_is_processing.load(std::memory_order_acquire)) {
43 m_is_processing.wait(true, std::memory_order_acquire);
44 }
45 }
46
47 m_Nodes.push_back(node);
48 uint32_t state = node->m_state.load();
// NOTE(review): listing line 49, which presumably consumes `state`, is
// missing from this extract.
50}
51
// Removes a node from this root node. A null `node` is ignored.
//
// While m_is_processing is set, the request is stored in the first free
// m_pending_ops slot (and m_pending_count is incremented) instead of touching
// m_Nodes; if no slot is free the caller blocks until m_is_processing clears.
// Otherwise the first matching entry is erased from m_Nodes, the node's
// processed state is reset, and its state flags are rewritten to INACTIVE.
52void RootNode::unregister_node(const std::shared_ptr<Node>& node)
53{
54 if (!node)
55 return;
56
57 uint32_t state = node->m_state.load();
// NOTE(review): listing line 58 is missing from this extract; `state` is not
// used by any line visible here.
59
60 if (m_is_processing.load(std::memory_order_acquire)) {
// Claim the first free slot by atomically flipping `active` false -> true.
61 for (auto& m_pending_op : m_pending_ops) {
62 bool expected = false;
63 if (m_pending_op.active.compare_exchange_strong(
64 expected, true,
65 std::memory_order_acquire,
66 std::memory_order_relaxed)) {
67
68 m_pending_op.node = node;
69 m_pending_count.fetch_add(1, std::memory_order_relaxed);
70 return;
71 }
72 }
73
// No free slot: block until the processing flag is cleared.
74 while (m_is_processing.load(std::memory_order_acquire)) {
75 m_is_processing.wait(true, std::memory_order_acquire);
76 }
77 }
78
// Erase only the first entry that points at the same Node object.
79 auto it = m_Nodes.begin();
80 while (it != m_Nodes.end()) {
81 if ((*it).get() == node.get()) {
82 it = m_Nodes.erase(it);
83 break;
84 }
85 ++it;
86 }
87 node->reset_processed_state();
88
// Clear PENDING_REMOVAL and ACTIVE, set INACTIVE, then publish the new flags.
// This runs even when the node was not found in m_Nodes.
89 uint32_t flag = node->m_state.load();
90 flag &= ~static_cast<uint32_t>(Utils::NodeState::PENDING_REMOVAL);
91 flag &= ~static_cast<uint32_t>(Utils::NodeState::ACTIVE);
92 flag |= static_cast<uint32_t>(Utils::NodeState::INACTIVE);
93 atomic_set_flag_strong(node->m_state, static_cast<Utils::NodeState>(flag));
94}
95
// Body of RootNode::preprocess().
// NOTE(review): the signature (listing line 96) is missing from this extract.
//
// Returns false when termination has been requested or when m_is_processing
// could not be switched from false to true (it was already set); returns true
// otherwise, leaving m_is_processing set.
97{
// NOTE(review): the condition guarding this early return (listing line 98) is
// missing from this extract.
99 return true;
100
101 bool expected = false;
102 if (m_request_terminate.load(std::memory_order_acquire)) {
103 return false;
104 }
105
// Take the processing flag; fail if it is already held.
106 if (!m_is_processing.compare_exchange_strong(expected, true,
107 std::memory_order_acquire, std::memory_order_relaxed)) {
108 return false;
109 }
110
111 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
// NOTE(review): listing line 112 (the action taken when operations are
// pending) is missing from this extract.
113 }
114
115 return true;
116}
117
// Body of RootNode::process_sample().
// NOTE(review): the signature (listing line 118) is missing from this extract.
//
// Returns 0. when preprocess() fails. Otherwise sums one value per non-null
// node and calls postprocess() before returning the sum:
//  - a node without the PROCESSED flag is processed now; its result is added
//    unless it is a Generator whose should_mock_process() is true, in which
//    case it is processed but its result is discarded;
//  - a node already flagged PROCESSED contributes get_last_output().
119{
120 if (!preprocess())
121 return 0.;
122
123 auto sample = 0.;
124
125 for (auto& node : m_Nodes) {
126 if (!node)
127 continue;
128
129 uint32_t state = node->m_state.load();
130 if (!(state & Utils::NodeState::PROCESSED)) {
131 auto generator = std::dynamic_pointer_cast<Nodes::Generator::Generator>(node);
132 if (generator && generator->should_mock_process()) {
133 generator->process_sample();
134 } else {
135 sample += node->process_sample();
136 }
// NOTE(review): listing line 137 is missing from this extract.
138 } else {
139 sample += node->get_last_output();
140 }
141 }
142
143 postprocess();
144
145 return sample;
146}
147
// Body of RootNode::process_frame().
// NOTE(review): the signature (listing line 148) is missing from this extract.
//
// Does nothing when preprocess() fails. Otherwise calls process_sample() on
// every node that does not carry the PROCESSED flag, discarding the results,
// then calls postprocess().
// NOTE(review): unlike process_sample(), entries of m_Nodes are dereferenced
// here without a null check -- confirm whether null entries can occur.
149{
150 if (!preprocess())
151 return;
152
153 for (auto& node : m_Nodes) {
154 uint32_t state = node->m_state.load();
155 if (!(state & Utils::NodeState::PROCESSED)) {
156 node->process_sample();
// NOTE(review): listing line 157 is missing from this extract.
158 }
159 }
160
161 postprocess();
162}
163
// Body of RootNode::postprocess().
// NOTE(review): the signature (listing line 164) is missing from this extract.
//
// Asks every node to reset for this root's channel, then clears
// m_is_processing and wakes any thread blocked in m_is_processing.wait().
165{
// NOTE(review): the condition guarding this early return (listing line 166) is
// missing from this extract.
167 return;
168
169 for (auto& node : m_Nodes) {
170 node->request_reset_from_channel(m_channel);
171 }
172
// Release the processing flag and wake waiters in register_node /
// unregister_node.
173 m_is_processing.store(false, std::memory_order_release);
174 m_is_processing.notify_all();
175
176 if (m_request_terminate.load(std::memory_order_acquire)) {
// NOTE(review): listing line 177 (the action taken on a termination request)
// is missing from this extract.
178 }
179}
180
181std::vector<double> RootNode::process_batch(uint32_t num_samples)
182{
183 std::vector<double> output(num_samples);
184
185 for (unsigned int i = 0; i < num_samples; i++) {
186 output[i] = process_sample();
187 }
188 return output;
189}
190
// Runs the per-frame work `num_frames` times.
// NOTE(review): the loop body (listing line 194) is missing from this extract;
// presumably it calls process_frame() -- confirm against the real file.
191void RootNode::process_batch_frame(uint32_t num_frames)
192{
193 for (uint32_t i = 0; i < num_frames; i++) {
195 }
196}
197
// Body of RootNode::process_pending_operations().
// NOTE(review): the signature (listing line 198) is missing from this extract.
//
// Walks every m_pending_ops slot and, for each slot whose `active` flag is
// set, applies the queued operation based on the node's current state:
//  - node not ACTIVE: append it to m_Nodes and swap INACTIVE for ACTIVE;
//  - node ACTIVE and PENDING_REMOVAL: erase its first occurrence from m_Nodes,
//    reset its processed state, and mark it INACTIVE;
//  - node ACTIVE without PENDING_REMOVAL: no list change.
// In every case the slot is then released and m_pending_count decremented.
//
// NOTE(review): register_node/unregister_node set `active` before assigning
// `node`, so if this runs concurrently with them a slot may look active while
// op.node is still empty -- confirm the intended synchronization.
199{
200
201 for (auto& m_pending_op : m_pending_ops) {
202 if (m_pending_op.active.load(std::memory_order_acquire)) {
203 auto& op = m_pending_op;
204 uint32_t state = op.node->m_state.load();
205
// Registration path: the node is not active yet.
206 if (!(state & Utils::NodeState::ACTIVE)) {
207 m_Nodes.push_back(op.node);
208
209 state &= ~static_cast<uint32_t>(Utils::NodeState::INACTIVE);
210 state |= static_cast<uint32_t>(Utils::NodeState::ACTIVE);
211 atomic_set_flag_strong(op.node->m_state, static_cast<Utils::NodeState>(state));
// Removal path: the node is active and flagged for removal.
212 } else if (state & Utils::NodeState::PENDING_REMOVAL) {
213 auto it = m_Nodes.begin();
214 while (it != m_Nodes.end()) {
215 if ((*it).get() == op.node.get()) {
216 it = m_Nodes.erase(it);
217 break;
218 }
219 ++it;
220 }
221 op.node->reset_processed_state();
222
223 state &= ~static_cast<uint32_t>(Utils::NodeState::PENDING_REMOVAL);
224 state &= ~static_cast<uint32_t>(Utils::NodeState::ACTIVE);
225 state |= static_cast<uint32_t>(Utils::NodeState::INACTIVE);
226 atomic_set_flag_strong(op.node->m_state, static_cast<Utils::NodeState>(state));
227 }
228
// Drop the slot's reference first, then mark the slot free for reuse.
229 op.node.reset();
230 op.active.store(false, std::memory_order_release);
231 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
232 }
233 }
234}
235
// Body of RootNode::terminate_all_nodes().
// NOTE(review): the signature (listing line 236) is missing from this extract.
//
// Sets m_request_terminate, clears m_is_processing, and then calls
// unregister_node() for every entry of m_Nodes.
//
// NOTE(review): with m_is_processing cleared just above, unregister_node()
// takes its direct path and erases from m_Nodes while this range-for is
// iterating the same vector, which invalidates the loop's iterators. `node`
// is also a reference into the element being erased. Consider iterating over
// a copy of m_Nodes instead.
// NOTE(review): m_is_processing is cleared without notify_all(), so threads
// blocked in m_is_processing.wait() are not woken here -- confirm intent.
237{
238 m_request_terminate.store(true, std::memory_order_release);
239
240 m_is_processing.store(false, std::memory_order_release);
241
242 for (auto& node : m_Nodes) {
243 unregister_node(node);
244 }
245
// NOTE(review): listing line 246 is missing from this extract.
247}
248
249}
static MayaFlux::Nodes::ProcessingToken token
Definition Timers.cpp:8
void process_pending_operations()
Processes any pending node registration/unregistration operations.
Definition RootNode.cpp:198
std::vector< std::shared_ptr< Node > > m_Nodes
Collection of nodes registered with this root node.
Definition RootNode.hpp:159
uint32_t m_channel
The processing channel index for this root node.
Definition RootNode.hpp:227
RootNode(ProcessingToken token=ProcessingToken::AUDIO_RATE, uint32_t channel=0)
Constructs a RootNode for a specific processing token and channel.
Definition RootNode.cpp:7
void unregister_node(const std::shared_ptr< Node > &node)
Removes a node from this root node.
Definition RootNode.cpp:52
void process_frame()
Processes a single frame from all registered nodes.
Definition RootNode.cpp:148
bool preprocess()
Checks if the root node can process pending operations.
Definition RootNode.cpp:96
void register_node(const std::shared_ptr< Node > &node)
Adds a node to this root node.
Definition RootNode.cpp:16
std::atomic< bool > m_is_processing
Flag indicating if the root node is currently processing nodes.
Definition RootNode.hpp:170
double process_sample()
Processes a single sample from all registered nodes.
Definition RootNode.cpp:118
bool m_skip_state_management
Flag indicating whether to skip preprocessing and postprocessing.
Definition RootNode.hpp:237
void process_batch_frame(uint32_t num_frames)
Processes multiple frames from all registered nodes.
Definition RootNode.cpp:191
struct MayaFlux::Nodes::RootNode::PendingOp m_pending_ops[2048]
std::atomic< uint32_t > m_pending_count
Counter tracking the number of pending operations.
Definition RootNode.hpp:207
std::atomic< bool > m_request_terminate
Flag to request termination of processing.
Definition RootNode.hpp:178
void postprocess()
Performs post-processing after all nodes have been processed.
Definition RootNode.cpp:164
std::vector< double > process_batch(uint32_t num_samples)
Processes all registered nodes and combines their outputs.
Definition RootNode.cpp:181
void terminate_all_nodes()
Terminates all nodes registered with this root node.
Definition RootNode.cpp:236
ProcessingToken
Enumerates the different processing domains for nodes.
void atomic_add_flag(std::atomic< Utils::NodeState > &state, Utils::NodeState flag)
Atomically adds a flag to a node state.
Definition NodeUtils.cpp:96
void atomic_set_flag_strong(std::atomic< Utils::NodeState > &flag, const Utils::NodeState &desired)
Atomically sets a node state flag to a specific value.
Definition NodeUtils.cpp:90
void atomic_remove_flag(std::atomic< Utils::NodeState > &state, Utils::NodeState flag)
Atomically removes a flag from a node state.
Contains the node-based computational processing system components.
Definition Chronie.hpp:5
@ PENDING_REMOVAL
Node is marked for removal.
Definition Utils.hpp:31
@ INACTIVE
Engine is not processing this node.
Definition Utils.hpp:29
@ ACTIVE
Engine is processing this node.
Definition Utils.hpp:30
@ PROCESSED
Node has been processed this cycle.
Definition Utils.hpp:34