54 uint32_t channels = 4,
55 double frame_rate = 0.0);
63 [[nodiscard]] std::vector<DataDimension> get_dimensions()
const override;
64 [[nodiscard]] uint64_t get_total_elements()
const override;
68 [[nodiscard]] uint64_t get_frame_size()
const override;
69 [[nodiscard]] uint64_t get_num_frames()
const override;
71 std::vector<DataVariant> get_region_data(
const Region& region)
const override;
72 void set_region_data(
const Region& region,
const std::vector<DataVariant>& data)
override;
74 std::vector<DataVariant> get_region_group_data(
const RegionGroup& group)
const override;
75 std::vector<DataVariant> get_segments_data(
const std::vector<RegionSegment>& segment)
const override;
77 [[nodiscard]] std::span<const double> get_frame(uint64_t frame_index)
const override;
78 void get_frames(std::span<double> output, uint64_t start_frame, uint64_t num_frames)
const override;
80 [[nodiscard]]
double get_value_at(
const std::vector<uint64_t>& coordinates)
const override;
81 void set_value_at(
const std::vector<uint64_t>& coordinates,
double value)
override;
83 [[nodiscard]] uint64_t coordinates_to_linear_index(
const std::vector<uint64_t>& coordinates)
const override;
84 [[nodiscard]] std::vector<uint64_t> linear_index_to_coordinates(uint64_t linear_index)
const override;
86 void clear()
override;
87 void lock()
override { m_data_mutex.lock(); }
88 void unlock()
override { m_data_mutex.unlock(); }
89 bool try_lock()
override {
return m_data_mutex.try_lock(); }
91 [[nodiscard]]
const void* get_raw_data()
const override;
92 [[nodiscard]]
bool has_data()
const override;
119 void setup_ring(uint64_t total_frames,
120 uint32_t ring_capacity,
125 uint32_t refill_threshold,
126 uint64_t reader_id = 0);
133 [[nodiscard]] uint8_t* mutable_slot_ptr(uint64_t frame_index);
140 void commit_frame(uint64_t frame_index);
145 void invalidate_ring();
151 [[nodiscard]]
bool is_frame_available(uint64_t frame_index)
const;
156 [[nodiscard]]
bool is_ring_mode()
const {
return m_ring_capacity > 0; }
168 m_refill_threshold = threshold;
179 uint64_t prev = m_cache_head.load(std::memory_order_relaxed);
180 while (frame_index > prev
181 && !m_cache_head.compare_exchange_weak(prev, frame_index,
182 std::memory_order_release, std::memory_order_relaxed)) { }
191 return m_cache_head.load(std::memory_order_acquire);
200 std::unordered_map<std::string, RegionGroup> get_all_region_groups()
const override;
203 bool is_region_loaded(
const Region& region)
const override;
204 void load_region(
const Region& region)
override;
205 void unload_region(
const Region& region)
override;
211 void set_read_position(
const std::vector<uint64_t>& position)
override;
212 void update_read_position_for_channel(
size_t channel, uint64_t frame)
override;
213 [[nodiscard]]
const std::vector<uint64_t>& get_read_position()
const override;
214 void advance_read_position(
const std::vector<uint64_t>& frames)
override;
215 [[nodiscard]]
bool is_at_end()
const override;
216 void reset_read_position()
override;
218 [[nodiscard]] uint64_t get_temporal_rate()
const override;
222 void set_looping(
bool enable)
override;
223 [[nodiscard]]
bool is_looping()
const override {
return m_looping_enabled; }
224 void set_loop_region(
const Region& region)
override;
225 [[nodiscard]]
Region get_loop_region()
const override;
227 [[nodiscard]]
bool is_ready()
const override;
228 [[nodiscard]] std::vector<uint64_t> get_remaining_frames()
const override;
229 uint64_t read_sequential(std::span<double> output, uint64_t
count)
override;
230 uint64_t peek_sequential(std::span<double> output, uint64_t
count, uint64_t offset)
const override;
239 void register_state_change_callback(
240 std::function<
void(
const std::shared_ptr<SignalSourceContainer>&,
ProcessingState)> callback)
override;
241 void unregister_state_change_callback()
override;
243 [[nodiscard]]
bool is_ready_for_processing()
const override;
244 void mark_ready_for_processing(
bool ready)
override;
246 void create_default_processor()
override;
247 void process_default()
override;
248 void set_default_processor(
const std::shared_ptr<DataProcessor>& processor)
override;
249 [[nodiscard]] std::shared_ptr<DataProcessor> get_default_processor()
const override;
252 void set_processing_chain(
const std::shared_ptr<DataProcessingChain>& chain)
override { m_processing_chain = chain; }
258 uint32_t register_dimension_reader(uint32_t dimension_index)
override;
259 void unregister_dimension_reader(uint32_t dimension_index)
override;
260 [[nodiscard]]
bool has_active_readers()
const override;
261 void mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id)
override;
262 [[nodiscard]]
bool all_dimensions_consumed()
const override;
273 return m_processing_token_channel.compare_exchange_strong(expected, channel);
278 return m_processing_token_channel.load() == channel;
285 const std::vector<DataVariant>&
get_data()
override {
return m_data; }
287 DataAccess channel_data(
size_t channel)
override;
288 std::vector<DataAccess> all_channel_data()
override;
300 [[nodiscard]] uint32_t
get_width()
const {
return m_width; }
301 [[nodiscard]] uint32_t
get_height()
const {
return m_height; }
310 [[nodiscard]] std::span<const uint8_t> get_frame_pixels(uint64_t frame_index)
const;
315 [[nodiscard]]
size_t get_frame_byte_size()
const;
318 void setup_dimensions();
321 uint32_t m_width = 0;
322 uint32_t m_height = 0;
323 uint32_t m_channels = 4;
324 double m_frame_rate = 0.0;
325 uint64_t m_num_frames = 0;
335 std::atomic<ProcessingState> m_processing_state { ProcessingState::IDLE };
336 std::atomic<int> m_processing_token_channel { -1 };
344 std::atomic<uint64_t> m_read_position { 0 };
345 bool m_looping_enabled =
false;
348 std::atomic<uint32_t> m_registered_readers { 0 };
349 std::atomic<uint32_t> m_consumed_readers { 0 };
355 uint32_t m_ring_capacity { 0 };
356 uint64_t m_total_source_frames { 0 };
360 static constexpr uint32_t READY_QUEUE_CAPACITY = 256;
368 std::atomic<uint64_t> m_cache_head { 0 };
375 uint32_t m_refill_threshold { 0 };
378 uint64_t m_io_reader_id { 0 };
380 [[nodiscard]] uint32_t
slot_for(uint64_t frame_index)
const
382 return static_cast<uint32_t
>(frame_index % m_ring_capacity);