MayaFlux 0.2.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
VideoFileReader.cpp
Go to the documentation of this file.
1#include "VideoFileReader.hpp"
3
6
9
#include <cctype>
#include <cstddef>

extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libswscale/swscale.h>
}
16
17namespace MayaFlux::IO {
18
19// =========================================================================
20// Construction / destruction
21// =========================================================================
22
24
25void VideoFileReader::setup_io_service(uint64_t reader_id)
26{
27 m_reader_id = reader_id;
28
30 .get_service<Registry::Service::IOService>()) {
31
32 m_io_service = std::make_shared<Registry::Service::IOService>();
33 m_io_service->request_decode = [this](uint64_t reader_id) {
34 if (reader_id == m_reader_id)
36 };
37
40 [this]() -> void* { return m_io_service.get(); });
41
42 m_owns_io_service = true;
43 }
44}
45
46void VideoFileReader::setup_io_service(const std::shared_ptr<Registry::Service::IOService>& io_service, uint64_t reader_id)
47{
48 m_io_service = io_service;
49 m_reader_id = reader_id;
50 m_owns_io_service = false;
51}
52
57
58bool VideoFileReader::can_read(const std::string& filepath) const
59{
60 static const std::vector<std::string> exts = {
61 "mp4", "mkv", "avi", "mov", "webm", "flv", "wmv", "m4v", "ts", "mts"
62 };
63 auto dot = filepath.rfind('.');
64 if (dot == std::string::npos)
65 return false;
66 std::string ext = filepath.substr(dot + 1);
67 std::ranges::transform(ext, ext.begin(), ::tolower);
68 return std::ranges::find(exts, ext) != exts.end();
69}
70
71bool VideoFileReader::open(const std::string& filepath, FileReadOptions options)
72{
73 close();
74
75 m_filepath = filepath;
76 m_options = options;
77
78 auto demux = std::make_shared<FFmpegDemuxContext>();
79 if (!demux->open(filepath)) {
80 set_error(demux->last_error());
81 return false;
82 }
83
84 auto video = std::make_shared<VideoStreamContext>();
85 if (!video->open(*demux, m_target_width, m_target_height)) {
86 set_error(video->last_error());
87 return false;
88 }
89
90 std::shared_ptr<AudioStreamContext> audio;
93
94 audio = std::make_shared<AudioStreamContext>();
95 if (!audio->open(*demux, planar, m_target_sample_rate)) {
97 "VideoFileReader: no audio stream found or audio open failed");
98 audio.reset();
99 }
100 }
101
102 {
103 std::unique_lock lock(m_context_mutex);
104 m_demux = std::move(demux);
105 m_video = std::move(video);
106 m_audio = std::move(audio);
107 }
108
113
114 return true;
115}
116
118{
120 m_container_ref.reset();
121
122 std::unique_lock ctx_lock(m_context_mutex);
123
124 if (m_audio) {
125 m_audio->close();
126 m_audio.reset();
127 }
128 if (m_video) {
129 m_video->close();
130 m_video.reset();
131 }
132 if (m_demux) {
133 m_demux->close();
134 m_demux.reset();
135 }
136
137 m_audio_container.reset();
138 m_sws_buf.clear();
139 m_sws_buf.shrink_to_fit();
140
141 {
142 std::lock_guard lock(m_metadata_mutex);
143 m_cached_metadata.reset();
144 m_cached_regions.clear();
145 }
146
147 m_decode_head.store(0);
148 clear_error();
149
150 if (m_owns_io_service) {
153 m_io_service.reset();
154 m_owns_io_service = false;
155 }
156}
157
159{
160 std::shared_lock lock(m_context_mutex);
161 return m_demux && m_video && m_video->is_valid();
162}
163
164// =========================================================================
165// Metadata / regions
166// =========================================================================
167
168std::optional<FileMetadata> VideoFileReader::get_metadata() const
169{
170 std::lock_guard lock(m_metadata_mutex);
171 return m_cached_metadata;
172}
173
174std::vector<FileRegion> VideoFileReader::get_regions() const
175{
176 std::lock_guard lock(m_metadata_mutex);
177 return m_cached_regions;
178}
179
181 const std::shared_ptr<FFmpegDemuxContext>& demux,
182 const std::shared_ptr<VideoStreamContext>& video) const
183{
184 FileMetadata meta;
185 meta.mime_type = "video";
186 demux->extract_container_metadata(meta);
187 video->extract_stream_metadata(*demux, meta);
188
189 std::lock_guard lock(m_metadata_mutex);
190 m_cached_metadata = std::move(meta);
191}
192
194 const std::shared_ptr<FFmpegDemuxContext>& demux,
195 const std::shared_ptr<VideoStreamContext>& video) const
196{
197 std::vector<FileRegion> regions;
198
199 auto chapters = demux->extract_chapter_regions();
200 regions.insert(regions.end(),
201 std::make_move_iterator(chapters.begin()),
202 std::make_move_iterator(chapters.end()));
203
204 auto keyframes = video->extract_keyframe_regions(*demux);
205 regions.insert(regions.end(),
206 std::make_move_iterator(keyframes.begin()),
207 std::make_move_iterator(keyframes.end()));
208
209 std::lock_guard lock(m_metadata_mutex);
210 m_cached_regions = std::move(regions);
211}
212
214{
215 return typeid(Kakshya::VideoFileContainer);
216}
217
218// =========================================================================
219// FileReader interface
220// =========================================================================
221
222std::vector<Kakshya::DataVariant> VideoFileReader::read_all()
223{
225 "VideoFileReader::read_all() is not supported; "
226 "use create_container() + load_into_container()");
227 return {};
228}
229
230std::vector<Kakshya::DataVariant> VideoFileReader::read_region(const FileRegion& /*region*/)
231{
233 "VideoFileReader::read_region() is not supported; "
234 "use the container API to access regions");
235 return {};
236}
237
238// =========================================================================
239// Container operations
240// =========================================================================
241
242std::shared_ptr<Kakshya::SignalSourceContainer> VideoFileReader::create_container()
243{
244 std::shared_lock lock(m_context_mutex);
245 if (!m_demux || !m_video) {
246 set_error("File not open");
247 return nullptr;
248 }
249 return std::make_shared<Kakshya::VideoFileContainer>();
250}
251
253 std::shared_ptr<Kakshya::SignalSourceContainer> container)
254{
255 if (!container) {
256 set_error("Invalid container");
257 return false;
258 }
259
260 auto vc = std::dynamic_pointer_cast<Kakshya::VideoFileContainer>(container);
261 if (!vc) {
262 set_error("Container is not a VideoFileContainer");
263 return false;
264 }
265
266 std::shared_ptr<VideoStreamContext> video;
267 std::shared_ptr<AudioStreamContext> audio;
268 std::shared_ptr<FFmpegDemuxContext> demux;
269 {
270 std::shared_lock lock(m_context_mutex);
271 if (!m_demux || !m_video) {
272 set_error("File not open");
273 return false;
274 }
275 video = m_video;
276 audio = m_audio;
277 demux = m_demux;
278 }
279
280 const uint64_t total = video->total_frames;
281 if (total == 0) {
282 set_error("Video stream reports 0 frames");
283 return false;
284 }
285
286 const uint32_t ring_cap = std::min(
288 static_cast<uint32_t>(total));
289
290 const uint32_t threshold = (m_refill_threshold > 0)
292 : ring_cap / 4;
293
294 vc->setup_ring(total, ring_cap,
295 video->out_width, video->out_height,
296 video->out_bytes_per_pixel, video->frame_rate,
297 threshold, m_reader_id);
298
299 m_sws_buf.resize(
300 static_cast<size_t>(video->out_linesize) * video->out_height);
301
304 && demux->find_best_stream(AVMEDIA_TYPE_AUDIO) >= 0;
305
306 if (want_audio && audio && audio->is_valid()) {
307 {
308 std::unique_lock lock(m_context_mutex);
309 demux->seek(audio->stream_index, 0);
310 audio->flush_codec();
311 audio->drain_resampler_init();
312 }
313
314 SoundFileReader audio_reader;
317
318 if (audio_reader.open_from_demux(demux, audio, m_filepath, m_options)) {
319 auto sc = audio_reader.create_container();
320 if (audio_reader.load_into_container(sc)) {
321 m_audio_container = std::dynamic_pointer_cast<Kakshya::SoundFileContainer>(sc);
322 } else {
324 "VideoFileReader: audio load failed: {}",
325 audio_reader.get_last_error());
326 }
327 } else {
329 "VideoFileReader: open_from_demux failed: {}",
330 audio_reader.get_last_error());
331 }
332
333 {
334 std::unique_lock lock(m_context_mutex);
335 demux->seek(video->stream_index, 0);
336 video->flush_codec();
337 }
338 }
339
340 m_decode_head.store(0);
341 m_container_ref = vc;
342
343 const uint64_t preload = std::min(
344 static_cast<uint64_t>(ring_cap),
345 total);
346
347 uint64_t decoded = decode_batch(*vc, preload);
348
349 if (decoded == 0) {
350 set_error("Failed to decode any frames during preload");
351 return false;
352 }
353
355 "VideoFileReader: preloaded {}/{} frames ({}x{}, {:.1f} fps, ring={})",
356 decoded, total,
357 video->out_width, video->out_height,
358 video->frame_rate, ring_cap);
359
360 auto regions = get_regions();
361 auto region_groups = regions_to_groups(regions);
362 for (const auto& [name, group] : region_groups)
363 vc->add_region_group(group);
364
365 vc->create_default_processor();
366 vc->mark_ready_for_processing(true);
367
368 if (decoded < total)
370
371 return true;
372}
373
375 Kakshya::VideoFileContainer& vc, uint64_t batch_size)
376{
377 std::shared_lock ctx_lock(m_context_mutex);
378 if (!m_demux || !m_video || !m_video->is_valid())
379 return 0;
380
381 const size_t required = static_cast<size_t>(m_video->out_linesize) * m_video->out_height;
382 if (m_sws_buf.size() < required)
383 m_sws_buf.resize(required);
384
385 const size_t frame_bytes = vc.get_frame_byte_size();
386 const int packed_stride = static_cast<int>(
387 m_video->out_width * m_video->out_bytes_per_pixel);
388
389 uint64_t decoded = 0;
390
391 AVPacket* pkt = av_packet_alloc();
392 AVFrame* frame = av_frame_alloc();
393 if (!pkt || !frame) {
394 av_packet_free(&pkt);
395 av_frame_free(&frame);
396 return 0;
397 }
398
399 uint8_t* sws_dst[1] = { m_sws_buf.data() };
400 int sws_stride[1] = { m_video->out_linesize };
401
402 auto write_frame_to_ring = [&]() -> bool {
403 uint64_t idx = m_decode_head.load();
404 if (idx >= vc.get_total_source_frames())
405 return false;
406
407 uint8_t* dest = vc.mutable_slot_ptr(idx);
408 if (!dest)
409 return false;
410
411 sws_scale(m_video->sws_context,
412 frame->data, frame->linesize,
413 0, static_cast<int>(m_video->height),
414 sws_dst, sws_stride);
415
416 if (m_video->out_linesize == packed_stride) {
417 std::memcpy(dest, m_sws_buf.data(), frame_bytes);
418 } else {
419 for (uint32_t row = 0; row < m_video->out_height; ++row) {
420 std::memcpy(
421 dest + static_cast<size_t>(row) * packed_stride,
422 m_sws_buf.data() + static_cast<size_t>(row) * m_video->out_linesize,
423 static_cast<size_t>(packed_stride));
424 }
425 }
426
427 vc.commit_frame(idx);
428 m_decode_head.fetch_add(1);
429 ++decoded;
430
431 av_frame_unref(frame);
432 return true;
433 };
434
435 while (decoded < batch_size) {
436 int ret = av_read_frame(m_demux->format_context, pkt);
437
438 if (ret < 0) {
439 if (ret == AVERROR_EOF) {
440 avcodec_send_packet(m_video->codec_context, nullptr);
441 } else {
442 break;
443 }
444 } else if (pkt->stream_index != m_video->stream_index) {
445 av_packet_unref(pkt);
446 continue;
447 } else {
448 ret = avcodec_send_packet(m_video->codec_context, pkt);
449 av_packet_unref(pkt);
450 if (ret < 0 && ret != AVERROR(EAGAIN))
451 continue;
452 }
453
454 while (decoded < batch_size) {
455 ret = avcodec_receive_frame(m_video->codec_context, frame);
456 if (ret == AVERROR(EAGAIN))
457 break;
458 if (ret == AVERROR_EOF)
459 goto done;
460 if (ret < 0) {
461 av_frame_unref(frame);
462 break;
463 }
464
465 if (!write_frame_to_ring())
466 goto done;
467 }
468
469 if (ret == AVERROR_EOF)
470 break;
471 }
472
473done:
474 av_packet_free(&pkt);
475 av_frame_free(&frame);
476 return decoded;
477}
478
479// =========================================================================
480// Background decode thread
481// =========================================================================
482
484{
486
487 m_decode_stop.store(false);
488 m_decode_active.store(true);
490}
491
493{
494 if (!m_decode_active.load())
495 return;
496
497 m_decode_stop.store(true);
498 m_decode_cv.notify_all();
499
500 if (m_decode_thread.joinable())
501 m_decode_thread.join();
502
503 m_decode_active.store(false);
504}
505
507{
508 auto vc = m_container_ref.lock();
509 if (!vc) {
511 "VideoFileReader: decode thread — container expired");
512 m_decode_active.store(false);
513 return;
514 }
515
516 const uint64_t total = vc->get_total_source_frames();
517 const uint32_t ring_cap = vc->get_ring_capacity();
518 const uint32_t threshold = (m_refill_threshold > 0)
520 : ring_cap / 4;
521
522 while (!m_decode_stop.load()) {
523 uint64_t head = m_decode_head.load();
524 const uint64_t read_pos = vc->get_read_position()[0];
525
526 if (head >= total)
527 break;
528
529 const uint64_t buffered = (head > read_pos) ? (head - read_pos) : 0;
530
531 if (buffered >= static_cast<uint64_t>(ring_cap)) {
532 std::unique_lock lock(m_decode_mutex);
533 m_decode_cv.wait_for(lock, std::chrono::milliseconds(50), [&] {
534 if (m_decode_stop.load())
535 return true;
536 const uint64_t h = m_decode_head.load(std::memory_order_acquire);
537 const uint64_t rp = vc->get_read_position()[0];
538 const uint64_t ahead = (h > rp) ? (h - rp) : 0;
539 return ahead <= static_cast<uint64_t>(ring_cap - threshold);
540 });
541 continue;
542 }
543
544 const uint64_t want = static_cast<uint64_t>(ring_cap) - buffered;
545 const uint64_t capped = std::min(want, total - head);
546 const uint64_t batch = std::min(capped,
547 static_cast<uint64_t>(m_decode_batch_size));
548
549 uint64_t decoded = decode_batch(*vc, batch);
550
551 if (decoded == 0)
552 break;
553 }
554
555 m_decode_active.store(false);
556}
557
558// =========================================================================
559// Seeking
560// =========================================================================
561
562std::vector<uint64_t> VideoFileReader::get_read_position() const
563{
564 return { m_decode_head.load() };
565}
566
567bool VideoFileReader::seek(const std::vector<uint64_t>& position)
568{
569 if (position.empty())
570 return false;
571
572 const uint64_t target_frame = position[0];
573
575
576 std::shared_ptr<VideoStreamContext> video;
577 std::shared_ptr<FFmpegDemuxContext> demux;
578 {
579 std::shared_lock lock(m_context_mutex);
580 if (!m_demux || !m_video || !m_video->is_valid()) {
581 set_error("Cannot seek: reader not open");
582 return false;
583 }
584 video = m_video;
585 demux = m_demux;
586 }
587
588 if (!seek_internal(demux, video, target_frame))
589 return false;
590
591 m_decode_head.store(target_frame);
592
593 auto vc = m_container_ref.lock();
594 if (!vc)
595 return true;
596
597 vc->invalidate_ring();
598 vc->set_read_position({ target_frame });
599
600 const uint64_t total = vc->get_total_source_frames();
601 const uint64_t batch = std::min(
602 static_cast<uint64_t>(m_decode_batch_size),
603 total > target_frame ? total - target_frame : 0UL);
604
605 decode_batch(*vc, batch);
606
607 if (m_decode_head.load() < total)
609
610 return true;
611}
612
614 const std::shared_ptr<FFmpegDemuxContext>& demux,
615 const std::shared_ptr<VideoStreamContext>& video,
616 uint64_t frame_position)
617{
618 if (frame_position > video->total_frames)
619 frame_position = video->total_frames;
620
621 if (video->frame_rate <= 0.0) {
622 set_error("Invalid frame rate for seeking");
623 return false;
624 }
625
626 AVStream* stream = demux->get_stream(video->stream_index);
627 if (!stream) {
628 set_error("Invalid stream index");
629 return false;
630 }
631
632 double target_seconds = static_cast<double>(frame_position) / video->frame_rate;
633 auto ts = static_cast<int64_t>(target_seconds / av_q2d(stream->time_base));
634
635 if (!demux->seek(video->stream_index, ts)) {
636 set_error(demux->last_error());
637 return false;
638 }
639
640 video->flush_codec();
641 return true;
642}
643
645{
646 m_decode_cv.notify_one();
647}
648
649// =========================================================================
650// Dimension queries
651// =========================================================================
652
654{
655 return 4;
656}
657
658std::vector<uint64_t> VideoFileReader::get_dimension_sizes() const
659{
660 std::shared_lock lock(m_context_mutex);
661 if (!m_video)
662 return { 0, 0, 0, 0 };
663 return {
664 m_video->total_frames,
665 m_video->out_height,
666 m_video->out_width,
667 m_video->out_bytes_per_pixel
668 };
669}
670
671std::vector<std::string> VideoFileReader::get_supported_extensions() const
672{
673 return { "mp4", "mkv", "avi", "mov", "webm", "flv", "wmv", "m4v", "ts", "mts" };
674}
675
676// =========================================================================
677// Error
678// =========================================================================
679
681{
682 std::lock_guard lock(m_error_mutex);
683 return m_last_error;
684}
685
686void VideoFileReader::set_error(const std::string& msg) const
687{
688 std::lock_guard lock(m_error_mutex);
689 m_last_error = msg;
691 "VideoFileReader: {}", msg);
692}
693
695{
696 std::lock_guard lock(m_error_mutex);
697 m_last_error.clear();
698}
699
700} // namespace MayaFlux::IO
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
static std::unordered_map< std::string, Kakshya::RegionGroup > regions_to_groups(const std::vector< FileRegion > &regions)
Convert file regions to region groups.
std::string get_last_error() const override
Get the last error message encountered by the reader.
bool load_into_container(std::shared_ptr< Kakshya::SignalSourceContainer > container) override
Load file data into an existing SignalSourceContainer.
bool open_from_demux(std::shared_ptr< FFmpegDemuxContext > demux, std::shared_ptr< AudioStreamContext > audio, const std::string &filepath, FileReadOptions options=FileReadOptions::ALL)
Open an audio stream from an already-constructed demux and stream context.
void set_audio_options(AudioReadOptions options)
Set audio-specific read options.
std::shared_ptr< Kakshya::SignalSourceContainer > create_container() override
Create a SignalSourceContainer for this file.
void set_target_sample_rate(uint32_t sample_rate)
Set the target sample rate for resampling.
FFmpeg-based audio file reader for MayaFlux.
std::condition_variable m_decode_cv
uint64_t decode_batch(Kakshya::VideoFileContainer &vc, uint64_t batch_size)
Decode up to batch_size frames starting at m_decode_head.
void set_error(const std::string &msg) const
std::vector< FileRegion > m_cached_regions
std::vector< FileRegion > get_regions() const override
Get semantic regions from the file.
void build_metadata(const std::shared_ptr< FFmpegDemuxContext > &demux, const std::shared_ptr< VideoStreamContext > &video) const
std::vector< uint8_t > m_sws_buf
One-frame sws scratch buffer (padded linesize, reused by decode thread).
void build_regions(const std::shared_ptr< FFmpegDemuxContext > &demux, const std::shared_ptr< VideoStreamContext > &video) const
bool open(const std::string &filepath, FileReadOptions options=FileReadOptions::ALL) override
Open a file for reading.
std::vector< Kakshya::DataVariant > read_all() override
Read all data from the file into memory.
std::weak_ptr< Kakshya::VideoFileContainer > m_container_ref
std::shared_ptr< FFmpegDemuxContext > m_demux
std::shared_ptr< Registry::Service::IOService > m_io_service
bool seek(const std::vector< uint64_t > &position) override
Seek to a specific position in the file.
std::vector< std::string > get_supported_extensions() const override
Get supported file extensions for this reader.
std::atomic< bool > m_decode_active
bool seek_internal(const std::shared_ptr< FFmpegDemuxContext > &demux, const std::shared_ptr< VideoStreamContext > &video, uint64_t frame_position)
bool load_into_container(std::shared_ptr< Kakshya::SignalSourceContainer > container) override
Load file data into an existing container.
bool is_open() const override
Check if a file is currently open.
std::string get_last_error() const override
Get the last error message.
std::vector< uint64_t > get_read_position() const override
Get current read position in primary dimension.
std::shared_ptr< VideoStreamContext > m_video
std::optional< FileMetadata > get_metadata() const override
Get metadata from the open file.
std::shared_ptr< AudioStreamContext > m_audio
void close() override
Close the currently open file.
std::optional< FileMetadata > m_cached_metadata
std::shared_ptr< Kakshya::SignalSourceContainer > create_container() override
Create and initialize a container from the file.
std::shared_ptr< Kakshya::SoundFileContainer > m_audio_container
void setup_io_service(uint64_t reader_id=0)
Internal setup for IOService integration.
std::atomic< uint64_t > m_decode_head
bool can_read(const std::string &filepath) const override
Check if a file can be read by this reader.
std::type_index get_container_type() const override
Get the container type this reader creates.
void signal_decode()
Non-blocking signal to the background decode thread.
size_t get_num_dimensions() const override
Get the dimensionality of the file data.
std::vector< Kakshya::DataVariant > read_region(const FileRegion &region) override
Read a specific region of data.
std::vector< uint64_t > get_dimension_sizes() const override
Get size of each dimension in the file data.
File-backed video container — semantic marker over VideoStreamContainer.
uint8_t * mutable_slot_ptr(uint64_t frame_index)
Mutable pointer into m_data[0] for the decode thread to write into.
size_t get_frame_byte_size() const
Get the total byte size of one frame (width * height * channels).
void commit_frame(uint64_t frame_index)
Publish a decoded frame.
void register_service(ServiceFactory factory)
Register a backend service capability.
static BackendRegistry & instance()
Get the global registry instance.
void unregister_service()
Unregister a service.
@ DEINTERLEAVE
Output planar (per-channel) doubles instead of interleaved.
FileReadOptions
Generic options for file reading behavior.
@ EXTRACT_METADATA
Extract file metadata.
@ EXTRACT_REGIONS
Extract semantic regions (format-specific)
@ NONE
No special options.
@ FileIO
Filesystem I/O operations.
@ IO
Networking, file handling, streaming.
std::string mime_type
MIME type if applicable (e.g., "audio/wav")
Generic metadata structure for any file type.
Generic region descriptor for any file type.
Backend IO streaming service interface.
Definition IOService.hpp:18