MayaFlux 0.3.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
AggregateBindingsProcessor.cpp
Go to the documentation of this file.
2
6
7namespace MayaFlux::Buffers {
8
10 const std::string& aggregate_name,
11 const std::shared_ptr<Nodes::Node>& node,
12 const std::shared_ptr<VKBuffer>& target,
13 ProcessingMode mode)
14{
15 if (!node) {
17 "Attempted to add null node to aggregate '{}'", aggregate_name);
18 return;
19 }
20
21 if (!target) {
22 error<std::invalid_argument>(
25 std::source_location::current(),
26 "Cannot add node to aggregate '{}' with null target buffer", aggregate_name);
27 }
28
29 auto& aggregate = m_aggregates[aggregate_name];
30
31 if (aggregate.nodes.empty()) {
32 aggregate.target_buffer = target;
33 aggregate.processing_mode.store(mode, std::memory_order_release);
34 } else if (aggregate.target_buffer != target) {
36 "Aggregate '{}' already has a different target buffer. Ignoring new target.",
37 aggregate_name);
38 }
39
40 aggregate.nodes.push_back(node);
41 aggregate.staging_data.resize(aggregate.nodes.size());
42
43 size_t required_size = aggregate.nodes.size() * sizeof(float);
44 if (aggregate.target_buffer->get_size_bytes() < required_size) {
46 "Target buffer for aggregate '{}' may be too small: {} nodes require {} bytes, buffer has {} bytes",
47 aggregate_name, aggregate.nodes.size(), required_size,
48 aggregate.target_buffer->get_size_bytes());
49 }
50
52 "Added node to aggregate '{}' (total: {})", aggregate_name, aggregate.nodes.size());
53}
54
56 const std::string& aggregate_name,
57 const std::shared_ptr<Nodes::Node>& node)
58{
59 auto agg_it = m_aggregates.find(aggregate_name);
60 if (agg_it == m_aggregates.end()) {
62 "Attempted to remove node from non-existent aggregate '{}'", aggregate_name);
63 return;
64 }
65
66 auto& nodes = agg_it->second.nodes;
67 auto node_it = std::ranges::find(nodes, node);
68
69 if (node_it != nodes.end()) {
70 nodes.erase(node_it);
71 agg_it->second.staging_data.resize(nodes.size());
72
74 "Removed node from aggregate '{}' (remaining: {})",
75 aggregate_name, nodes.size());
76
77 if (nodes.empty()) {
78 m_aggregates.erase(agg_it);
80 "Removed empty aggregate '{}'", aggregate_name);
81 }
82 } else {
84 "Attempted to remove node not in aggregate '{}'", aggregate_name);
85 }
86}
87
88void AggregateBindingsProcessor::clear_aggregate(const std::string& aggregate_name)
89{
90 if (m_aggregates.erase(aggregate_name) == 0) {
92 "Attempted to clear non-existent aggregate '{}'", aggregate_name);
93 } else {
95 "Cleared aggregate '{}'", aggregate_name);
96 }
97}
98
100{
101 size_t count = m_aggregates.size();
102 m_aggregates.clear();
103
105 "Cleared all aggregates ({})", count);
106}
107
108size_t AggregateBindingsProcessor::get_node_count(const std::string& aggregate_name) const
109{
110 auto it = m_aggregates.find(aggregate_name);
111 return it != m_aggregates.end() ? it->second.nodes.size() : 0;
112}
113
115{
116 size_t total = 0;
117 for (const auto& [name, aggregate] : m_aggregates) {
118 total += aggregate.nodes.size();
119 }
120 return total;
121}
122
124{
125 std::vector<std::string> names;
126 names.reserve(m_aggregates.size());
127 for (const auto& [name, _] : m_aggregates) {
128 names.push_back(name);
129 }
130 return names;
131}
132
134{
135 return m_aggregates.size();
136}
137
138void AggregateBindingsProcessor::processing_function(const std::shared_ptr<Buffer>& buffer)
139{
140 if (m_aggregates.empty()) {
141 return;
142 }
143
144 auto vk_buffer = std::dynamic_pointer_cast<VKBuffer>(buffer);
145 if (!vk_buffer) {
147 "AggregateBindingsProcessor requires VKBuffer, got different buffer type");
148 return;
149 }
150
151 for (auto& [aggregate_name, aggregate] : m_aggregates) {
152 if (aggregate.nodes.empty()) {
153 continue;
154 }
155
156 auto& target = aggregate.target_buffer ? aggregate.target_buffer : vk_buffer;
157
158 ProcessingMode mode = aggregate.processing_mode.load(std::memory_order_acquire);
159
160 for (size_t i = 0; i < aggregate.nodes.size(); ++i) {
161 if (!aggregate.nodes[i]) {
163 "Aggregate '{}' node at index {} is null", aggregate_name, i);
164 aggregate.staging_data[i] = 0.0F;
165 continue;
166 }
167
168 double value = (mode == ProcessingMode::INTERNAL)
169 ? Buffers::extract_single_sample(aggregate.nodes[i])
170 : aggregate.nodes[i]->get_last_output();
171
172 aggregate.staging_data[i] = static_cast<float>(value);
173 }
174
176 aggregate.staging_data.data(),
177 aggregate.staging_data.size() * sizeof(float),
178 target);
179 }
180}
181
182} // namespace MayaFlux::Buffers
#define MF_RT_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
Eigen::Index count
size_t get_node_count(const std::string &aggregate_name) const
Get number of nodes in an aggregate.
std::vector< std::string > get_aggregate_names() const
Get all aggregate names.
@ INTERNAL
Processor calls extract_single_sample() or processes node context.
void add_node(const std::string &aggregate_name, const std::shared_ptr< Nodes::Node > &node, const std::shared_ptr< VKBuffer > &target, ProcessingMode mode=ProcessingMode::INTERNAL)
Add a node to a named aggregate.
std::unordered_map< std::string, AggregateBinding > m_aggregates
void clear_aggregate(const std::string &aggregate_name)
Clear all nodes from an aggregate.
void processing_function(const std::shared_ptr< Buffer > &buffer) override
BufferProcessor interface - uploads all aggregates.
size_t get_total_node_count() const
Get total number of nodes across all aggregates.
size_t get_aggregate_count() const
Get number of aggregates.
void remove_node(const std::string &aggregate_name, const std::shared_ptr< Nodes::Node > &node)
Remove a node from an aggregate.
double extract_single_sample(const std::shared_ptr< Nodes::Node > &node)
Extract a single sample from a node with proper snapshot management.
void upload_to_gpu(const void *data, size_t size, const std::shared_ptr< VKBuffer > &target, const std::shared_ptr< VKBuffer > &staging)
Upload raw data to GPU buffer (auto-detects host-visible vs device-local)
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, Managers, processors and processing chains.