MayaFlux 0.2.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
ContiguousAccessProcessor.cpp
Go to the documentation of this file.
2
4
7
9
10namespace MayaFlux::Kakshya {
11
// Attaches this processor to `container`.
// A null container is ignored. Otherwise a weak reference to it is stored, store_metadata()
// and validate() are run, and on success m_prepared is set and
// container->mark_ready_for_processing(true) is called. If either step throws a
// std::exception, the handler below resets m_prepared to false.
// NOTE(review): this text is a scraped listing — every line begins with its gutter number, and
// listing lines 27, 30-31 and 35 are absent. Presumably they hold the heads/arguments of the
// logging calls whose format strings remain — confirm against the original file.
12void ContiguousAccessProcessor::on_attach(const std::shared_ptr<SignalSourceContainer>& container)
13{
14 if (!container) {
15 return;
16 }
17
// Held weakly; process() later locks this and compares it with the container it is handed.
18 m_source_container_weak = container;
19
20 try {
21 store_metadata(container);
22 validate();
23
// Only reached when neither call above threw.
24 m_prepared = true;
25 container->mark_ready_for_processing(true);
26
28 "ContiguousAccessProcessor attached: {} layout, {} total elements, {} channels",
29 (m_structure.organization == OrganizationStrategy::INTERLEAVED ? "interleaved" : "planar"),
32
33 } catch (const std::exception& e) {
34 m_prepared = false;
// NOTE(review): the call receiving the arguments below is missing from this view, so whether
// the exception is rethrown after being reported cannot be determined here — confirm.
36 std::source_location::current(),
37 "Failed to attach ContiguousAccessProcessor: {}",
38 e.what());
39 }
40}
41
// Caches layout information from `container` and fills in defaults for state that has not
// been set yet (read positions and output shape). `container` is dereferenced without a
// null check, so callers must pass a valid pointer.
// NOTE(review): listing line 45 is absent from this scraped view; m_total_elements (read by
// validate()) is presumably assigned there — confirm against the original file.
42void ContiguousAccessProcessor::store_metadata(const std::shared_ptr<SignalSourceContainer>& container)
43{
44 m_structure = container->get_structure();
46
// Only when no positions exist yet: one read position per channel, all starting at 0.
47 if (m_current_position.empty()) {
48 uint64_t num_channels = m_structure.get_channel_count();
49 m_current_position.assign(num_channels, 0);
50 }
51
// Only when no shape was configured: default to at most 1024 frames (fewer if the container
// holds fewer samples per channel) across all channels.
// NOTE(review): listing line 56, presumably `m_output_shape = {`, is missing here.
52 if (m_output_shape.empty()) {
53 uint64_t num_frames = m_structure.get_samples_count_per_channel();
54 uint64_t num_channels = m_structure.get_channel_count();
55
57 std::min<uint64_t>(1024UL, num_frames),
58 num_channels
59 };
60 }
61
// Stream containers additionally supply loop settings; a non-empty stream read position
// replaces the positions held by this processor.
62 if (auto stream = std::dynamic_pointer_cast<StreamContainer>(container)) {
63 m_looping_enabled = stream->is_looping();
64 m_loop_region = stream->get_loop_region();
65
66 auto stream_positions = stream->get_read_position();
67 if (!stream_positions.empty()) {
68 m_current_position = stream_positions;
69 }
70 }
71}
72
// Body of ContiguousAccessProcessor::validate(): checks the cached structure against the
// requested output shape.
// NOTE(review): the signature (listing line 73) and the heads of every reporting call
// (listing lines 76, 81, 91, 97, 101, 105, 113, 120) are absent from this scraped view; only
// their arguments remain. The calls that take std::source_location presumably raise an error
// (on_attach() wraps validate() in try/catch) — confirm against the original file.
74{
// An empty container is reported, but the checks below still run.
75 if (m_total_elements == 0) {
77 "ContiguousAccessProcessor validation: Container has no data elements");
78 }
79
// The shape must have exactly two entries: index 0 = frames, index 1 = channels.
80 if (m_output_shape.size() != 2) {
82 std::source_location::current(),
83 "Audio output shape must be [frames, channels]");
84 }
85
86 uint64_t frames_requested = m_output_shape[0];
87 uint64_t channels_requested = m_output_shape[1];
88 uint64_t available_channels = m_structure.get_channel_count();
89
90 if (frames_requested == 0 || channels_requested == 0) {
92 std::source_location::current(),
93 "Frame and channel counts cannot be zero");
94 }
95
// A block may not be longer than the data available per channel.
96 if (frames_requested > m_structure.get_samples_count_per_channel()) {
98 std::source_location::current(),
99 "Requested {} frames exceeds available {} samples per channel",
100 frames_requested,
102 }
103
104 if (channels_requested > available_channels) {
106 std::source_location::current(),
107 "Requested {} channels exceeds available {} channels",
108 channels_requested,
109 available_channels);
110 }
111
// A position vector of the wrong length is repaired rather than rejected.
// NOTE(review): the message says "resetting positions", but resize() keeps existing entries
// and only zero-fills entries it adds — confirm which behavior is intended.
112 if (m_current_position.size() != available_channels) {
114 "Current position size {} doesn't match available channel count {}, resetting positions",
115 m_current_position.size(),
116 available_channels);
117 m_current_position.resize(available_channels, 0);
118 }
119
121 "ContiguousAccessProcessor validated: memory layout {}, processing {}×{} blocks, current positions for {} channels",
122 (m_structure.organization == OrganizationStrategy::INTERLEAVED ? "interleaved" : "planar"),
123 frames_requested,
124 channels_requested, m_current_position.size());
125}
126
// Detaches the processor: clears the per-channel read positions and marks it unprepared, so
// process() returns early until on_attach() succeeds again. The container argument is unused.
// NOTE(review): listing lines 129 and 132 are absent from this scraped view; any further
// cleanup performed there is not visible here — confirm against the original file.
127void ContiguousAccessProcessor::on_detach(const std::shared_ptr<SignalSourceContainer>& /*container*/)
128{
130 m_current_position.clear();
131 m_prepared = false;
133}
134
// Processes one block from `container`: reads a region starting at the smallest per-channel
// position, copies it into the container's processed-data vector, optionally advances the
// read positions, and updates the container's processing state.
// Returns early (after reporting) when the processor is not prepared, or when `container` is
// not the container it was attached to or that container has expired.
// NOTE(review): listing lines 138, 145, 162, 179-180, 182-184 and 193 are absent from this
// scraped view — confirm the details flagged below against the original file.
135void ContiguousAccessProcessor::process(const std::shared_ptr<SignalSourceContainer>& container)
136{
137 if (!m_prepared) {
139 "ContiguousAccessProcessor not prepared for processing");
140 return;
141 }
142
// Identity check by raw pointer: only the container captured in on_attach() is accepted.
143 auto source_container = m_source_container_weak.lock();
144 if (!source_container || source_container.get() != container.get()) {
146 "ContiguousAccessProcessor: Source container mismatch or expired");
147 return;
148 }
149
150 m_is_processing = true;
151 m_last_process_time = std::chrono::steady_clock::now();
152
153 try {
// The block starts at the smallest of the per-channel positions.
// NOTE(review): the iterator is dereferenced directly; this relies on m_current_position
// being non-empty (validate() sizes it to the channel count) — confirm a zero-channel
// container cannot reach this point.
154 uint64_t min_frame = *std::ranges::min_element(m_current_position);
155 std::vector<uint64_t> region_coords = { min_frame, 0 };
156
157 Region output_region = calculate_output_region(region_coords, m_output_shape);
158
159 auto region_data = container->get_region_data(output_region);
160 auto& processed_data_vector = container->get_processed_data();
161
// NOTE(review): the `if` head pairing with the `else` below (listing line 162) is missing;
// presumably it tests for interleaved organization — confirm.
// First branch: a single output slot, filled from region_data[0] when any data came back.
163 processed_data_vector.resize(1);
164 if (!region_data.empty()) {
165 safe_copy_data_variant(region_data[0], processed_data_vector[0]);
166 }
167 } else {
// Other branch: one output slot per channel, capped by both the requested channel count
// and the number of variants actually returned.
168 uint64_t channels_to_process = std::min(m_output_shape[1], static_cast<uint64_t>(region_data.size()));
169 processed_data_vector.resize(channels_to_process);
170
171 for (size_t ch = 0; ch < channels_to_process; ++ch) {
172 safe_copy_data_variant(region_data[ch], processed_data_vector[ch]);
173 }
174 }
175
176 if (m_auto_advance) {
// Advance by one full block of frames.
// NOTE(review): the surrounding call is missing from this view; presumably
// m_current_position is assigned from advance_position(...) — confirm.
177 uint64_t frames_to_advance = m_output_shape[0];
178
181 frames_to_advance,
185
// Keep a stream container's own read position in step with this processor's positions.
186 if (auto stream = std::dynamic_pointer_cast<StreamContainer>(container)) {
187 stream->set_read_position(m_current_position);
188 }
189 }
190
191 container->update_processing_state(ProcessingState::PROCESSED);
192 } catch (const std::exception& e) {
// Failures derived from std::exception are reported and flagged on the container.
194 "Error during ContiguousAccessProcessor processing: {}",
195 e.what());
196 container->update_processing_state(ProcessingState::ERROR);
197 }
198
// Reached after both the success path and the std::exception handler above.
199 m_is_processing = false;
200}
201
202void ContiguousAccessProcessor::set_output_size(const std::vector<uint64_t>& shape)
203{
204 m_output_shape = shape;
205 if (auto container = m_source_container_weak.lock()) {
206 store_metadata(container);
207 validate();
208 }
209}
210
211} // namespace MayaFlux::Kakshya
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_RT_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
void set_output_size(const std::vector< uint64_t > &shape)
Set the output buffer size (shape) for each processing call.
void on_detach(const std::shared_ptr< SignalSourceContainer > &container) override
Detach the processor from its container.
void validate()
Validate the container's structure and output configuration.
std::chrono::steady_clock::time_point m_last_process_time
void on_attach(const std::shared_ptr< SignalSourceContainer > &container) override
Attach the processor to a signal source container.
void store_metadata(const std::shared_ptr< SignalSourceContainer > &container)
Store dimension and layout metadata from the container.
void process(const std::shared_ptr< SignalSourceContainer > &container) override
Process the current region or block of data.
std::weak_ptr< SignalSourceContainer > m_source_container_weak
@ ContainerProcessing
Container operations (Kakshya - file/stream/region processing)
@ Kakshya
Containers [SignalSource, Stream, File], Regions, DataProcessors.
@ ERROR
Container is in an error state and cannot proceed.
@ PROCESSED
Container has completed processing and results are available.
std::vector< uint64_t > advance_position(const std::vector< uint64_t > &current_positions, uint64_t frames_to_advance, const ContainerDataStructure &structure, bool looping_enabled, const Region &loop_region)
Advance current positions by a number of frames, with optional looping.
@ INTERLEAVED
Single DataVariant with interleaved data (LRLRLR for stereo)
void safe_copy_data_variant(const DataVariant &input, DataVariant &output)
Safely copy data from a DataVariant to another DataVariant, handling type conversion.
Definition DataUtils.cpp:34
Region calculate_output_region(const std::vector< uint64_t > &current_pos, const std::vector< uint64_t > &output_shape)
Calculate output region bounds from current position and shape.
static uint64_t get_channel_count(const std::vector< DataDimension > &dimensions)
Extract channel count from dimensions.
static uint64_t get_samples_count_per_channel(const std::vector< DataDimension > &dimensions)
Get samples per channel (time dimension only).
static uint64_t get_total_elements(const std::vector< DataDimension > &dimensions)
Get total elements across all dimensions.
Represents a point or span in N-dimensional space.
Definition Region.hpp:67