10 const std::string& aggregate_name,
11 const std::shared_ptr<Nodes::Node>& node,
12 const std::shared_ptr<VKBuffer>& target,
17 "Attempted to add null node to aggregate '{}'", aggregate_name);
22 error<std::invalid_argument>(
25 std::source_location::current(),
26 "Cannot add node to aggregate '{}' with null target buffer", aggregate_name);
31 if (aggregate.nodes.empty()) {
32 aggregate.target_buffer = target;
33 aggregate.processing_mode.store(mode, std::memory_order_release);
34 }
else if (aggregate.target_buffer != target) {
36 "Aggregate '{}' already has a different target buffer. Ignoring new target.",
40 aggregate.nodes.push_back(node);
41 aggregate.staging_data.resize(aggregate.nodes.size());
43 size_t required_size = aggregate.nodes.size() *
sizeof(float);
44 if (aggregate.target_buffer->get_size_bytes() < required_size) {
46 "Target buffer for aggregate '{}' may be too small: {} nodes require {} bytes, buffer has {} bytes",
47 aggregate_name, aggregate.nodes.size(), required_size,
48 aggregate.target_buffer->get_size_bytes());
52 "Added node to aggregate '{}' (total: {})", aggregate_name, aggregate.nodes.size());
56 const std::string& aggregate_name,
57 const std::shared_ptr<Nodes::Node>& node)
62 "Attempted to remove node from non-existent aggregate '{}'", aggregate_name);
66 auto& nodes = agg_it->second.nodes;
67 auto node_it = std::ranges::find(nodes, node);
69 if (node_it != nodes.end()) {
71 agg_it->second.staging_data.resize(nodes.size());
74 "Removed node from aggregate '{}' (remaining: {})",
75 aggregate_name, nodes.size());
80 "Removed empty aggregate '{}'", aggregate_name);
84 "Attempted to remove node not in aggregate '{}'", aggregate_name);
92 "Attempted to clear non-existent aggregate '{}'", aggregate_name);
95 "Cleared aggregate '{}'", aggregate_name);
105 "Cleared all aggregates ({})",
count);
111 return it !=
m_aggregates.end() ? it->second.nodes.size() : 0;
118 total += aggregate.nodes.size();
125 std::vector<std::string> names;
128 names.push_back(name);
144 auto vk_buffer = std::dynamic_pointer_cast<VKBuffer>(buffer);
147 "AggregateBindingsProcessor requires VKBuffer, got different buffer type");
151 for (
auto& [aggregate_name, aggregate] :
m_aggregates) {
152 if (aggregate.nodes.empty()) {
156 auto& target = aggregate.target_buffer ? aggregate.target_buffer : vk_buffer;
158 ProcessingMode mode = aggregate.processing_mode.load(std::memory_order_acquire);
160 for (
size_t i = 0; i < aggregate.nodes.size(); ++i) {
161 if (!aggregate.nodes[i]) {
163 "Aggregate '{}' node at index {} is null", aggregate_name, i);
164 aggregate.staging_data[i] = 0.0F;
170 : aggregate.nodes[i]->get_last_output();
172 aggregate.staging_data[i] =
static_cast<float>(value);
176 aggregate.staging_data.data(),
177 aggregate.staging_data.size() *
sizeof(
float),
#define MF_RT_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
size_t get_node_count(const std::string &aggregate_name) const
Get the number of nodes in an aggregate (0 if the aggregate does not exist).
std::vector< std::string > get_aggregate_names() const
Get all aggregate names.
ProcessingMode::INTERNAL
Processor calls extract_single_sample() or processes node context.
void clear_all_aggregates()
Clear all aggregates.
void add_node(const std::string &aggregate_name, const std::shared_ptr< Nodes::Node > &node, const std::shared_ptr< VKBuffer > &target, ProcessingMode mode=ProcessingMode::INTERNAL)
Add a node to a named aggregate.
std::unordered_map< std::string, AggregateBinding > m_aggregates
void clear_aggregate(const std::string &aggregate_name)
Clear all nodes from an aggregate.
void processing_function(const std::shared_ptr< Buffer > &buffer) override
BufferProcessor interface - uploads all aggregates.
size_t get_total_node_count() const
Get total number of nodes across all aggregates.
size_t get_aggregate_count() const
Get number of aggregates.
void remove_node(const std::string &aggregate_name, const std::shared_ptr< Nodes::Node > &node)
Remove a node from an aggregate.
double extract_single_sample(const std::shared_ptr< Nodes::Node > &node)
Extract a single sample from a node with proper snapshot management.
void upload_to_gpu(const void *data, size_t size, const std::shared_ptr< VKBuffer > &target, const std::shared_ptr< VKBuffer > &staging)
Upload raw data to a GPU buffer (auto-detects host-visible vs. device-local).
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, Managers, processors and processing chains.