MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
RootNode.cpp
Go to the documentation of this file.
1#include "RootNode.hpp"
2
4
5namespace MayaFlux::Nodes {
6
// Member-initializer list and (empty) body of the RootNode constructor.
// NOTE(review): the signature line (source line 7) is absent from this listing;
// the initializers below read two parameters named `channel` and `token`.
8 : m_is_processing(false) // starts idle: no processing cycle in progress
9 , m_pending_count(0) // no deferred register/unregister operations queued
10 , m_channel(channel) // later passed to Node::request_reset_from_channel()
11 , m_skip_state_management(false)
12 , m_token(token)
13{
14}
15
// Adds `node` to this root node.
//
// When no processing cycle is running, the node is appended to m_Nodes
// directly. When one is running (m_is_processing is true):
//   * a node that is already in m_Nodes is not added again;
//   * otherwise the request is parked in the first free slot of
//     m_pending_ops and the function returns immediately;
//   * if every slot is taken, the caller blocks until m_is_processing
//     becomes false and then appends directly.
//
// NOTE(review): source lines 22-23, 35-36 and 49 are absent from this
// listing, so any state-flag updates they perform are not documented here.
16void RootNode::register_node(const std::shared_ptr<Node>& node)
17{
18 if (m_is_processing.load(std::memory_order_acquire)) {
// Already registered: nothing to enqueue.
19 if (m_Nodes.end() != std::ranges::find(m_Nodes, node)) {
20 uint32_t state = node->m_state.load();
// NOTE(review): the body of this INACTIVE check is not visible here.
21 if (state & Utils::NodeState::INACTIVE) {
24 }
25 return;
26 }
27
// Claim a free slot: the CAS flips `active` from false to true, so two
// concurrent callers cannot take the same slot.
28 for (auto& m_pending_op : m_pending_ops) {
29 bool expected = false;
30 if (m_pending_op.active.compare_exchange_strong(
31 expected, true,
32 std::memory_order_acquire,
33 std::memory_order_relaxed)) {
34 m_pending_op.node = node;
37 m_pending_count.fetch_add(1, std::memory_order_relaxed);
38 return;
39 }
40 }
41
// No free slot: block until the current cycle clears m_is_processing.
// The outer loop re-checks the flag after every wake-up.
42 while (m_is_processing.load(std::memory_order_acquire)) {
43 m_is_processing.wait(true, std::memory_order_acquire);
44 }
45 }
46
// Direct path: reached when idle, or after the wait above finished.
47 m_Nodes.push_back(node);
48 uint32_t state = node->m_state.load();
50}
51
// Removes `node` from this root node.
//
// While a processing cycle is running, the request is parked in the first
// free slot of m_pending_ops and the function returns. If no slot is free,
// the caller blocks until m_is_processing becomes false. Otherwise (or after
// that wait) the first entry of m_Nodes pointing at the same object is
// erased, the node's processed state is reset, and its state flags are
// rewritten to INACTIVE with ACTIVE and PENDING_REMOVAL cleared.
//
// NOTE(review): source line 55 is absent from this listing; presumably it
// marks the node PENDING_REMOVAL (the deferred path in
// process_pending_operations tests that flag) -- confirm against the file.
52void RootNode::unregister_node(const std::shared_ptr<Node>& node)
53{
54 uint32_t state = node->m_state.load();
56
57 if (m_is_processing.load(std::memory_order_acquire)) {
// Claim a free slot via CAS on its `active` flag (false -> true).
58 for (auto& m_pending_op : m_pending_ops) {
59 bool expected = false;
60 if (m_pending_op.active.compare_exchange_strong(
61 expected, true,
62 std::memory_order_acquire,
63 std::memory_order_relaxed)) {
64
65 m_pending_op.node = node;
66 m_pending_count.fetch_add(1, std::memory_order_relaxed);
67 return;
68 }
69 }
70
// All slots busy: wait for the running cycle to finish, then remove directly.
71 while (m_is_processing.load(std::memory_order_acquire)) {
72 m_is_processing.wait(true, std::memory_order_acquire);
73 }
74 }
75
// Erase only the first entry whose raw pointer matches `node`.
76 auto it = m_Nodes.begin();
77 while (it != m_Nodes.end()) {
78 if ((*it).get() == node.get()) {
79 it = m_Nodes.erase(it);
80 break;
81 }
82 ++it;
83 }
// Runs whether or not the node was found in m_Nodes.
84 node->reset_processed_state();
85
// Rebuild the flag word: drop PENDING_REMOVAL and ACTIVE, raise INACTIVE.
86 uint32_t flag = node->m_state.load();
87 flag &= ~static_cast<uint32_t>(Utils::NodeState::PENDING_REMOVAL);
88 flag &= ~static_cast<uint32_t>(Utils::NodeState::ACTIVE);
89 flag |= static_cast<uint32_t>(Utils::NodeState::INACTIVE);
90 atomic_set_flag_strong(node->m_state, static_cast<Utils::NodeState>(flag));
91}
92
94{
// Body of preprocess(): tries to claim the processing flag for one cycle.
// Returns false when another cycle already holds m_is_processing, true
// otherwise.
// NOTE(review): the signature (source line 93) and the condition guarding
// the early `return true` below (source line 95) are absent from this
// listing; presumably the guard tests m_skip_state_management -- confirm.
96 return true;
97
// Atomically flip m_is_processing false -> true; failure means a cycle is
// already in progress, so this caller must not process.
98 bool expected = false;
99 if (!m_is_processing.compare_exchange_strong(expected, true,
100 std::memory_order_acquire, std::memory_order_relaxed)) {
101 return false;
102 }
103
// NOTE(review): the body of this branch (source line 105) is not visible;
// presumably it calls process_pending_operations() -- confirm.
104 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
106 }
107
108 return true;
109}
110
112{
// Body of process_sample(): sums one output value from every node in
// m_Nodes and returns the total. Returns 0 without touching any node when
// preprocess() refuses the cycle.
// NOTE(review): the signature (source line 111) is absent from this listing.
113 if (!preprocess())
114 return 0.;
115
116 auto sample = 0.;
117
118 for (auto& node : m_Nodes) {
119
120 uint32_t state = node->m_state.load();
// Nodes not yet flagged PROCESSED are computed now; the rest contribute
// their cached last output instead of being run again.
121 if (!(state & Utils::NodeState::PROCESSED)) {
122 auto generator = std::dynamic_pointer_cast<Nodes::Generator::Generator>(node);
// A Generator reporting should_mock_process() is still advanced, but its
// result is deliberately left out of the sum.
123 if (generator && generator->should_mock_process()) {
124 generator->process_sample();
125 } else {
126 sample += node->process_sample();
127 }
// NOTE(review): source line 128 is not visible in this listing.
129 } else {
130 sample += node->get_last_output();
131 }
132 }
133
134 postprocess();
135
136 return sample;
137}
138
140{
// Body of process_frame(): runs process_sample() on every node in m_Nodes
// that is not yet flagged PROCESSED, discarding the returned values. Does
// nothing when preprocess() refuses the cycle.
// NOTE(review): the signature (source line 139) and source line 148 are
// absent from this listing.
141 if (!preprocess())
142 return;
143
144 for (auto& node : m_Nodes) {
145 uint32_t state = node->m_state.load();
146 if (!(state & Utils::NodeState::PROCESSED)) {
147 node->process_sample();
149 }
150 }
151
152 postprocess();
153}
154
156{
// Body of postprocess(): ends a processing cycle.
// NOTE(review): the signature (source line 155) and the condition guarding
// the early return below (source line 157) are absent from this listing;
// presumably the guard tests m_skip_state_management -- confirm.
158 return;
159
// Ask every node to reset on behalf of this root's channel.
160 for (auto& node : m_Nodes) {
161 node->request_reset_from_channel(m_channel);
162 }
163
// Release the processing flag, then wake any thread blocked in
// m_is_processing.wait(true) inside register_node()/unregister_node().
164 m_is_processing.store(false, std::memory_order_release);
165 m_is_processing.notify_all();
166}
167
168std::vector<double> RootNode::process_batch(uint32_t num_samples)
169{
170 std::vector<double> output(num_samples);
171
172 for (unsigned int i = 0; i < num_samples; i++) {
173 output[i] = process_sample();
174 }
175 return output;
176}
177
// Runs the per-frame step num_frames times.
// NOTE(review): the loop body (source line 181) is absent from this listing;
// presumably it calls process_frame() once per iteration -- confirm.
178void RootNode::process_batch_frame(uint32_t num_frames)
179{
180 for (uint32_t i = 0; i < num_frames; i++) {
182 }
183}
184
186{
// Body of process_pending_operations(): drains every occupied slot of
// m_pending_ops, applying the deferred registration or removal it holds.
// The node's current state decides which one:
//   * not ACTIVE            -> treated as a registration;
//   * ACTIVE + PENDING_REMOVAL -> treated as a removal;
//   * ACTIVE only           -> no change to m_Nodes, slot is just released.
// NOTE(review): the signature (source line 185) is absent from this listing.
// NOTE(review): register_node()/unregister_node() set a slot's `active` flag
// before storing `node` into it, so a slot observed as active here could
// still hold a null node if a writer is mid-update -- confirm whether callers
// rule this out.
187
188 for (auto& m_pending_op : m_pending_ops) {
189 if (m_pending_op.active.load(std::memory_order_acquire)) {
190 auto& op = m_pending_op;
191 uint32_t state = op.node->m_state.load();
192
193 if (!(state & Utils::NodeState::ACTIVE)) {
// Deferred registration: append, then swap INACTIVE for ACTIVE.
194 m_Nodes.push_back(op.node);
195
196 state &= ~static_cast<uint32_t>(Utils::NodeState::INACTIVE);
197 state |= static_cast<uint32_t>(Utils::NodeState::ACTIVE);
198 atomic_set_flag_strong(op.node->m_state, static_cast<Utils::NodeState>(state));
199 } else if (state & Utils::NodeState::PENDING_REMOVAL) {
// Deferred removal: erase the first entry pointing at the same object.
200 auto it = m_Nodes.begin();
201 while (it != m_Nodes.end()) {
202 if ((*it).get() == op.node.get()) {
203 it = m_Nodes.erase(it);
204 break;
205 }
206 ++it;
207 }
208 op.node->reset_processed_state();
209
// Drop PENDING_REMOVAL and ACTIVE, raise INACTIVE.
210 state &= ~static_cast<uint32_t>(Utils::NodeState::PENDING_REMOVAL);
211 state &= ~static_cast<uint32_t>(Utils::NodeState::ACTIVE);
212 state |= static_cast<uint32_t>(Utils::NodeState::INACTIVE);
213 atomic_set_flag_strong(op.node->m_state, static_cast<Utils::NodeState>(state));
214 }
215
// Release the slot: drop the node reference first, then publish the slot
// as free, then decrement the pending counter.
216 op.node.reset();
217 op.active.store(false, std::memory_order_release);
218 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
219 }
220 }
221}
222}
static MayaFlux::Nodes::ProcessingToken token
Definition Timers.cpp:8
void process_pending_operations()
Processes any pending node registration/unregistration operations.
Definition RootNode.cpp:185
std::vector< std::shared_ptr< Node > > m_Nodes
Collection of nodes registered with this root node.
Definition RootNode.hpp:151
uint32_t m_channel
The processing channel index for this root node.
Definition RootNode.hpp:211
RootNode(ProcessingToken token=ProcessingToken::AUDIO_RATE, uint32_t channel=0)
Constructs a RootNode for a specific processing token and channel.
Definition RootNode.cpp:7
void unregister_node(const std::shared_ptr< Node > &node)
Removes a node from this root node.
Definition RootNode.cpp:52
void process_frame()
Processes a single frame from all registered nodes.
Definition RootNode.cpp:139
bool preprocess()
Checks if the root node can process pending operations.
Definition RootNode.cpp:93
void register_node(const std::shared_ptr< Node > &node)
Adds a node to this root node.
Definition RootNode.cpp:16
std::atomic< bool > m_is_processing
Flag indicating if the root node is currently processing nodes.
Definition RootNode.hpp:162
double process_sample()
Processes a single sample from all registered nodes.
Definition RootNode.cpp:111
bool m_skip_state_management
Flag indicating whether to skip preprocessing and post processing.
Definition RootNode.hpp:221
void process_batch_frame(uint32_t num_frames)
Processes multiple frames from all registered nodes.
Definition RootNode.cpp:178
struct MayaFlux::Nodes::RootNode::PendingOp m_pending_ops[2048]
std::atomic< uint32_t > m_pending_count
Counter tracking the number of pending operations.
Definition RootNode.hpp:191
void postprocess()
Performs post-processing after all nodes have been processed.
Definition RootNode.cpp:155
std::vector< double > process_batch(uint32_t num_samples)
Processes all registered nodes and combines their outputs.
Definition RootNode.cpp:168
ProcessingToken
Enumerates the different processing domains for nodes.
void atomic_add_flag(std::atomic< Utils::NodeState > &state, Utils::NodeState flag)
Atomically adds a flag to a node state.
Definition NodeUtils.cpp:96
void atomic_set_flag_strong(std::atomic< Utils::NodeState > &flag, const Utils::NodeState &desired)
Atomically sets a node state flag to a specific value.
Definition NodeUtils.cpp:90
void atomic_remove_flag(std::atomic< Utils::NodeState > &state, Utils::NodeState flag)
Atomically removes a flag from a node state.
Contains the node-based computational processing system components.
Definition Chronie.hpp:5
@ PENDING_REMOVAL
Node is marked for removal.
Definition Utils.hpp:31
@ INACTIVE
Engine is not processing this node.
Definition Utils.hpp:29
@ ACTIVE
Engine is processing this node.
Definition Utils.hpp:30
@ PROCESSED
Node has been processed this cycle.
Definition Utils.hpp:34