MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
ContainerUtils.cpp
Go to the documentation of this file.
1#include "ContainerUtils.hpp"
2#include "DataUtils.hpp"
4#include "RegionUtils.hpp"
5
7
8namespace MayaFlux::Kakshya {
9
10std::unordered_map<std::string, std::any> extract_processing_state_info(const std::shared_ptr<SignalSourceContainer>& container)
11{
12 if (!container) {
13 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Container is null");
14 }
15
16 std::unordered_map<std::string, std::any> state_info;
17
18 state_info["processing_state"] = static_cast<int>(container->get_processing_state());
19 state_info["is_ready"] = container->is_ready_for_processing();
20
21 if (auto stream = std::dynamic_pointer_cast<StreamContainer>(container)) {
22 state_info["read_position"] = stream->get_read_position();
23 state_info["is_stream_container"] = true;
24 } else {
25 state_info["is_stream_container"] = false;
26 }
27
28 return state_info;
29}
30
31std::unordered_map<std::string, std::any> extract_processor_info(const std::shared_ptr<SignalSourceContainer>& container)
32{
33 if (!container) {
34 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Container is null");
35 }
36
37 std::unordered_map<std::string, std::any> processor_info;
38
39 if (auto processor = container->get_default_processor()) {
40 processor_info["has_processor"] = true;
41 processor_info["processor_processing"] = processor->is_processing();
42 } else {
43 processor_info["has_processor"] = false;
44 }
45
46 if (auto chain = container->get_processing_chain()) {
47 processor_info["has_processing_chain"] = true;
48 // TODO: Could add more chain information here
49 } else {
50 processor_info["has_processing_chain"] = false;
51 }
52
53 return processor_info;
54}
55
56bool transition_state(ProcessingState& current_state, ProcessingState new_state, std::function<void()> on_transition)
57{
58 static const std::unordered_map<ProcessingState, std::unordered_set<ProcessingState>> valid_transitions = {
65 };
66 auto it = valid_transitions.find(current_state);
67 if (it != valid_transitions.end() && (it->second.count(new_state) != 0U)) {
68 current_state = new_state;
69 if (on_transition)
70 on_transition();
71 return true;
72 }
73 return false;
74}
75
76std::unordered_map<std::string, std::any> analyze_access_pattern(const Region& region,
77 const std::shared_ptr<SignalSourceContainer>& container)
78{
79 std::unordered_map<std::string, std::any> analysis;
80
81 if (!container) {
82 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Container is null");
83 }
84
85 const auto dimensions = container->get_dimensions();
86 const auto memory_layout = container->get_memory_layout();
87
88 analysis["is_contiguous"] = is_region_access_contiguous(region, container);
89 analysis["memory_layout"] = static_cast<int>(memory_layout);
90
91 uint64_t region_size = 1;
92 for (size_t i = 0; i < region.start_coordinates.size() && i < region.end_coordinates.size(); ++i) {
93 region_size *= (region.end_coordinates[i] - region.start_coordinates[i] + 1);
94 }
95 analysis["region_size"] = region_size;
96
97 if (!dimensions.empty()) {
98 auto strides = calculate_strides(dimensions);
99 analysis["access_stride"] = strides[0]; // Primary dimension stride
100 }
101
102 return analysis;
103}
104
105DataVariant extract_channel_data(const std::shared_ptr<SignalSourceContainer>& container,
106 uint32_t channel_index)
107{
108 if (!container) {
109 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Container is null");
110 }
111
112 const auto structure = container->get_structure();
113 auto channel_count = structure.get_channel_count();
114
115 if (channel_index >= channel_count) {
116 error<std::out_of_range>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Channel index out of range: {} (available channels: {})", channel_index, channel_count);
117 }
118
119 auto data = container->get_data();
120
121 if (structure.organization == OrganizationStrategy::PLANAR) {
122 if (channel_index >= data.size()) {
123 error<std::out_of_range>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Channel index out of range for planar data: {} (available channels: {})", channel_index, data.size());
124 }
125 return data[channel_index];
126 }
127
128 std::vector<double> temp_storage;
129 auto interleaved_span = extract_from_variant<double>(data[0], temp_storage);
130
131 if (interleaved_span.empty()) {
132 error<std::runtime_error>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Failed to extract interleaved data");
133 }
134
135 if (interleaved_span.size() % channel_count != 0) {
136 error<std::runtime_error>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Interleaved data size ({}) is not a multiple of channel count ({})", interleaved_span.size(), channel_count);
137 }
138
139#ifdef MAYAFLUX_PLATFORM_MACOS
140 std::vector<double> channel_data;
141 for (size_t i = channel_index; i < interleaved_span.size(); i += channel_count) {
142 channel_data.push_back(interleaved_span[i]);
143 }
144#else
145 auto channel_view = interleaved_span
146 | std::views::drop(channel_index)
147 | std::views::stride(channel_count);
148
149 std::vector<double> channel_data;
150 std::ranges::copy(channel_view, std::back_inserter(channel_data));
151#endif // MAYAFLUX_PLATFORM_MACOS
152
153 return DataVariant { std::move(channel_data) };
154}
155
157 const std::vector<DataVariant>& pd,
158 OrganizationStrategy organization,
159 uint64_t num_channels,
160 uint32_t ch,
161 std::span<double> output)
162{
163 if (pd.empty() || output.empty()) {
164 std::ranges::fill(output, 0.0);
165 return;
166 }
167
168 if (organization == OrganizationStrategy::INTERLEAVED) {
169 thread_local std::vector<double> tmp;
170 auto data_span = extract_from_variant<double>(pd[0], tmp);
171
172 const auto samples_to_copy = std::min<size_t>(
173 output.size(),
174 data_span.size() / std::max<uint64_t>(num_channels, 1));
175
176 for (auto s : std::views::iota(0UZ, samples_to_copy)) {
177 const auto idx = s * num_channels + ch;
178 output[s] = idx < data_span.size() ? data_span[idx] : 0.0;
179 }
180
181 if (samples_to_copy < output.size())
182 std::ranges::fill(output.subspan(samples_to_copy), 0.0);
183 } else {
184 if (ch >= pd.size()) {
185 std::ranges::fill(output, 0.0);
186 return;
187 }
188
189 thread_local std::vector<double> tmp;
190 auto ch_span = extract_from_variant<double>(pd[ch], tmp);
191
192 const auto samples_to_copy = std::min(output.size(), ch_span.size());
193 std::ranges::copy_n(ch_span.begin(), samples_to_copy, output.begin());
194
195 if (samples_to_copy < output.size())
196 std::ranges::fill(output.subspan(samples_to_copy), 0.0);
197 }
198}
199
200std::vector<double> extract_region_channel(
201 const Region& region,
202 const std::shared_ptr<SignalSourceContainer>& container,
203 uint32_t channel)
204{
205 if (!container)
206 return {};
207
208 auto variants = container->get_region_data(region);
209
210 if (channel >= variants.size())
211 return {};
212
213 std::vector<double> storage;
214 Kakshya::extract_from_variant<double>(variants[channel], storage);
215 return storage;
216}
217
218std::pair<std::shared_ptr<SignalSourceContainer>, std::vector<DataDimension>>
219validate_container_for_analysis(const std::shared_ptr<SignalSourceContainer>& container)
220{
221 if (!container || !container->has_data()) {
222 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Container is null or has no data");
223 }
224
225 auto dimensions = container->get_dimensions();
226 if (dimensions.empty()) {
227 error<std::runtime_error>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Container has no dimensions");
228 }
229
230 return std::make_pair(container, std::move(dimensions));
231}
232
233std::vector<std::span<double>> extract_numeric_data(const std::shared_ptr<SignalSourceContainer>& container)
234{
235 if (!container || !container->has_data()) {
236 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Container is null or has no data");
237 }
238
239 auto container_data = container->get_data();
240
241 return container_data
242 | std::views::transform([](DataVariant& variant) -> std::span<double> {
243 return convert_variant_to_double(variant);
244 })
245 | std::ranges::to<std::vector>();
246}
247
248void validate_numeric_data_for_analysis(const std::vector<double>& data,
249 const std::string& operation_name,
250 size_t min_size)
251{
252 if (data.empty()) {
253 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "Cannot perform {} on empty data", operation_name);
254 }
255
256 if (data.size() < min_size) {
257 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "{} requires at least {} data points, got {}", operation_name, min_size, data.size());
258 }
259
260 if (auto invalid_it = std::ranges::find_if_not(data, [](double val) {
261 return std::isfinite(val);
262 });
263 invalid_it != data.end()) {
264
265 const auto index = std::distance(data.begin(), invalid_it);
266 error<std::invalid_argument>(Journal::Component::Kakshya, Journal::Context::Runtime, std::source_location::current(), "{} data contains NaN or infinite values at index {}", operation_name, index);
267 }
268}
269
270}
Eigen::MatrixXd storage
uint32_t channel
@ Runtime
General runtime operations (default fallback)
@ Kakshya
Containers[Signalsource, Stream, File], Regions, DataProcessors.
ProcessingState
Represents the current processing lifecycle state of a container.
@ READY
Container has data loaded and is ready for processing.
@ NEEDS_REMOVAL
Container is marked for removal from the system.
@ IDLE
Container is inactive with no data or not ready for processing.
@ ERROR
Container is in an error state and cannot proceed.
@ PROCESSING
Container is actively being processed.
@ PROCESSED
Container has completed processing and results are available.
std::unordered_map< std::string, std::any > analyze_access_pattern(const Region &region, const std::shared_ptr< SignalSourceContainer > &container)
Determine optimal memory access pattern for region.
bool is_region_access_contiguous(const Region &region, const std::shared_ptr< SignalSourceContainer > &container)
Check if region access will be contiguous in memory.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
Definition NDData.hpp:76
std::span< double > convert_variant_to_double(DataVariant &data, ComplexConversionStrategy strategy=ComplexConversionStrategy::MAGNITUDE)
Convert variant to double span.
std::vector< uint64_t > calculate_strides(const std::vector< DataDimension > &dimensions)
Calculate memory strides for each dimension (row-major order).
DataVariant extract_channel_data(const std::shared_ptr< SignalSourceContainer > &container, uint32_t channel_index)
Extract data from a specific channel.
OrganizationStrategy
Data organization strategy for multi-channel/multi-frame data.
Definition NDData.hpp:49
@ PLANAR
Separate DataVariant per logical unit (LLL...RRR for stereo)
@ INTERLEAVED
Single DataVariant with interleaved data (LRLRLR for stereo)
std::unordered_map< std::string, std::any > extract_processing_state_info(const std::shared_ptr< SignalSourceContainer > &container)
Extract processing state information from container.
std::vector< std::span< double > > extract_numeric_data(const std::shared_ptr< SignalSourceContainer > &container)
Extracts numeric data from container with fallback handling.
void validate_numeric_data_for_analysis(const std::vector< double > &data, const std::string &operation_name, size_t min_size)
Validates numeric data for analysis operations.
void extract_processed_data(const std::vector< DataVariant > &pd, OrganizationStrategy organization, uint64_t num_channels, uint32_t ch, std::span< double > output)
Extract one channel's samples from a processed dynamic data block.
std::pair< std::shared_ptr< SignalSourceContainer >, std::vector< DataDimension > > validate_container_for_analysis(const std::shared_ptr< SignalSourceContainer > &container)
Validates container for analysis operations with comprehensive checks.
std::vector< double > extract_region_channel(const Region &region, const std::shared_ptr< SignalSourceContainer > &container, uint32_t channel)
Extract samples for a single channel within a Region from a container.
std::unordered_map< std::string, std::any > extract_processor_info(const std::shared_ptr< SignalSourceContainer > &container)
Extract processor information from container.
bool transition_state(ProcessingState &current_state, ProcessingState new_state, std::function< void()> on_transition)
Perform a state transition for a ProcessingState, with optional callback.
std::vector< uint64_t > end_coordinates
Ending frame index (inclusive)
Definition Region.hpp:72
std::vector< uint64_t > start_coordinates
Starting frame index (inclusive)
Definition Region.hpp:69
Represents a point or span in N-dimensional space.
Definition Region.hpp:67