MayaFlux 0.2.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
VideoStreamContainer.hpp
Go to the documentation of this file.
1#pragma once
2
5
7#include <condition_variable>
8
10struct IOService;
11}
12
13namespace MayaFlux::Kakshya {
14
15/**
16 * @class VideoStreamContainer
17 * @brief Concrete base implementation for streaming video containers.
18 *
19 * VideoStreamContainer provides a complete, concrete implementation of all
20 * StreamContainer functionality for decoded video frame data. It serves as:
21 * 1. A standalone streaming container for real-time video processing
22 * 2. A base class for specialized containers like VideoFileContainer
23 *
24 * Data is stored as uint8_t pixels in RGBA interleaved layout (matching
25 * Vulkan VK_FORMAT_R8G8B8A8_UNORM and the TextureBuffer pipeline).
26 * Each frame is width * height * channels bytes. All frames are stored
27 * contiguously in a single DataVariant.
28 *
29 * Dimensions follow VIDEO_COLOR convention:
30 * dims[0] → TIME (frame count)
31 * dims[1] → SPATIAL_Y (height)
32 * dims[2] → SPATIAL_X (width)
33 * dims[3] → CHANNEL (colour channels, typically 4 for RGBA)
34 *
35 * Reader model follows WindowContainer's pattern: a simple atomic reader
36 * count rather than per-dimension/per-channel tracking. Video frames are
37 * atomic spatial units — channel-level access is a processor concern,
38 * not a container concern.
39 *
40 * Uses virtual inheritance to support diamond inheritance when used as a
41 * base for FileContainer-derived classes.
42 */
43class MAYAFLUX_API VideoStreamContainer : public virtual StreamContainer {
44public:
45 /**
46 * @brief Construct a VideoStreamContainer with specified parameters.
47 * @param width Frame width in pixels.
48 * @param height Frame height in pixels.
49 * @param channels Colour channels per pixel (default 4 for RGBA).
50 * @param frame_rate Temporal rate in frames per second.
51 */
52 VideoStreamContainer(uint32_t width = 0,
53 uint32_t height = 0,
54 uint32_t channels = 4,
55 double frame_rate = 0.0);
56
57 ~VideoStreamContainer() override = default;
58
59 // =========================================================================
60 // NDDimensionalContainer
61 // =========================================================================
62
63 [[nodiscard]] std::vector<DataDimension> get_dimensions() const override;
64 [[nodiscard]] uint64_t get_total_elements() const override;
65 [[nodiscard]] MemoryLayout get_memory_layout() const override { return m_structure.memory_layout; }
66 void set_memory_layout(MemoryLayout layout) override;
67
68 [[nodiscard]] uint64_t get_frame_size() const override;
69 [[nodiscard]] uint64_t get_num_frames() const override;
70
71 std::vector<DataVariant> get_region_data(const Region& region) const override;
72 void set_region_data(const Region& region, const std::vector<DataVariant>& data) override;
73
74 std::vector<DataVariant> get_region_group_data(const RegionGroup& group) const override;
75 std::vector<DataVariant> get_segments_data(const std::vector<RegionSegment>& segment) const override;
76
77 [[nodiscard]] std::span<const double> get_frame(uint64_t frame_index) const override;
78 void get_frames(std::span<double> output, uint64_t start_frame, uint64_t num_frames) const override;
79
80 [[nodiscard]] double get_value_at(const std::vector<uint64_t>& coordinates) const override;
81 void set_value_at(const std::vector<uint64_t>& coordinates, double value) override;
82
83 [[nodiscard]] uint64_t coordinates_to_linear_index(const std::vector<uint64_t>& coordinates) const override;
84 [[nodiscard]] std::vector<uint64_t> linear_index_to_coordinates(uint64_t linear_index) const override;
85
86 void clear() override;
87 void lock() override { m_data_mutex.lock(); }
88 void unlock() override { m_data_mutex.unlock(); }
89 bool try_lock() override { return m_data_mutex.try_lock(); }
90
91 [[nodiscard]] const void* get_raw_data() const override;
92 [[nodiscard]] bool has_data() const override;
93
94 ContainerDataStructure& get_structure() override { return m_structure; }
95 const ContainerDataStructure& get_structure() const override { return m_structure; }
96 void set_structure(ContainerDataStructure structure) override { m_structure = structure; }
97
98 // =========================================================================
99 // Ring buffer streaming API
100 // =========================================================================
101
102 /**
103 * @brief Allocate m_data[0] as a ring of ring_capacity frames.
104 *
105 * Switches the container from flat mode to ring mode. m_data[0] is
106 * resized to ring_capacity x frame_byte_size. m_num_frames is set to
107 * total_frames so processors see the full temporal extent. Pixel data
108 * is indexed by frame_index % ring_capacity.
109 *
110 * @param total_frames Total frames in the source (file, stream, etc).
111 * @param ring_capacity Number of frame slots (must be power of 2).
112 * @param width Frame width in pixels.
113 * @param height Frame height in pixels.
114 * @param channels Colour channels per pixel.
115 * @param frame_rate Frame rate in fps.
116 * @param refill_threshold Frames of look-ahead below which refill callback fires.
117 * @param reader_id The current class ID registered at the stream/file-read source
118 */
119 void setup_ring(uint64_t total_frames,
120 uint32_t ring_capacity,
121 uint32_t width,
122 uint32_t height,
123 uint32_t channels,
124 double frame_rate,
125 uint32_t refill_threshold,
126 uint64_t reader_id = 0);
127
128 /**
129 * @brief Mutable pointer into m_data[0] for the decode thread to write into.
130 * @param frame_index Absolute frame index; mapped to slot via modulo.
131 * @return Pointer into the pixel vector, or nullptr if not in ring mode.
132 */
133 [[nodiscard]] uint8_t* mutable_slot_ptr(uint64_t frame_index);
134
135 /**
136 * @brief Publish a decoded frame. Sets validity, pushes to ready queue,
137 * notifies any thread blocked in get_frame_pixels().
138 * @param frame_index Absolute frame index just written.
139 */
140 void commit_frame(uint64_t frame_index);
141
142 /**
143 * @brief Invalidate all ring slots. Called before seek.
144 */
145 void invalidate_ring();
146
147 /**
148 * @brief Check if a frame is currently valid in the ring.
149 * @param frame_index Absolute frame index.
150 */
151 [[nodiscard]] bool is_frame_available(uint64_t frame_index) const;
152
153 /**
154 * @brief True if the container is operating in ring mode.
155 */
156 [[nodiscard]] bool is_ring_mode() const { return m_ring_capacity > 0; }
157
158 [[nodiscard]] uint32_t get_ring_capacity() const { return m_ring_capacity; }
159 [[nodiscard]] uint64_t get_total_source_frames() const { return m_total_source_frames; }
160
161 /**
162 * @brief Set the number of frames below which the refill callback fires.
163 * Called by the reader before or immediately after setup_ring().
164 * @param threshold Frames of look-ahead below which notification fires.
165 */
166 void set_refill_threshold(uint32_t threshold)
167 {
168 m_refill_threshold = threshold;
169 }
170
171 /**
172 * @brief Advance the container's view of how many frames have been decoded.
173 * Called by the decode thread (via VideoFileReader) after commit_frame().
174 * Monotonically increasing; never decremented (seek resets via setup_ring).
175 * @param frame_index The highest frame index just committed.
176 */
177 void advance_cache_head(uint64_t frame_index)
178 {
179 uint64_t prev = m_cache_head.load(std::memory_order_relaxed);
180 while (frame_index > prev
181 && !m_cache_head.compare_exchange_weak(prev, frame_index,
182 std::memory_order_release, std::memory_order_relaxed)) { }
183 }
184
185 /**
186 * @brief Total frame count known at construction / setup_ring() time.
187 * Non-zero even before any frames are decoded.
188 */
189 [[nodiscard]] uint64_t get_cache_head() const
190 {
191 return m_cache_head.load(std::memory_order_acquire);
192 }
193
194 // =========================================================================
195 // RegionGroup management
196 // =========================================================================
197
198 void add_region_group(const RegionGroup& group) override;
199 const RegionGroup& get_region_group(const std::string& name) const override;
200 std::unordered_map<std::string, RegionGroup> get_all_region_groups() const override;
201 void remove_region_group(const std::string& name) override;
202
203 bool is_region_loaded(const Region& region) const override;
204 void load_region(const Region& region) override;
205 void unload_region(const Region& region) override;
206
207 // =========================================================================
208 // Read position and looping
209 // =========================================================================
210
211 void set_read_position(const std::vector<uint64_t>& position) override;
212 void update_read_position_for_channel(size_t channel, uint64_t frame) override;
213 [[nodiscard]] const std::vector<uint64_t>& get_read_position() const override;
214 void advance_read_position(const std::vector<uint64_t>& frames) override;
215 [[nodiscard]] bool is_at_end() const override;
216 void reset_read_position() override;
217
218 [[nodiscard]] uint64_t get_temporal_rate() const override;
219 [[nodiscard]] uint64_t time_to_position(double time) const override;
220 [[nodiscard]] double position_to_time(uint64_t position) const override;
221
222 void set_looping(bool enable) override;
223 [[nodiscard]] bool is_looping() const override { return m_looping_enabled; }
224 void set_loop_region(const Region& region) override;
225 [[nodiscard]] Region get_loop_region() const override;
226
227 [[nodiscard]] bool is_ready() const override;
228 [[nodiscard]] std::vector<uint64_t> get_remaining_frames() const override;
229 uint64_t read_sequential(std::span<double> output, uint64_t count) override;
230 uint64_t peek_sequential(std::span<double> output, uint64_t count, uint64_t offset) const override;
231
232 // =========================================================================
233 // Processing state
234 // =========================================================================
235
236 [[nodiscard]] ProcessingState get_processing_state() const override { return m_processing_state.load(); }
237 void update_processing_state(ProcessingState new_state) override;
238
239 void register_state_change_callback(
240 std::function<void(const std::shared_ptr<SignalSourceContainer>&, ProcessingState)> callback) override;
241 void unregister_state_change_callback() override;
242
243 [[nodiscard]] bool is_ready_for_processing() const override;
244 void mark_ready_for_processing(bool ready) override;
245
246 void create_default_processor() override;
247 void process_default() override;
248 void set_default_processor(const std::shared_ptr<DataProcessor>& processor) override;
249 [[nodiscard]] std::shared_ptr<DataProcessor> get_default_processor() const override;
250
251 std::shared_ptr<DataProcessingChain> get_processing_chain() override { return m_processing_chain; }
252 void set_processing_chain(const std::shared_ptr<DataProcessingChain>& chain) override { m_processing_chain = chain; }
253
254 // =========================================================================
255 // Reader tracking (WindowContainer-style atomic counting)
256 // =========================================================================
257
258 uint32_t register_dimension_reader(uint32_t dimension_index) override;
259 void unregister_dimension_reader(uint32_t dimension_index) override;
260 [[nodiscard]] bool has_active_readers() const override;
261 void mark_dimension_consumed(uint32_t dimension_index, uint32_t reader_id) override;
262 [[nodiscard]] bool all_dimensions_consumed() const override;
263
264 // =========================================================================
265 // Processing token
266 // =========================================================================
267
268 void reset_processing_token() override { m_processing_token_channel.store(-1); }
269
270 bool try_acquire_processing_token(int channel) override
271 {
272 int expected = -1;
273 return m_processing_token_channel.compare_exchange_strong(expected, channel);
274 }
275
276 [[nodiscard]] bool has_processing_token(int channel) const override
277 {
278 return m_processing_token_channel.load() == channel;
279 }
280
281 // =========================================================================
282 // Data access
283 // =========================================================================
284
285 const std::vector<DataVariant>& get_data() override { return m_data; }
286
287 DataAccess channel_data(size_t channel) override;
288 std::vector<DataAccess> all_channel_data() override;
289
290 std::vector<DataVariant>& get_processed_data() override { return m_processed_data; }
291 const std::vector<DataVariant>& get_processed_data() const override { return m_processed_data; }
292
293 void mark_buffers_for_processing(bool) override { }
294 void mark_buffers_for_removal() override { }
295
296 // =========================================================================
297 // Video-specific accessors
298 // =========================================================================
299
300 [[nodiscard]] uint32_t get_width() const { return m_width; }
301 [[nodiscard]] uint32_t get_height() const { return m_height; }
302 [[nodiscard]] uint32_t get_channels() const { return m_channels; }
303 [[nodiscard]] double get_frame_rate() const { return m_frame_rate; }
304
305 /**
306 * @brief Get raw pixel data for a single frame as uint8_t span.
307 * @param frame_index Zero-based frame index.
308 * @return Span of pixel bytes for the frame, empty if out of range.
309 */
310 [[nodiscard]] std::span<const uint8_t> get_frame_pixels(uint64_t frame_index) const;
311
312 /**
313 * @brief Get the total byte size of one frame (width * height * channels).
314 */
315 [[nodiscard]] size_t get_frame_byte_size() const;
316
317protected:
318 void setup_dimensions();
319 void notify_state_change(ProcessingState new_state);
320
321 uint32_t m_width = 0;
322 uint32_t m_height = 0;
323 uint32_t m_channels = 4;
324 double m_frame_rate = 0.0;
325 uint64_t m_num_frames = 0;
326
328
329 std::vector<DataVariant> m_data;
330 std::vector<DataVariant> m_processed_data;
331
332 mutable std::shared_mutex m_data_mutex;
333 mutable std::mutex m_state_mutex;
334
335 std::atomic<ProcessingState> m_processing_state { ProcessingState::IDLE };
336 std::atomic<int> m_processing_token_channel { -1 };
337
338 std::function<void(const std::shared_ptr<SignalSourceContainer>&, ProcessingState)> m_state_callback;
339 std::shared_ptr<DataProcessor> m_default_processor;
340 std::shared_ptr<DataProcessingChain> m_processing_chain;
341
342 std::unordered_map<std::string, RegionGroup> m_region_groups;
343
344 std::atomic<uint64_t> m_read_position { 0 };
345 bool m_looping_enabled = false;
347
348 std::atomic<uint32_t> m_registered_readers { 0 };
349 std::atomic<uint32_t> m_consumed_readers { 0 };
350
351 // =========================================================================
352 // Ring buffer state (inactive when m_ring_capacity == 0)
353 // =========================================================================
354
355 uint32_t m_ring_capacity { 0 };
356 uint64_t m_total_source_frames { 0 };
357
358 std::vector<std::atomic<uint64_t>> m_slot_frame;
359
360 static constexpr uint32_t READY_QUEUE_CAPACITY = 256;
362
363 /**
364 * @brief Highest frame index committed by the decode thread.
365 * Written by the decode thread via commit_frame(); read by
366 * update_read_position_for_channel() to compute buffered-ahead count.
367 */
368 std::atomic<uint64_t> m_cache_head { 0 };
369
370 /**
371 * @brief Trigger refill when (m_cache_head - read_position) drops below this.
372 * Set by the reader at load_into_container() time.
373 * A value of 0 disables threshold notification.
374 */
375 uint32_t m_refill_threshold { 0 };
376
377 Registry::Service::IOService* m_io_service { nullptr }; // non-owning; owned by registry
378 uint64_t m_io_reader_id { 0 };
379
380 [[nodiscard]] uint32_t slot_for(uint64_t frame_index) const
381 {
382 return static_cast<uint32_t>(frame_index % m_ring_capacity);
383 }
384};
385
386} // namespace MayaFlux::Kakshya
Eigen::Index count
Type-erased accessor for NDData with semantic view construction.
Data-driven interface for temporal stream containers with navigable read position.
std::shared_ptr< DataProcessor > m_default_processor
const ContainerDataStructure & get_structure() const override
uint64_t get_cache_head() const
Total frame count known at construction / setup_ring() time.
ContainerDataStructure & get_structure() override
Get the data structure defining this container's layout.
std::shared_ptr< DataProcessingChain > get_processing_chain() override
Get the current processing chain for this container.
void set_refill_threshold(uint32_t threshold)
Set the number of frames below which the refill callback fires.
std::shared_ptr< DataProcessingChain > m_processing_chain
bool try_lock() override
Attempt to acquire a lock without blocking.
std::function< void(const std::shared_ptr< SignalSourceContainer > &, ProcessingState)> m_state_callback
std::vector< DataVariant > & get_processed_data() override
Get a mutable reference to the processed data buffer.
std::vector< std::atomic< uint64_t > > m_slot_frame
ProcessingState get_processing_state() const override
Get the current processing state of the container.
MemoryLayout get_memory_layout() const override
Get the memory layout used by this container.
Memory::LockFreeQueue< uint64_t, READY_QUEUE_CAPACITY > m_ready_queue
bool is_ring_mode() const
True if the container is operating in ring mode.
void unlock() override
Release a previously acquired lock.
bool try_acquire_processing_token(int channel) override
const std::vector< DataVariant > & get_data() override
Get a reference to the raw data stored in the container.
void mark_buffers_for_removal() override
Mark associated buffers for removal from the system.
std::unordered_map< std::string, RegionGroup > m_region_groups
void advance_cache_head(uint64_t frame_index)
Advance the container's view of how many frames have been decoded.
bool has_processing_token(int channel) const override
void set_processing_chain(const std::shared_ptr< DataProcessingChain > &chain) override
Set the processing chain for this container.
const std::vector< DataVariant > & get_processed_data() const override
Get a const reference to the processed data buffer.
void mark_buffers_for_processing(bool) override
Mark associated buffers for processing in the next cycle.
uint32_t slot_for(uint64_t frame_index) const
void set_structure(ContainerDataStructure structure) override
Set the data structure for this container.
void lock() override
Acquire a lock for thread-safe access.
bool is_looping() const override
Check if looping is enabled for the stream.
Concrete base implementation for streaming video containers.
Policy-driven unified circular buffer implementation.
ProcessingState
Represents the current processing lifecycle state of a container.
std::optional< RegionGroup > get_region_group(const std::unordered_map< std::string, RegionGroup > &groups, const std::string &name)
Get a RegionGroup by name from a group map.
void add_region_group(std::unordered_map< std::string, RegionGroup > &groups, const RegionGroup &group)
Add a RegionGroup to a group map.
MemoryLayout
Memory layout for multi-dimensional data.
Definition NDData.hpp:36
double position_to_time(uint64_t position, double sample_rate)
Convert position (samples/frames) to time (seconds) given a sample rate.
void remove_region_group(std::unordered_map< std::string, RegionGroup > &groups, const std::string &name)
Remove a RegionGroup by name from a group map.
uint64_t time_to_position(double time, double sample_rate)
Convert time (seconds) to position (samples/frames) given a sample rate.
Container structure for consistent dimension ordering.
Organizes related signal regions into a categorized collection.
Represents a point or span in N-dimensional space.
Definition Region.hpp:67
Backend IO streaming service interface.
Definition IOService.hpp:18