52 uint32_t channels = 4,
53 double frame_rate = 0.0);
61 [[nodiscard]] std::vector<DataDimension> get_dimensions()
const override;
62 [[nodiscard]] uint64_t get_total_elements()
const override;
66 [[nodiscard]] uint64_t get_frame_size()
const override;
67 [[nodiscard]] uint64_t get_num_frames()
const override;
69 std::vector<DataVariant> get_region_data(
const Region& region)
const override;
70 void set_region_data(
const Region& region,
const std::vector<DataVariant>& data)
override;
72 std::vector<DataVariant> get_region_group_data(
const RegionGroup& group)
const override;
73 std::vector<DataVariant> get_segments_data(
const std::vector<RegionSegment>& segment)
const override;
75 [[nodiscard]] std::span<const double> get_frame(uint64_t frame_index)
const override;
76 void get_frames(std::span<double> output, uint64_t start_frame, uint64_t num_frames)
const override;
78 [[nodiscard]]
double get_value_at(
const std::vector<uint64_t>& coordinates)
const override;
79 void set_value_at(
const std::vector<uint64_t>& coordinates,
double value)
override;
81 [[nodiscard]] uint64_t coordinates_to_linear_index(
const std::vector<uint64_t>& coordinates)
const override;
82 [[nodiscard]] std::vector<uint64_t> linear_index_to_coordinates(uint64_t linear_index)
const override;
84 void clear()
override;
85 void lock()
override { m_data_mutex.lock(); }
86 void unlock()
override { m_data_mutex.unlock(); }
87 bool try_lock()
override {
return m_data_mutex.try_lock(); }
89 [[nodiscard]]
const void* get_raw_data()
const override;
90 [[nodiscard]]
bool has_data()
const override;
117 void setup_ring(uint64_t total_frames,
118 uint32_t ring_capacity,
123 uint32_t refill_threshold,
124 uint64_t reader_id = 0);
131 [[nodiscard]] uint8_t* mutable_slot_ptr(uint64_t frame_index);
138 void commit_frame(uint64_t frame_index);
143 void invalidate_ring();
149 [[nodiscard]]
bool is_frame_available(uint64_t frame_index)
const;
154 [[nodiscard]]
bool is_ring_mode()
const {
return m_ring_capacity > 0; }
166 m_refill_threshold = threshold;
177 uint64_t prev = m_cache_head.load(std::memory_order_relaxed);
178 while (frame_index > prev
179 && !m_cache_head.compare_exchange_weak(prev, frame_index,
180 std::memory_order_release, std::memory_order_relaxed)) { }
189 return m_cache_head.load(std::memory_order_acquire);
198 std::unordered_map<std::string, RegionGroup> get_all_region_groups()
const override;
201 bool is_region_loaded(
const Region& region)
const override;
202 void load_region(
const Region& region)
override;
203 void unload_region(
const Region& region)
override;
209 void set_read_position(
const std::vector<uint64_t>& position)
override;
210 void update_read_position_for_channel(
size_t channel, uint64_t frame)
override;
211 [[nodiscard]]
const std::vector<uint64_t>& get_read_position()
const override;
212 void advance_read_position(
const std::vector<uint64_t>& frames)
override;
213 [[nodiscard]]
bool is_at_end()
const override;
214 void reset_read_position()
override;
216 [[nodiscard]] uint64_t get_temporal_rate()
const override;
220 void set_looping(
bool enable)
override;
221 [[nodiscard]]
bool is_looping()
const override {
return m_looping_enabled; }
222 void set_loop_region(
const Region& region)
override;
223 [[nodiscard]]
Region get_loop_region()
const override;
225 [[nodiscard]]
bool is_ready()
const override;
226 [[nodiscard]] std::vector<uint64_t> get_remaining_frames()
const override;
227 uint64_t read_sequential(std::span<double> output, uint64_t
count)
override;
228 uint64_t peek_sequential(std::span<double> output, uint64_t
count, uint64_t offset)
const override;
237 void register_state_change_callback(
238 std::function<
void(
const std::shared_ptr<SignalSourceContainer>&,
ProcessingState)> callback)
override;
239 void unregister_state_change_callback()
override;
241 [[nodiscard]]
bool is_ready_for_processing()
const override;
242 void mark_ready_for_processing(
bool ready)
override;
244 void create_default_processor()
override;
245 void process_default()
override;
246 void set_default_processor(
const std::shared_ptr<DataProcessor>& processor)
override;
247 [[nodiscard]] std::shared_ptr<DataProcessor> get_default_processor()
const override;
250 void set_processing_chain(
const std::shared_ptr<DataProcessingChain>& chain)
override { m_processing_chain = chain; }
256 uint32_t register_dimension_reader(uint32_t dimension_index)
override;
257 void unregister_dimension_reader(uint32_t dimension_index)
override;
258 [[nodiscard]]
bool has_active_readers()
const override;
259 void mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id)
override;
260 [[nodiscard]]
bool all_dimensions_consumed()
const override;
271 return m_processing_token_channel.compare_exchange_strong(expected, channel);
276 return m_processing_token_channel.load() == channel;
283 const std::vector<DataVariant>&
get_data()
override {
return m_data; }
285 DataAccess channel_data(
size_t channel)
override;
286 std::vector<DataAccess> all_channel_data()
override;
298 [[nodiscard]] uint32_t
get_width()
const {
return m_width; }
299 [[nodiscard]] uint32_t
get_height()
const {
return m_height; }
308 [[nodiscard]] std::span<const uint8_t> get_frame_pixels(uint64_t frame_index)
const;
313 [[nodiscard]]
size_t get_frame_byte_size()
const;
316 void setup_dimensions();
319 uint32_t m_width = 0;
320 uint32_t m_height = 0;
321 uint32_t m_channels = 4;
322 double m_frame_rate = 0.0;
323 uint64_t m_num_frames = 0;
333 std::atomic<ProcessingState> m_processing_state { ProcessingState::IDLE };
334 std::atomic<int> m_processing_token_channel { -1 };
342 std::atomic<uint64_t> m_read_position { 0 };
343 bool m_looping_enabled =
false;
346 std::atomic<uint32_t> m_registered_readers { 0 };
347 std::atomic<uint32_t> m_consumed_readers { 0 };
353 uint32_t m_ring_capacity { 0 };
354 uint64_t m_total_source_frames { 0 };
358 static constexpr uint32_t READY_QUEUE_CAPACITY = 256;
366 std::atomic<uint64_t> m_cache_head { 0 };
373 uint32_t m_refill_threshold { 0 };
376 uint64_t m_io_reader_id { 0 };
378 [[nodiscard]] uint32_t
slot_for(uint64_t frame_index)
const
380 return static_cast<uint32_t
>(frame_index % m_ring_capacity);