MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
PlotProcessor.cpp
Go to the documentation of this file.
1#include "PlotProcessor.hpp"
2
5
7
11
12namespace MayaFlux::Kakshya {
13
14namespace {
15 constexpr auto C = Journal::Component::Kakshya;
17}
18
19// =========================================================================
20// Binding
21// =========================================================================
22
23void PlotProcessor::bind_node(uint32_t series_index,
24 std::shared_ptr<Nodes::Node> node)
25{
26 if (!node) {
27 MF_ERROR(C, X, "PlotProcessor::bind_node: null node for series {}", series_index);
28 return;
29 }
30 auto& b = m_bindings[series_index];
31 b.source_type = SourceType::NODE;
32 b.node = std::move(node);
33 b.node->add_buffer_reference();
34 b.audio_buffer.reset();
35 b.network.reset();
36 b.callable = {};
37}
38
39void PlotProcessor::bind_audio_buffer(uint32_t series_index,
40 std::shared_ptr<Buffers::AudioBuffer> buffer)
41{
42 if (!buffer) {
43 MF_ERROR(C, X, "PlotProcessor::bind_audio_buffer: null buffer for series {}", series_index);
44 return;
45 }
46 auto& b = m_bindings[series_index];
47 b.source_type = SourceType::AUDIO_BUFFER;
48 b.audio_buffer = std::move(buffer);
49 b.node.reset();
50 b.network.reset();
51 b.callable = {};
52}
53
54void PlotProcessor::bind_network(uint32_t series_index,
55 std::shared_ptr<Nodes::Network::NodeNetwork> network)
56{
57 if (!network) {
58 MF_ERROR(C, X, "PlotProcessor::bind_network: null network for series {}", series_index);
59 return;
60 }
61
62 const auto mode = network->get_output_mode();
65 MF_ERROR(C, X,
66 "PlotProcessor::bind_network: network for series {} has no audio output mode", series_index);
67 return;
68 }
69
70 auto& b = m_bindings[series_index];
71 b.source_type = SourceType::NETWORK;
72 b.network = std::move(network);
73 b.node.reset();
74 b.audio_buffer.reset();
75 b.callable = {};
76}
77
78void PlotProcessor::bind_callable(uint32_t series_index,
79 std::function<void(std::vector<double>&)> fn)
80{
81 if (!fn) {
82 MF_ERROR(C, X, "PlotProcessor::bind_callable: null callable for series {}", series_index);
83 return;
84 }
85 auto& b = m_bindings[series_index];
86 b.source_type = SourceType::CALLABLE;
87 b.callable = std::move(fn);
88 b.node.reset();
89 b.audio_buffer.reset();
90 b.network.reset();
91}
92
93void PlotProcessor::set_raw(uint32_t series_index, std::vector<double> data)
94{
95 auto& b = m_bindings[series_index];
96 b.source_type = SourceType::RAW;
97 b.pending_raw = std::move(data);
98 b.raw_dirty.test_and_set(std::memory_order_release);
99}
100
101void PlotProcessor::set_series_semantics(uint32_t series_index,
103 DataModality modality)
104{
105 auto it = m_bindings.find(series_index);
106 if (it == m_bindings.end())
107 return;
108 it->second.role = role;
109 it->second.modality = modality;
110}
111
112void PlotProcessor::unbind(uint32_t series_index)
113{
114 auto it = m_bindings.find(series_index);
115 if (it == m_bindings.end())
116 return;
117 if (it->second.source_type == SourceType::NODE && it->second.node)
118 it->second.node->remove_buffer_reference();
119 m_bindings.erase(it);
120}
121
122bool PlotProcessor::has_binding(uint32_t series_index) const
123{
124 return m_bindings.contains(series_index);
125}
126
127// =========================================================================
128// DataProcessor
129// =========================================================================
130
131void PlotProcessor::on_attach(const std::shared_ptr<SignalSourceContainer>& container)
132{
133 if (!std::dynamic_pointer_cast<PlotContainer>(container))
134 MF_ERROR(C, X, "PlotProcessor requires a PlotContainer");
135}
136
137void PlotProcessor::on_detach(const std::shared_ptr<SignalSourceContainer>&)
138{
139 for (auto& [idx, b] : m_bindings) {
140 if (b.source_type == SourceType::NODE && b.node)
141 b.node->remove_buffer_reference();
142 }
143 m_bindings.clear();
144}
145
146void PlotProcessor::process(const std::shared_ptr<SignalSourceContainer>& container)
147{
148 auto plot = std::dynamic_pointer_cast<PlotContainer>(container);
149 if (!plot) {
150 MF_ERROR(C, X, "PlotProcessor::process: container is not a PlotContainer");
151 return;
152 }
153
154 m_processing.store(true, std::memory_order_release);
155
156 for (auto& [idx, b] : m_bindings) {
157 if (idx >= plot->series_count())
158 continue;
159
160 thread_local std::vector<double> staging;
161 staging.resize(plot->series_size(idx));
162
163 {
164 auto frame = plot->get_frame(idx);
165 if (frame.size() == staging.size()) {
166 std::ranges::copy(frame, staging.begin());
167 } else {
168 std::ranges::fill(staging, 0.0);
169 }
170 }
171
172 switch (b.source_type) {
173 case SourceType::NODE:
174 acquire_from_node(b, staging);
175 break;
178 break;
180 acquire_from_network(b, staging);
181 break;
183 acquire_from_callable(b, staging);
184 break;
185 case SourceType::RAW:
186 acquire_from_raw(b, staging);
187 break;
188 }
189
190 plot->write_series(idx, staging);
191 }
192
193 auto& src = plot->get_data();
194 auto& dst = plot->get_processed_data();
195 dst.resize(src.size());
196 for (size_t i = 0; i < src.size(); ++i)
197 dst[i] = src[i];
198
199 m_processing.store(false, std::memory_order_release);
200}
201
202// =========================================================================
203// Private acquisition
204// =========================================================================
205
206void PlotProcessor::acquire_from_node(SeriesBinding& b, std::vector<double>& series)
207{
208 if (!b.node)
209 return;
210
211 auto samples = Buffers::extract_multiple_samples(b.node, series.size());
212 const size_t n = std::min(series.size(), samples.size());
213 std::copy_n(samples.begin(), n, series.begin());
214 if (n < series.size())
215 std::fill(series.begin() + static_cast<ptrdiff_t>(n), series.end(), 0.0);
216}
217
219{
220 if (!b.audio_buffer)
221 return;
222
223 const auto& data = b.audio_buffer->get_data();
224 const size_t n = std::min(series.size(), data.size());
225 std::copy_n(data.begin(), n, series.begin());
226 if (n < series.size())
227 std::fill(series.begin() + static_cast<ptrdiff_t>(n), series.end(), 0.0);
228}
229
230void PlotProcessor::acquire_from_network(SeriesBinding& b, std::vector<double>& series)
231{
232 if (!b.network)
233 return;
234
235 auto buf = b.network->get_audio_buffer();
236 if (!buf)
237 return;
238
239 const size_t n = std::min(series.size(), buf->size());
240 std::copy_n(buf->begin(), n, series.begin());
241 if (n < series.size())
242 std::fill(series.begin() + static_cast<ptrdiff_t>(n), series.end(), 0.0);
243}
244
245void PlotProcessor::acquire_from_callable(SeriesBinding& b, std::vector<double>& series)
246{
247 if (b.callable)
248 b.callable(series);
249}
250
251void PlotProcessor::acquire_from_raw(SeriesBinding& b, std::vector<double>& series)
252{
253 if (!b.raw_dirty.test(std::memory_order_acquire))
254 return;
255 b.raw_dirty.clear(std::memory_order_release);
256 const size_t n = std::min(series.size(), b.pending_raw.size());
257 std::copy_n(b.pending_raw.begin(), n, series.begin());
258 if (n < series.size())
259 std::fill(series.begin() + static_cast<ptrdiff_t>(n), series.end(), 0.0);
260}
261
262} // namespace MayaFlux::Kakshya
#define MF_ERROR(comp, ctx,...)
Core::GlobalNetworkConfig network
Definition Config.cpp:37
size_t b
void on_detach(const std::shared_ptr< SignalSourceContainer > &container) override
Called when this processor is detached from a container.
void unbind(uint32_t series_index)
Remove a binding from a series slot.
bool has_binding(uint32_t series_index) const
void acquire_from_raw(SeriesBinding &b, std::vector< double > &series)
void bind_network(uint32_t series_index, std::shared_ptr< Nodes::Network::NodeNetwork > network)
Bind a series slot to a NodeNetwork with audio output.
void bind_audio_buffer(uint32_t series_index, std::shared_ptr< Buffers::AudioBuffer > buffer)
Bind a series slot to an AudioBuffer.
void bind_callable(uint32_t series_index, std::function< void(std::vector< double > &)> fn)
Bind a series slot to a callable.
void on_attach(const std::shared_ptr< SignalSourceContainer > &container) override
Called when this processor is attached to a container.
void process(const std::shared_ptr< SignalSourceContainer > &container) override
Acquire data from all bound sources and write into the container.
void bind_node(uint32_t series_index, std::shared_ptr< Nodes::Node > node)
Bind a series slot to a Node.
void set_raw(uint32_t series_index, std::vector< double > data)
Push raw sample data for a series.
std::unordered_map< uint32_t, SeriesBinding > m_bindings
void acquire_from_audio_buffer(SeriesBinding &b, std::vector< double > &series)
void acquire_from_callable(SeriesBinding &b, std::vector< double > &series)
void acquire_from_node(SeriesBinding &b, std::vector< double > &series)
void set_series_semantics(uint32_t series_index, DataDimension::Role role, DataModality modality)
Set the role and modality for a series binding.
void acquire_from_network(SeriesBinding &b, std::vector< double > &series)
std::vector< double > extract_multiple_samples(const std::shared_ptr< Nodes::Node > &node, size_t num_samples)
Extract multiple samples from a node into a vector.
@ ContainerProcessing
Container operations (Kakshya - file/stream/region processing)
@ Kakshya
Containers[Signalsource, Stream, File], Regions, DataProcessors.
DataModality
Data modality types for cross-modal analysis.
Definition NDData.hpp:81
@ AUDIO_COMPUTE
processed each cycle but not sent to output
@ AUDIO_SINK
Aggregated audio samples sent to output.
Role
Semantic role of the dimension.
Definition NDData.hpp:150