25 , m_channels(channels)
26 , m_frame_rate(frame_rate)
31 if (width > 0 && height > 0)
80 uint32_t ring_capacity,
85 uint32_t refill_threshold,
107 auto& pixels =
m_data[0].emplace<std::vector<uint8_t>>();
108 pixels.resize(frame_bytes * ring_capacity, 0);
110 m_slot_frame = std::vector<std::atomic<uint64_t>>(ring_capacity);
112 sf.store(UINT64_MAX, std::memory_order_relaxed);
129 auto* pixels = std::get_if<std::vector<uint8_t>>(&
m_data[0]);
150 sf.store(UINT64_MAX, std::memory_order_relaxed);
153 std::atomic_thread_fence(std::memory_order_release);
185 const auto* pixels = std::get_if<std::vector<uint8_t>>(&
m_data[0]);
189 const size_t offset = frame_index * frame_bytes;
190 if (offset + frame_bytes > pixels->size())
193 return { pixels->data() + offset, frame_bytes };
196 uint32_t slot =
slot_for(frame_index);
198 if (
m_slot_frame[slot].load(std::memory_order_acquire) == frame_index) {
199 const auto* pixels = std::get_if<std::vector<uint8_t>>(&
m_data[0]);
202 return { pixels->data() + slot * frame_bytes, frame_bytes };
215 std::ranges::fill(output, 0.0);
220 if (coordinates.size() < 4 ||
m_data.empty())
223 const auto* pixels = std::get_if<std::vector<uint8_t>>(&
m_data[0]);
227 const uint64_t frame = coordinates[0];
228 const uint64_t y = coordinates[1];
229 const uint64_t x = coordinates[2];
230 const uint64_t c = coordinates[3];
240 if (idx >= pixels->size())
243 return static_cast<double>((*pixels)[idx]) / 255.0;
248 if (coordinates.size() < 4 ||
m_data.empty())
251 auto* pixels = std::get_if<std::vector<uint8_t>>(&
m_data[0]);
255 const uint64_t frame = coordinates[0];
256 const uint64_t y = coordinates[1];
257 const uint64_t x = coordinates[2];
258 const uint64_t c = coordinates[3];
268 if (idx >= pixels->size())
271 (*pixels)[idx] =
static_cast<uint8_t
>(std::clamp(value * 255.0, 0.0, 255.0));
295 const auto* pixels = std::get_if<std::vector<uint8_t>>(&
m_data[0]);
296 if (!pixels || pixels->empty())
299 const std::span<const uint8_t> src { pixels->data(), pixels->size() };
303 }
catch (
const std::exception& e) {
305 "VideoStreamContainer::get_region_data extraction failed — {}", e.what());
313 "VideoStreamContainer::set_region_data — write path not yet implemented");
364 if (!position.empty())
375 const uint64_t head =
m_cache_head.load(std::memory_order_acquire);
376 const uint64_t buffered = (head > frame) ? (head - frame) : 0;
378 if (buffered < m_refill_threshold && m_io_service->request_decode)
384 thread_local std::vector<uint64_t> pos(1);
445 std::ranges::fill(output, 0.0);
454 std::ranges::fill(output, 0.0);
466 std::ranges::for_each(
m_data, [](
auto& v) {
467 std::visit([](
auto& vec) { vec.clear(); }, v);
484 const auto* v = std::get_if<std::vector<uint8_t>>(&
m_data[0]);
485 return (v && !v->empty()) ? v->data() :
nullptr;
496 return std::visit([](
const auto& vec) {
return !vec.empty(); },
m_data[0]);
506 if (old != new_state)
518 std::function<
void(
const std::shared_ptr<SignalSourceContainer>&,
ProcessingState)> callback)
547 auto processor = std::make_shared<FrameAccessProcessor>();
565 old->on_detach(shared_from_this());
567 processor->on_attach(shared_from_this());
614 "VideoStreamContainer::channel_data — not meaningful for interleaved pixel data; returning full surface");
617 static DataVariant empty_variant = std::vector<uint8_t>();
#define MF_WARN(comp, ctx,...)
Type-erased accessor for NDData with semantic view construction.
std::vector< uint64_t > get_remaining_frames() const override
Get the number of remaining frames from the current position, per channel.
std::shared_ptr< DataProcessor > m_default_processor
void mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id) override
Mark a dimension as consumed for the current processing cycle.
uint32_t m_refill_threshold
Trigger refill when (m_cache_head - read_position) drops below this.
std::function< void(const std::shared_ptr< SignalSourceContainer > &, ProcessingState)> m_state_callback
void set_region_data(const Region ®ion, const std::vector< DataVariant > &data) override
Set data for a specific region.
uint64_t peek_sequential(std::span< double > output, uint64_t count, uint64_t offset) const override
Peek at data without advancing the read position.
const std::vector< uint64_t > & get_read_position() const override
Get the current read position.
std::atomic< uint32_t > m_consumed_readers
uint8_t * mutable_slot_ptr(uint64_t frame_index)
Mutable pointer into m_data[0] for the decode thread to write into.
bool is_at_end() const override
Check if read position has reached the end of the stream.
void unregister_dimension_reader(uint32_t dimension_index) override
Unregister a reader for a specific dimension.
std::vector< DataVariant > get_segments_data(const std::vector< RegionSegment > &segment) const override
Get data for multiple region segments efficiently.
std::vector< std::atomic< uint64_t > > m_slot_frame
bool is_ready() const override
Check if the stream is ready for reading.
DataAccess channel_data(size_t channel) override
Get channel data with semantic interpretation.
std::vector< uint64_t > linear_index_to_coordinates(uint64_t linear_index) const override
Convert linear index to coordinates based on current memory layout.
std::vector< DataDimension > get_dimensions() const override
Get the dimensions describing the structure of the data.
ProcessingState get_processing_state() const override
Get the current processing state of the container.
uint64_t read_sequential(std::span< double > output, uint64_t count) override
Read data sequentially from the current position.
uint64_t get_num_frames() const override
Get the number of frames in the primary (temporal) dimension.
void update_read_position_for_channel(size_t channel, uint64_t frame) override
Update the read position for a specific channel.
void get_frames(std::span< double > output, uint64_t start_frame, uint64_t num_frames) const override
Get multiple frames efficiently.
double get_value_at(const std::vector< uint64_t > &coordinates) const override
Get a single value at the specified coordinates.
void create_default_processor() override
Create and configure a default processor for this container.
std::shared_mutex m_data_mutex
uint32_t register_dimension_reader(uint32_t dimension_index) override
Register a reader for a specific dimension.
void set_value_at(const std::vector< uint64_t > &coordinates, double value) override
Set a single value at the specified coordinates.
Memory::LockFreeQueue< uint64_t, READY_QUEUE_CAPACITY > m_ready_queue
std::vector< DataAccess > all_channel_data() override
Get all channel data as accessors.
bool is_frame_available(uint64_t frame_index) const
Check if a frame is currently valid in the ring.
void unregister_state_change_callback() override
Unregister the state change callback, if any.
void add_region_group(const RegionGroup &group) override
Add a named group of regions to the container.
std::atomic< uint32_t > m_registered_readers
std::vector< DataVariant > get_region_group_data(const RegionGroup &group) const override
Get data for multiple regions efficiently.
void clear() override
Clear all data in the container.
std::vector< DataVariant > get_region_data(const Region ®ion) const override
Get data for a specific region.
uint64_t get_total_elements() const override
Get the total number of elements in the container.
bool has_data() const override
Check if the container currently holds any data.
VideoStreamContainer(uint32_t width=0, uint32_t height=0, uint32_t channels=4, double frame_rate=0.0)
Construct a VideoStreamContainer with specified parameters.
uint64_t get_temporal_rate() const override
Get the temporal rate (e.g., sample rate, frame rate) of the stream.
uint64_t time_to_position(double time) const override
Convert from time (seconds) to position units (e.g., frame/sample index).
const void * get_raw_data() const override
Get a raw pointer to the underlying data storage.
void load_region(const Region ®ion) override
Load a region into memory.
const RegionGroup & get_region_group(const std::string &name) const override
Get a region group by name.
std::shared_ptr< DataProcessor > get_default_processor() const override
Get the current default data processor.
void remove_region_group(const std::string &name) override
Remove a region group by name.
void reset_read_position() override
Reset read position to the beginning of the stream.
std::span< const uint8_t > get_frame_pixels(uint64_t frame_index) const
Get raw pixel data for a single frame as uint8_t span.
std::unordered_map< std::string, RegionGroup > m_region_groups
void advance_cache_head(uint64_t frame_index)
Advance the container's view of how many frames have been decoded.
void setup_ring(uint64_t total_frames, uint32_t ring_capacity, uint32_t width, uint32_t height, uint32_t channels, double frame_rate, uint32_t refill_threshold, uint64_t reader_id=0)
Allocate m_data[0] as a ring of ring_capacity frames.
void set_looping(bool enable) override
Enable or disable looping behavior for the stream.
double position_to_time(uint64_t position) const override
Convert from position units (e.g., frame/sample index) to time (seconds).
void notify_state_change(ProcessingState new_state)
uint64_t get_frame_size() const override
Get the number of elements that constitute one "frame".
void set_loop_region(const Region ®ion) override
Set the loop region using a Region.
void unload_region(const Region ®ion) override
Unload a region from memory.
std::atomic< uint64_t > m_cache_head
Highest frame index committed by the decode thread.
std::vector< DataVariant > m_data
uint32_t slot_for(uint64_t frame_index) const
void set_default_processor(const std::shared_ptr< DataProcessor > &processor) override
Set the default data processor for this container.
void mark_ready_for_processing(bool ready) override
Mark the container as ready or not ready for processing.
size_t get_frame_byte_size() const
Get the total byte size of one frame (width * height * channels).
void register_state_change_callback(std::function< void(const std::shared_ptr< SignalSourceContainer > &, ProcessingState)> callback) override
Register a callback to be invoked on processing state changes.
void process_default() override
Process the container's data using the default processor.
void invalidate_ring()
Invalidate all ring slots.
ContainerDataStructure m_structure
void advance_read_position(const std::vector< uint64_t > &frames) override
Advance the read position by a specified amount.
uint64_t coordinates_to_linear_index(const std::vector< uint64_t > &coordinates) const override
Convert coordinates to linear index based on current memory layout.
uint64_t m_total_source_frames
void set_read_position(const std::vector< uint64_t > &position) override
Set the current read position in the primary temporal dimension per channel.
bool is_ready_for_processing() const override
Check if the container is ready for processing.
void lock() override
Acquire a lock for thread-safe access.
std::atomic< uint64_t > m_read_position
bool has_active_readers() const override
Check if any dimensions currently have active readers.
bool is_region_loaded(const Region ®ion) const override
Check if a region is loaded in memory.
void commit_frame(uint64_t frame_index)
Publish a decoded frame.
std::unordered_map< std::string, RegionGroup > get_all_region_groups() const override
Get all region groups in the container.
void update_processing_state(ProcessingState new_state) override
Update the processing state of the container.
Region get_loop_region() const override
Get the current loop region.
bool all_dimensions_consumed() const override
Check if all active dimensions have been consumed in this cycle.
std::atomic< ProcessingState > m_processing_state
void set_memory_layout(MemoryLayout layout) override
Set the memory layout for this container.
Registry::Service::IOService * m_io_service
bool is_looping() const override
Check if looping is enabled for the stream.
std::span< const double > get_frame(uint64_t frame_index) const override
Get a single frame of data efficiently.
Interface * get_service()
Query for a backend service.
static BackendRegistry & instance()
Get the global registry instance.
@ ContainerProcessing
Container operations (Kakshya - file/stream/region processing)
@ Kakshya
Containers[Signalsource, Stream, File], Regions, DataProcessors.
ProcessingState
Represents the current processing lifecycle state of a container.
@ READY
Container has data loaded and is ready for processing.
@ IDLE
Container is inactive with no data or not ready for processing.
@ PROCESSING
Container is actively being processed.
@ PROCESSED
Container has completed processing and results are available.
uint64_t coordinates_to_linear(const std::vector< uint64_t > &coords, const std::vector< DataDimension > &dimensions)
Convert N-dimensional coordinates to a linear index for interleaved data.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
@ VIDEO_COLOR
4D video (time + 2D + color)
std::vector< uint64_t > linear_to_coordinates(uint64_t index, const std::vector< DataDimension > &dimensions)
Convert a linear index to N-dimensional coordinates for interleaved data.
MemoryLayout
Memory layout for multi-dimensional data.
@ ROW_MAJOR
C/C++ style (last dimension varies fastest)
std::vector< DataDimension > dimensions
static ContainerDataStructure image_interleaved()
Create structure for interleaved image data.
MemoryLayout memory_layout
static std::vector< DataDimension > create_dimensions(DataModality modality, const std::vector< uint64_t > &shape, MemoryLayout layout=MemoryLayout::ROW_MAJOR)
Create dimension descriptors for a data modality.
std::string name
Descriptive name of the group.
Organizes related signal regions into a categorized collection.
Represents a point or span in N-dimensional space.
std::function< void(uint64_t reader_id)> request_decode
Request the identified reader to decode the next batch of frames.
Backend IO streaming service interface.