MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
VideoStreamContainer.hpp
Go to the documentation of this file.
1#pragma once
2
4
6
8struct IOService;
9}
10
11namespace MayaFlux::Kakshya {
12
13/**
14 * @class VideoStreamContainer
15 * @brief Concrete base implementation for streaming video containers.
16 *
17 * VideoStreamContainer provides a complete, concrete implementation of all
18 * StreamContainer functionality for decoded video frame data. It serves as:
19 * 1. A standalone streaming container for real-time video processing
20 * 2. A base class for specialized containers like VideoFileContainer
21 *
22 * Data is stored as uint8_t pixels in RGBA interleaved layout (matching
23 * Vulkan VK_FORMAT_R8G8B8A8_UNORM and the TextureBuffer pipeline).
24 * Each frame is width * height * channels bytes. All frames are stored
25 * contiguously in a single DataVariant.
26 *
27 * Dimensions follow VIDEO_COLOR convention:
28 * dims[0] → TIME (frame count)
29 * dims[1] → SPATIAL_Y (height)
30 * dims[2] → SPATIAL_X (width)
31 * dims[3] → CHANNEL (colour channels, typically 4 for RGBA)
32 *
33 * Reader model follows WindowContainer's pattern: a simple atomic reader
34 * count rather than per-dimension/per-channel tracking. Video frames are
35 * atomic spatial units — channel-level access is a processor concern,
36 * not a container concern.
37 *
38 * Uses virtual inheritance to support diamond inheritance when used as a
39 * base for FileContainer-derived classes.
40 */
41class MAYAFLUX_API VideoStreamContainer : public virtual StreamContainer {
42public:
43 /**
44 * @brief Construct a VideoStreamContainer with specified parameters.
45 * @param width Frame width in pixels.
46 * @param height Frame height in pixels.
47 * @param channels Colour channels per pixel (default 4 for RGBA).
48 * @param frame_rate Temporal rate in frames per second.
49 */
50 VideoStreamContainer(uint32_t width = 0,
51 uint32_t height = 0,
52 uint32_t channels = 4,
53 double frame_rate = 0.0);
54
55 ~VideoStreamContainer() override = default;
56
57 // =========================================================================
58 // NDDimensionalContainer
59 // =========================================================================
60
61 [[nodiscard]] std::vector<DataDimension> get_dimensions() const override;
62 [[nodiscard]] uint64_t get_total_elements() const override;
63 [[nodiscard]] MemoryLayout get_memory_layout() const override { return m_structure.memory_layout; }
64 void set_memory_layout(MemoryLayout layout) override;
65
66 [[nodiscard]] uint64_t get_frame_size() const override;
67 [[nodiscard]] uint64_t get_num_frames() const override;
68
69 std::vector<DataVariant> get_region_data(const Region& region) const override;
70 void set_region_data(const Region& region, const std::vector<DataVariant>& data) override;
71
72 std::vector<DataVariant> get_region_group_data(const RegionGroup& group) const override;
73 std::vector<DataVariant> get_segments_data(const std::vector<RegionSegment>& segment) const override;
74
75 [[nodiscard]] std::span<const double> get_frame(uint64_t frame_index) const override;
76 void get_frames(std::span<double> output, uint64_t start_frame, uint64_t num_frames) const override;
77
78 [[nodiscard]] double get_value_at(const std::vector<uint64_t>& coordinates) const override;
79 void set_value_at(const std::vector<uint64_t>& coordinates, double value) override;
80
81 [[nodiscard]] uint64_t coordinates_to_linear_index(const std::vector<uint64_t>& coordinates) const override;
82 [[nodiscard]] std::vector<uint64_t> linear_index_to_coordinates(uint64_t linear_index) const override;
83
84 void clear() override;
85 void lock() override { m_data_mutex.lock(); }
86 void unlock() override { m_data_mutex.unlock(); }
87 bool try_lock() override { return m_data_mutex.try_lock(); }
88
89 [[nodiscard]] const void* get_raw_data() const override;
90 [[nodiscard]] bool has_data() const override;
91
92 ContainerDataStructure& get_structure() override { return m_structure; }
93 const ContainerDataStructure& get_structure() const override { return m_structure; }
94 void set_structure(ContainerDataStructure structure) override { m_structure = structure; }
95
96 // =========================================================================
97 // Ring buffer streaming API
98 // =========================================================================
99
100 /**
101 * @brief Allocate m_data[0] as a ring of ring_capacity frames.
102 *
103 * Switches the container from flat mode to ring mode. m_data[0] is
104 * resized to ring_capacity x frame_byte_size. m_num_frames is set to
105 * total_frames so processors see the full temporal extent. Pixel data
106 * is indexed by frame_index % ring_capacity.
107 *
108 * @param total_frames Total frames in the source (file, stream, etc).
109 * @param ring_capacity Number of frame slots (must be power of 2).
110 * @param width Frame width in pixels.
111 * @param height Frame height in pixels.
112 * @param channels Colour channels per pixel.
113 * @param frame_rate Frame rate in fps.
114 * @param refill_threshold Frames of look-ahead below which refill callback fires.
115 * @param reader_id The current class ID registered at the stream/file-read source
116 */
117 void setup_ring(uint64_t total_frames,
118 uint32_t ring_capacity,
119 uint32_t width,
120 uint32_t height,
121 uint32_t channels,
122 double frame_rate,
123 uint32_t refill_threshold,
124 uint64_t reader_id = 0);
125
126 /**
127 * @brief Mutable pointer into m_data[0] for the decode thread to write into.
128 * @param frame_index Absolute frame index; mapped to slot via modulo.
129 * @return Pointer into the pixel vector, or nullptr if not in ring mode.
130 */
131 [[nodiscard]] uint8_t* mutable_slot_ptr(uint64_t frame_index);
132
133 /**
134 * @brief Publish a decoded frame. Sets validity, pushes to ready queue,
135 * notifies any thread blocked in get_frame_pixels().
136 * @param frame_index Absolute frame index just written.
137 */
138 void commit_frame(uint64_t frame_index);
139
140 /**
141 * @brief Invalidate all ring slots. Called before seek.
142 */
143 void invalidate_ring();
144
145 /**
146 * @brief Check if a frame is currently valid in the ring.
147 * @param frame_index Absolute frame index.
148 */
149 [[nodiscard]] bool is_frame_available(uint64_t frame_index) const;
150
151 /**
152 * @brief True if the container is operating in ring mode.
153 */
154 [[nodiscard]] bool is_ring_mode() const { return m_ring_capacity > 0; }
155
156 [[nodiscard]] uint32_t get_ring_capacity() const { return m_ring_capacity; }
157 [[nodiscard]] uint64_t get_total_source_frames() const { return m_total_source_frames; }
158
159 /**
160 * @brief Set the number of frames below which the refill callback fires.
161 * Called by the reader before or immediately after setup_ring().
162 * @param threshold Frames of look-ahead below which notification fires.
163 */
164 void set_refill_threshold(uint32_t threshold)
165 {
166 m_refill_threshold = threshold;
167 }
168
169 /**
170 * @brief Advance the container's view of how many frames have been decoded.
171 * Called by the decode thread (via VideoFileReader) after commit_frame().
172 * Monotonically increasing; never decremented (seek resets via setup_ring).
173 * @param frame_index The highest frame index just committed.
174 */
175 void advance_cache_head(uint64_t frame_index)
176 {
177 uint64_t prev = m_cache_head.load(std::memory_order_relaxed);
178 while (frame_index > prev
179 && !m_cache_head.compare_exchange_weak(prev, frame_index,
180 std::memory_order_release, std::memory_order_relaxed)) { }
181 }
182
183 /**
184 * @brief Total frame count known at construction / setup_ring() time.
185 * Non-zero even before any frames are decoded.
186 */
187 [[nodiscard]] uint64_t get_cache_head() const
188 {
189 return m_cache_head.load(std::memory_order_acquire);
190 }
191
192 // =========================================================================
193 // RegionGroup management
194 // =========================================================================
195
196 void add_region_group(const RegionGroup& group) override;
197 const RegionGroup& get_region_group(const std::string& name) const override;
198 std::unordered_map<std::string, RegionGroup> get_all_region_groups() const override;
199 void remove_region_group(const std::string& name) override;
200
201 bool is_region_loaded(const Region& region) const override;
202 void load_region(const Region& region) override;
203 void unload_region(const Region& region) override;
204
205 // =========================================================================
206 // Read position and looping
207 // =========================================================================
208
209 void set_read_position(const std::vector<uint64_t>& position) override;
210 void update_read_position_for_channel(size_t channel, uint64_t frame) override;
211 [[nodiscard]] const std::vector<uint64_t>& get_read_position() const override;
212 void advance_read_position(const std::vector<uint64_t>& frames) override;
213 [[nodiscard]] bool is_at_end() const override;
214 void reset_read_position() override;
215
216 [[nodiscard]] uint64_t get_temporal_rate() const override;
217 [[nodiscard]] uint64_t time_to_position(double time) const override;
218 [[nodiscard]] double position_to_time(uint64_t position) const override;
219
220 void set_looping(bool enable) override;
221 [[nodiscard]] bool is_looping() const override { return m_looping_enabled; }
222 void set_loop_region(const Region& region) override;
223 [[nodiscard]] Region get_loop_region() const override;
224
225 [[nodiscard]] bool is_ready() const override;
226 [[nodiscard]] std::vector<uint64_t> get_remaining_frames() const override;
227 uint64_t read_sequential(std::span<double> output, uint64_t count) override;
228 uint64_t peek_sequential(std::span<double> output, uint64_t count, uint64_t offset) const override;
229
230 // =========================================================================
231 // Processing state
232 // =========================================================================
233
234 [[nodiscard]] ProcessingState get_processing_state() const override { return m_processing_state.load(); }
235 void update_processing_state(ProcessingState new_state) override;
236
237 void register_state_change_callback(
238 std::function<void(const std::shared_ptr<SignalSourceContainer>&, ProcessingState)> callback) override;
239 void unregister_state_change_callback() override;
240
241 [[nodiscard]] bool is_ready_for_processing() const override;
242 void mark_ready_for_processing(bool ready) override;
243
244 void create_default_processor() override;
245 void process_default() override;
246 void set_default_processor(const std::shared_ptr<DataProcessor>& processor) override;
247 [[nodiscard]] std::shared_ptr<DataProcessor> get_default_processor() const override;
248
249 std::shared_ptr<DataProcessingChain> get_processing_chain() override { return m_processing_chain; }
250 void set_processing_chain(const std::shared_ptr<DataProcessingChain>& chain) override { m_processing_chain = chain; }
251
252 // =========================================================================
253 // Reader tracking (WindowContainer-style atomic counting)
254 // =========================================================================
255
256 uint32_t register_dimension_reader(uint32_t dimension_index) override;
257 void unregister_dimension_reader(uint32_t dimension_index) override;
258 [[nodiscard]] bool has_active_readers() const override;
259 void mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id) override;
260 [[nodiscard]] bool all_dimensions_consumed() const override;
261
262 // =========================================================================
263 // Processing token
264 // =========================================================================
265
266 void reset_processing_token() override { m_processing_token_channel.store(-1); }
267
268 bool try_acquire_processing_token(int channel) override
269 {
270 int expected = -1;
271 return m_processing_token_channel.compare_exchange_strong(expected, channel);
272 }
273
274 [[nodiscard]] bool has_processing_token(int channel) const override
275 {
276 return m_processing_token_channel.load() == channel;
277 }
278
279 // =========================================================================
280 // Data access
281 // =========================================================================
282
283 const std::vector<DataVariant>& get_data() override { return m_data; }
284
285 DataAccess channel_data(size_t channel) override;
286 std::vector<DataAccess> all_channel_data() override;
287
288 std::vector<DataVariant>& get_processed_data() override { return m_processed_data; }
289 const std::vector<DataVariant>& get_processed_data() const override { return m_processed_data; }
290
291 void mark_buffers_for_processing(bool) override { }
292 void mark_buffers_for_removal() override { }
293
294 // =========================================================================
295 // Video-specific accessors
296 // =========================================================================
297
298 [[nodiscard]] uint32_t get_width() const { return m_width; }
299 [[nodiscard]] uint32_t get_height() const { return m_height; }
300 [[nodiscard]] uint32_t get_channels() const { return m_channels; }
301 [[nodiscard]] double get_frame_rate() const { return m_frame_rate; }
302
303 /**
304 * @brief Get raw pixel data for a single frame as uint8_t span.
305 * @param frame_index Zero-based frame index.
306 * @return Span of pixel bytes for the frame, empty if out of range.
307 */
308 [[nodiscard]] std::span<const uint8_t> get_frame_pixels(uint64_t frame_index) const;
309
310 /**
311 * @brief Get the total byte size of one frame (width * height * channels).
312 */
313 [[nodiscard]] size_t get_frame_byte_size() const;
314
315protected:
316 void setup_dimensions();
317 void notify_state_change(ProcessingState new_state);
318
319 uint32_t m_width = 0;
320 uint32_t m_height = 0;
321 uint32_t m_channels = 4;
322 double m_frame_rate = 0.0;
323 uint64_t m_num_frames = 0;
324
326
327 std::vector<DataVariant> m_data;
328 std::vector<DataVariant> m_processed_data;
329
330 mutable std::shared_mutex m_data_mutex;
331 mutable std::mutex m_state_mutex;
332
333 std::atomic<ProcessingState> m_processing_state { ProcessingState::IDLE };
334 std::atomic<int> m_processing_token_channel { -1 };
335
336 std::function<void(const std::shared_ptr<SignalSourceContainer>&, ProcessingState)> m_state_callback;
337 std::shared_ptr<DataProcessor> m_default_processor;
338 std::shared_ptr<DataProcessingChain> m_processing_chain;
339
340 std::unordered_map<std::string, RegionGroup> m_region_groups;
341
342 std::atomic<uint64_t> m_read_position { 0 };
343 bool m_looping_enabled = false;
345
346 std::atomic<uint32_t> m_registered_readers { 0 };
347 std::atomic<uint32_t> m_consumed_readers { 0 };
348
349 // =========================================================================
350 // Ring buffer state (inactive when m_ring_capacity == 0)
351 // =========================================================================
352
353 uint32_t m_ring_capacity { 0 };
354 uint64_t m_total_source_frames { 0 };
355
356 std::vector<std::atomic<uint64_t>> m_slot_frame;
357
358 static constexpr uint32_t READY_QUEUE_CAPACITY = 256;
360
361 /**
362 * @brief Highest frame index committed by the decode thread.
363 * Advanced via advance_cache_head() (called after commit_frame()); read by
364 * update_read_position_for_channel() to compute buffered-ahead count.
365 */
366 std::atomic<uint64_t> m_cache_head { 0 };
367
368 /**
369 * @brief Trigger refill when (m_cache_head - read_position) drops below this.
370 * Set by the reader at load_into_container() time.
371 * A value of 0 disables threshold notification.
372 */
373 uint32_t m_refill_threshold { 0 };
374
375 Registry::Service::IOService* m_io_service { nullptr }; // non-owning; owned by registry
376 uint64_t m_io_reader_id { 0 };
377
378 [[nodiscard]] uint32_t slot_for(uint64_t frame_index) const
379 {
380 return static_cast<uint32_t>(frame_index % m_ring_capacity);
381 }
382};
383
384} // namespace MayaFlux::Kakshya
uint32_t width
Definition Decoder.cpp:59
size_t count
Type-erased accessor for NDData with semantic view construction.
Data-driven interface for temporal stream containers with navigable read position.
std::shared_ptr< DataProcessor > m_default_processor
const ContainerDataStructure & get_structure() const override
uint64_t get_cache_head() const
Total frame count known at construction / setup_ring() time.
ContainerDataStructure & get_structure() override
Get the data structure defining this container's layout.
std::shared_ptr< DataProcessingChain > get_processing_chain() override
Get the current processing chain for this container.
void set_refill_threshold(uint32_t threshold)
Set the number of frames below which the refill callback fires.
std::shared_ptr< DataProcessingChain > m_processing_chain
bool try_lock() override
Attempt to acquire a lock without blocking.
std::function< void(const std::shared_ptr< SignalSourceContainer > &, ProcessingState)> m_state_callback
std::vector< DataVariant > & get_processed_data() override
Get a mutable reference to the processed data buffer.
std::vector< std::atomic< uint64_t > > m_slot_frame
ProcessingState get_processing_state() const override
Get the current processing state of the container.
MemoryLayout get_memory_layout() const override
Get the memory layout used by this container.
Memory::LockFreeQueue< uint64_t, READY_QUEUE_CAPACITY > m_ready_queue
bool is_ring_mode() const
True if the container is operating in ring mode.
void unlock() override
Release a previously acquired lock.
bool try_acquire_processing_token(int channel) override
const std::vector< DataVariant > & get_data() override
Get a reference to the raw data stored in the container.
void mark_buffers_for_removal() override
Mark associated buffers for removal from the system.
std::unordered_map< std::string, RegionGroup > m_region_groups
void advance_cache_head(uint64_t frame_index)
Advance the container's view of how many frames have been decoded.
bool has_processing_token(int channel) const override
void set_processing_chain(const std::shared_ptr< DataProcessingChain > &chain) override
Set the processing chain for this container.
const std::vector< DataVariant > & get_processed_data() const override
Get a const reference to the processed data buffer.
void mark_buffers_for_processing(bool) override
Mark associated buffers for processing in the next cycle.
uint32_t slot_for(uint64_t frame_index) const
void set_structure(ContainerDataStructure structure) override
Set the data structure for this container.
void lock() override
Acquire a lock for thread-safe access.
bool is_looping() const override
Check if looping is enabled for the stream.
Concrete base implementation for streaming video containers.
Policy-driven unified circular buffer implementation.
ProcessingState
Represents the current processing lifecycle state of a container.
std::optional< RegionGroup > get_region_group(const std::unordered_map< std::string, RegionGroup > &groups, const std::string &name)
Get a RegionGroup by name from a group map.
void add_region_group(std::unordered_map< std::string, RegionGroup > &groups, const RegionGroup &group)
Add a RegionGroup to a group map.
MemoryLayout
Memory layout for multi-dimensional data.
Definition NDData.hpp:39
double position_to_time(uint64_t position, double sample_rate)
Convert position (samples/frames) to time (seconds) given a sample rate.
void remove_region_group(std::unordered_map< std::string, RegionGroup > &groups, const std::string &name)
Remove a RegionGroup by name from a group map.
uint64_t time_to_position(double time, double sample_rate)
Convert time (seconds) to position (samples/frames) given a sample rate.
Container structure for consistent dimension ordering.
Organizes related signal regions into a categorized collection.
Represents a point or span in N-dimensional space.
Definition Region.hpp:73
Backend IO streaming service interface.
Definition IOService.hpp:18