MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
VideoFileReader.cpp
Go to the documentation of this file.
1#include "VideoFileReader.hpp"
3
6
9
10extern "C" {
11#include <cstddef>
12#include <libavcodec/avcodec.h>
13#include <libavformat/avformat.h>
14#include <libswscale/swscale.h>
15}
16
17namespace MayaFlux::IO {
18
19// =========================================================================
20// Construction / destruction
21// =========================================================================
22
24
25void VideoFileReader::setup_io_service(uint64_t reader_id)
26{
27 m_reader_id = reader_id;
28
30 .get_service<Registry::Service::IOService>()) {
31
32 m_io_service = std::make_shared<Registry::Service::IOService>();
33 m_io_service->request_decode = [this](uint64_t reader_id) {
34 if (reader_id == m_reader_id)
36 };
37
40 [this]() -> void* { return m_io_service.get(); });
41
42 m_owns_io_service = true;
43 }
44}
45
46void VideoFileReader::setup_io_service(const std::shared_ptr<Registry::Service::IOService>& io_service, uint64_t reader_id)
47{
48 m_io_service = io_service;
49 m_reader_id = reader_id;
50 m_owns_io_service = false;
51}
52
57
58bool VideoFileReader::can_read(const std::string& filepath) const
59{
60 static const std::vector<std::string> exts = {
61 "mp4", "mkv", "avi", "mov", "webm", "flv", "wmv", "m4v", "ts", "mts"
62 };
63 auto dot = filepath.rfind('.');
64 if (dot == std::string::npos)
65 return false;
66 std::string ext = filepath.substr(dot + 1);
67 std::ranges::transform(ext, ext.begin(), ::tolower);
68 return std::ranges::find(exts, ext) != exts.end();
69}
70
71bool VideoFileReader::open(const std::string& filepath, FileReadOptions options)
72{
73 close();
74
75 auto resolved = resolve_path(filepath);
76 m_filepath = resolved;
77 m_options = options;
78
79 auto demux = std::make_shared<FFmpegDemuxContext>();
80 if (!demux->open(resolved)) {
81 set_error(demux->last_error());
82 return false;
83 }
84
85 auto video = std::make_shared<VideoStreamContext>();
86 if (!video->open(*demux, m_target_width, m_target_height)) {
87 set_error(video->last_error());
88 return false;
89 }
90
91 std::shared_ptr<AudioStreamContext> audio;
94
95 audio = std::make_shared<AudioStreamContext>();
96 if (!audio->open(*demux, planar, m_target_sample_rate)) {
98 "VideoFileReader: no audio stream found or audio open failed");
99 audio.reset();
100 }
101 }
102
103 {
104 std::unique_lock lock(m_context_mutex);
105 m_demux = std::move(demux);
106 m_video = std::move(video);
107 m_audio = std::move(audio);
108 }
109
114
115 return true;
116}
117
119{
121 m_container_ref.reset();
122
123 std::unique_lock ctx_lock(m_context_mutex);
124
125 if (m_audio) {
126 m_audio->close();
127 m_audio.reset();
128 }
129 if (m_video) {
130 m_video->close();
131 m_video.reset();
132 }
133 if (m_demux) {
134 m_demux->close();
135 m_demux.reset();
136 }
137
138 m_audio_container.reset();
139 m_sws_buf.clear();
140 m_sws_buf.shrink_to_fit();
141
142 {
143 std::lock_guard lock(m_metadata_mutex);
144 m_cached_metadata.reset();
145 m_cached_regions.clear();
146 }
147
148 m_decode_head.store(0);
149 clear_error();
150
151 if (m_owns_io_service) {
154 m_io_service.reset();
155 m_owns_io_service = false;
156 }
157}
158
160{
161 std::shared_lock lock(m_context_mutex);
162 return m_demux && m_video && m_video->is_valid();
163}
164
165// =========================================================================
166// Metadata / regions
167// =========================================================================
168
169std::optional<FileMetadata> VideoFileReader::get_metadata() const
170{
171 std::lock_guard lock(m_metadata_mutex);
172 return m_cached_metadata;
173}
174
175std::vector<FileRegion> VideoFileReader::get_regions() const
176{
177 std::lock_guard lock(m_metadata_mutex);
178 return m_cached_regions;
179}
180
182 const std::shared_ptr<FFmpegDemuxContext>& demux,
183 const std::shared_ptr<VideoStreamContext>& video) const
184{
185 FileMetadata meta;
186 meta.mime_type = "video";
187 demux->extract_container_metadata(meta);
188 video->extract_stream_metadata(*demux, meta);
189
190 std::lock_guard lock(m_metadata_mutex);
191 m_cached_metadata = std::move(meta);
192}
193
195 const std::shared_ptr<FFmpegDemuxContext>& demux,
196 const std::shared_ptr<VideoStreamContext>& video) const
197{
198 std::vector<FileRegion> regions;
199
200 auto chapters = demux->extract_chapter_regions();
201 regions.insert(regions.end(),
202 std::make_move_iterator(chapters.begin()),
203 std::make_move_iterator(chapters.end()));
204
205 auto keyframes = video->extract_keyframe_regions(*demux);
206 regions.insert(regions.end(),
207 std::make_move_iterator(keyframes.begin()),
208 std::make_move_iterator(keyframes.end()));
209
210 std::lock_guard lock(m_metadata_mutex);
211 m_cached_regions = std::move(regions);
212}
213
215{
216 return typeid(Kakshya::VideoFileContainer);
217}
218
219// =========================================================================
220// FileReader interface
221// =========================================================================
222
223std::vector<Kakshya::DataVariant> VideoFileReader::read_all()
224{
226 "VideoFileReader::read_all() is not supported; "
227 "use create_container() + load_into_container()");
228 return {};
229}
230
231std::vector<Kakshya::DataVariant> VideoFileReader::read_region(const FileRegion& /*region*/)
232{
234 "VideoFileReader::read_region() is not supported; "
235 "use the container API to access regions");
236 return {};
237}
238
239// =========================================================================
240// Container operations
241// =========================================================================
242
243std::shared_ptr<Kakshya::SignalSourceContainer> VideoFileReader::create_container()
244{
245 std::shared_lock lock(m_context_mutex);
246 if (!m_demux || !m_video) {
247 set_error("File not open");
248 return nullptr;
249 }
250 return std::make_shared<Kakshya::VideoFileContainer>();
251}
252
254 std::shared_ptr<Kakshya::SignalSourceContainer> container)
255{
256 if (!container) {
257 set_error("Invalid container");
258 return false;
259 }
260
261 auto vc = std::dynamic_pointer_cast<Kakshya::VideoFileContainer>(container);
262 if (!vc) {
263 set_error("Container is not a VideoFileContainer");
264 return false;
265 }
266
267 std::shared_ptr<VideoStreamContext> video;
268 std::shared_ptr<AudioStreamContext> audio;
269 std::shared_ptr<FFmpegDemuxContext> demux;
270 {
271 std::shared_lock lock(m_context_mutex);
272 if (!m_demux || !m_video) {
273 set_error("File not open");
274 return false;
275 }
276 video = m_video;
277 audio = m_audio;
278 demux = m_demux;
279 }
280
281 vc->set_source_path(m_filepath);
282 if (m_demux && m_demux->format_context)
283 vc->set_source_format(m_demux->format_context->iformat->name);
284
285 const uint64_t total = video->total_frames;
286 if (total == 0) {
287 set_error("Video stream reports 0 frames");
288 return false;
289 }
290
291 const uint32_t ring_cap = std::min(
293 static_cast<uint32_t>(total));
294
295 const uint32_t threshold = (m_refill_threshold > 0)
297 : ring_cap / 4;
298
299 vc->setup_ring(total, ring_cap,
300 video->out_width, video->out_height,
301 video->out_bytes_per_pixel, video->frame_rate,
302 threshold, m_reader_id);
303
304 m_sws_buf.resize(
305 static_cast<size_t>(video->out_linesize) * video->out_height);
306
309 && demux->find_best_stream(AVMEDIA_TYPE_AUDIO) >= 0;
310
311 if (want_audio && audio && audio->is_valid()) {
312 {
313 std::unique_lock lock(m_context_mutex);
314 demux->seek(audio->stream_index, 0);
315 audio->flush_codec();
316 audio->drain_resampler_init();
317 }
318
319 SoundFileReader audio_reader;
322
323 if (audio_reader.open_from_demux(demux, audio, m_filepath, m_options)) {
324 auto sc = audio_reader.create_container();
325 if (audio_reader.load_into_container(sc)) {
326 m_audio_container = std::dynamic_pointer_cast<Kakshya::SoundFileContainer>(sc);
327 } else {
329 "VideoFileReader: audio load failed: {}",
330 audio_reader.get_last_error());
331 }
332 } else {
334 "VideoFileReader: open_from_demux failed: {}",
335 audio_reader.get_last_error());
336 }
337
338 {
339 std::unique_lock lock(m_context_mutex);
340 demux->seek(video->stream_index, 0);
341 video->flush_codec();
342 }
343 }
344
345 m_decode_head.store(0);
346 m_container_ref = vc;
347
348 const uint64_t preload = std::min(
349 static_cast<uint64_t>(ring_cap),
350 total);
351
352 uint64_t decoded = decode_batch(*vc, preload);
353
354 if (decoded == 0) {
355 set_error("Failed to decode any frames during preload");
356 return false;
357 }
358
360 "VideoFileReader: preloaded {}/{} frames ({}x{}, {:.1f} fps, ring={})",
361 decoded, total,
362 video->out_width, video->out_height,
363 video->frame_rate, ring_cap);
364
365 auto regions = get_regions();
366 auto region_groups = regions_to_groups(regions);
367 for (const auto& [name, group] : region_groups)
368 vc->add_region_group(group);
369
370 vc->create_default_processor();
371 vc->mark_ready_for_processing(true);
372
373 if (decoded < total)
375
376 return true;
377}
378
380 Kakshya::VideoFileContainer& vc, uint64_t batch_size)
381{
382 std::shared_lock ctx_lock(m_context_mutex);
383 if (!m_demux || !m_video || !m_video->is_valid())
384 return 0;
385
386 const size_t required = static_cast<size_t>(m_video->out_linesize) * m_video->out_height;
387 if (m_sws_buf.size() < required)
388 m_sws_buf.resize(required);
389
390 const size_t frame_bytes = vc.get_frame_byte_size();
391 const int packed_stride = static_cast<int>(
392 m_video->out_width * m_video->out_bytes_per_pixel);
393
394 uint64_t decoded = 0;
395
396 AVPacket* pkt = av_packet_alloc();
397 AVFrame* frame = av_frame_alloc();
398 if (!pkt || !frame) {
399 av_packet_free(&pkt);
400 av_frame_free(&frame);
401 return 0;
402 }
403
404 uint8_t* sws_dst[1] = { m_sws_buf.data() };
405 int sws_stride[1] = { m_video->out_linesize };
406
407 auto write_frame_to_ring = [&]() -> bool {
408 uint64_t idx = m_decode_head.load();
409 if (idx >= vc.get_total_source_frames())
410 return false;
411
412 uint8_t* dest = vc.mutable_slot_ptr(idx);
413 if (!dest)
414 return false;
415
416 sws_scale(m_video->sws_context,
417 frame->data, frame->linesize,
418 0, static_cast<int>(m_video->height),
419 sws_dst, sws_stride);
420
421 if (m_video->out_linesize == packed_stride) {
422 std::memcpy(dest, m_sws_buf.data(), frame_bytes);
423 } else {
424 for (uint32_t row = 0; row < m_video->out_height; ++row) {
425 std::memcpy(
426 dest + static_cast<size_t>(row) * packed_stride,
427 m_sws_buf.data() + static_cast<size_t>(row) * m_video->out_linesize,
428 static_cast<size_t>(packed_stride));
429 }
430 }
431
432 vc.commit_frame(idx);
433 m_decode_head.fetch_add(1);
434 ++decoded;
435
436 av_frame_unref(frame);
437 return true;
438 };
439
440 while (decoded < batch_size) {
441 int ret = av_read_frame(m_demux->format_context, pkt);
442
443 if (ret < 0) {
444 if (ret == AVERROR_EOF) {
445 avcodec_send_packet(m_video->codec_context, nullptr);
446 } else {
447 break;
448 }
449 } else if (pkt->stream_index != m_video->stream_index) {
450 av_packet_unref(pkt);
451 continue;
452 } else {
453 ret = avcodec_send_packet(m_video->codec_context, pkt);
454 av_packet_unref(pkt);
455 if (ret < 0 && ret != AVERROR(EAGAIN))
456 continue;
457 }
458
459 while (decoded < batch_size) {
460 ret = avcodec_receive_frame(m_video->codec_context, frame);
461 if (ret == AVERROR(EAGAIN))
462 break;
463 if (ret == AVERROR_EOF)
464 goto done;
465 if (ret < 0) {
466 av_frame_unref(frame);
467 break;
468 }
469
470 if (!write_frame_to_ring())
471 goto done;
472 }
473
474 if (ret == AVERROR_EOF)
475 break;
476 }
477
478done:
479 av_packet_free(&pkt);
480 av_frame_free(&frame);
481 return decoded;
482}
483
484// =========================================================================
485// Background decode thread
486// =========================================================================
487
489{
491
492 m_decode_stop.store(false);
493 m_decode_active.store(true);
495}
496
498{
499 if (!m_decode_active.load())
500 return;
501
502 m_decode_stop.store(true);
503 m_decode_cv.notify_all();
504
505 if (m_decode_thread.joinable())
506 m_decode_thread.join();
507
508 m_decode_active.store(false);
509}
510
512{
513 auto vc = m_container_ref.lock();
514 if (!vc) {
516 "VideoFileReader: decode thread — container expired");
517 m_decode_active.store(false);
518 return;
519 }
520
521 const uint64_t total = vc->get_total_source_frames();
522 const uint32_t ring_cap = vc->get_ring_capacity();
523 const uint32_t threshold = (m_refill_threshold > 0)
525 : ring_cap / 4;
526
527 while (!m_decode_stop.load()) {
528 uint64_t head = m_decode_head.load();
529 const uint64_t read_pos = vc->get_read_position()[0];
530
531 if (head >= total)
532 break;
533
534 const uint64_t buffered = (head > read_pos) ? (head - read_pos) : 0;
535
536 if (buffered >= static_cast<uint64_t>(ring_cap)) {
537 std::unique_lock lock(m_decode_mutex);
538 m_decode_cv.wait_for(lock, std::chrono::milliseconds(50), [&] {
539 if (m_decode_stop.load())
540 return true;
541 const uint64_t h = m_decode_head.load(std::memory_order_acquire);
542 const uint64_t rp = vc->get_read_position()[0];
543 const uint64_t ahead = (h > rp) ? (h - rp) : 0;
544 return ahead <= static_cast<uint64_t>(ring_cap - threshold);
545 });
546 continue;
547 }
548
549 const uint64_t want = static_cast<uint64_t>(ring_cap) - buffered;
550 const uint64_t capped = std::min(want, total - head);
551 const uint64_t batch = std::min(capped,
552 static_cast<uint64_t>(m_decode_batch_size));
553
554 uint64_t decoded = decode_batch(*vc, batch);
555
556 if (decoded == 0)
557 break;
558 }
559
560 m_decode_active.store(false);
561}
562
563// =========================================================================
564// Seeking
565// =========================================================================
566
567std::vector<uint64_t> VideoFileReader::get_read_position() const
568{
569 return { m_decode_head.load() };
570}
571
572bool VideoFileReader::seek(const std::vector<uint64_t>& position)
573{
574 if (position.empty())
575 return false;
576
577 const uint64_t target_frame = position[0];
578
580
581 std::shared_ptr<VideoStreamContext> video;
582 std::shared_ptr<FFmpegDemuxContext> demux;
583 {
584 std::shared_lock lock(m_context_mutex);
585 if (!m_demux || !m_video || !m_video->is_valid()) {
586 set_error("Cannot seek: reader not open");
587 return false;
588 }
589 video = m_video;
590 demux = m_demux;
591 }
592
593 if (!seek_internal(demux, video, target_frame))
594 return false;
595
596 m_decode_head.store(target_frame);
597
598 auto vc = m_container_ref.lock();
599 if (!vc)
600 return true;
601
602 vc->invalidate_ring();
603 vc->set_read_position({ target_frame });
604
605 const uint64_t total = vc->get_total_source_frames();
606 const uint64_t batch = std::min(
607 static_cast<uint64_t>(m_decode_batch_size),
608 total > target_frame ? total - target_frame : 0UL);
609
610 decode_batch(*vc, batch);
611
612 if (m_decode_head.load() < total)
614
615 return true;
616}
617
619 const std::shared_ptr<FFmpegDemuxContext>& demux,
620 const std::shared_ptr<VideoStreamContext>& video,
621 uint64_t frame_position)
622{
623 if (frame_position > video->total_frames)
624 frame_position = video->total_frames;
625
626 if (video->frame_rate <= 0.0) {
627 set_error("Invalid frame rate for seeking");
628 return false;
629 }
630
631 AVStream* stream = demux->get_stream(video->stream_index);
632 if (!stream) {
633 set_error("Invalid stream index");
634 return false;
635 }
636
637 double target_seconds = static_cast<double>(frame_position) / video->frame_rate;
638 auto ts = static_cast<int64_t>(target_seconds / av_q2d(stream->time_base));
639
640 if (!demux->seek(video->stream_index, ts)) {
641 set_error(demux->last_error());
642 return false;
643 }
644
645 video->flush_codec();
646 return true;
647}
648
650{
651 m_decode_cv.notify_one();
652}
653
654// =========================================================================
655// Dimension queries
656// =========================================================================
657
659{
660 return 4;
661}
662
663std::vector<uint64_t> VideoFileReader::get_dimension_sizes() const
664{
665 std::shared_lock lock(m_context_mutex);
666 if (!m_video)
667 return { 0, 0, 0, 0 };
668 return {
669 m_video->total_frames,
670 m_video->out_height,
671 m_video->out_width,
672 m_video->out_bytes_per_pixel
673 };
674}
675
676std::vector<std::string> VideoFileReader::get_supported_extensions() const
677{
678 return { "mp4", "mkv", "avi", "mov", "webm", "flv", "wmv", "m4v", "ts", "mts" };
679}
680
681// =========================================================================
682// Error
683// =========================================================================
684
686{
687 std::lock_guard lock(m_error_mutex);
688 return m_last_error;
689}
690
691void VideoFileReader::set_error(const std::string& msg) const
692{
693 std::lock_guard lock(m_error_mutex);
694 m_last_error = msg;
696 "VideoFileReader: {}", msg);
697}
698
700{
701 std::lock_guard lock(m_error_mutex);
702 m_last_error.clear();
703}
704
705} // namespace MayaFlux::IO
#define MF_INFO(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
uint32_t h
Definition InkPress.cpp:25
glm::vec3 position
static std::string resolve_path(const std::string &filepath)
Resolve a filepath against the project source root if not found as-is.
static std::unordered_map< std::string, Kakshya::RegionGroup > regions_to_groups(const std::vector< FileRegion > &regions)
Convert file regions to region groups.
std::string get_last_error() const override
Get the last error message encountered by the reader.
bool load_into_container(std::shared_ptr< Kakshya::SignalSourceContainer > container) override
Load file data into an existing SignalSourceContainer.
bool open_from_demux(std::shared_ptr< FFmpegDemuxContext > demux, std::shared_ptr< AudioStreamContext > audio, const std::string &filepath, FileReadOptions options=FileReadOptions::ALL)
Open an audio stream from an already-constructed demux and stream context.
void set_audio_options(AudioReadOptions options)
Set audio-specific read options.
std::shared_ptr< Kakshya::SignalSourceContainer > create_container() override
Create a SignalSourceContainer for this file.
void set_target_sample_rate(uint32_t sample_rate)
Set the target sample rate for resampling.
FFmpeg-based audio file reader for MayaFlux.
std::condition_variable m_decode_cv
uint64_t decode_batch(Kakshya::VideoFileContainer &vc, uint64_t batch_size)
Decode up to batch_size frames starting at m_decode_head.
void set_error(const std::string &msg) const
std::vector< FileRegion > m_cached_regions
std::vector< FileRegion > get_regions() const override
Get semantic regions from the file.
void build_metadata(const std::shared_ptr< FFmpegDemuxContext > &demux, const std::shared_ptr< VideoStreamContext > &video) const
std::vector< uint8_t > m_sws_buf
One-frame sws scratch buffer (padded linesize, reused by decode thread).
void build_regions(const std::shared_ptr< FFmpegDemuxContext > &demux, const std::shared_ptr< VideoStreamContext > &video) const
bool open(const std::string &filepath, FileReadOptions options=FileReadOptions::ALL) override
Open a file for reading.
std::vector< Kakshya::DataVariant > read_all() override
Read all data from the file into memory.
std::weak_ptr< Kakshya::VideoFileContainer > m_container_ref
std::shared_ptr< FFmpegDemuxContext > m_demux
std::shared_ptr< Registry::Service::IOService > m_io_service
bool seek(const std::vector< uint64_t > &position) override
Seek to a specific position in the file.
std::vector< std::string > get_supported_extensions() const override
Get supported file extensions for this reader.
std::atomic< bool > m_decode_active
bool seek_internal(const std::shared_ptr< FFmpegDemuxContext > &demux, const std::shared_ptr< VideoStreamContext > &video, uint64_t frame_position)
bool load_into_container(std::shared_ptr< Kakshya::SignalSourceContainer > container) override
Load file data into an existing container.
bool is_open() const override
Check if a file is currently open.
std::string get_last_error() const override
Get the last error message.
std::vector< uint64_t > get_read_position() const override
Get current read position in primary dimension.
std::shared_ptr< VideoStreamContext > m_video
std::optional< FileMetadata > get_metadata() const override
Get metadata from the open file.
std::shared_ptr< AudioStreamContext > m_audio
void close() override
Close the currently open file.
std::optional< FileMetadata > m_cached_metadata
std::shared_ptr< Kakshya::SignalSourceContainer > create_container() override
Create and initialize a container from the file.
std::shared_ptr< Kakshya::SoundFileContainer > m_audio_container
void setup_io_service(uint64_t reader_id=0)
Internal setup for IOService integration.
std::atomic< uint64_t > m_decode_head
bool can_read(const std::string &filepath) const override
Check if a file can be read by this reader.
std::type_index get_container_type() const override
Get the container type this reader creates.
void signal_decode()
Non-blocking signal to the background decode thread.
size_t get_num_dimensions() const override
Get the dimensionality of the file data.
std::vector< Kakshya::DataVariant > read_region(const FileRegion &region) override
Read a specific region of data.
std::vector< uint64_t > get_dimension_sizes() const override
Get size of each dimension in the file data.
File-backed video container — semantic marker over VideoStreamContainer.
uint8_t * mutable_slot_ptr(uint64_t frame_index)
Mutable pointer into m_data[0] for the decode thread to write into.
size_t get_frame_byte_size() const
Get the total byte size of one frame (width * height * channels).
void commit_frame(uint64_t frame_index)
Publish a decoded frame.
void register_service(ServiceFactory factory)
Register a backend service capability.
static BackendRegistry & instance()
Get the global registry instance.
void unregister_service()
Unregister a service.
@ DEINTERLEAVE
Output planar (per-channel) doubles instead of interleaved.
FileReadOptions
Generic options for file reading behavior.
@ EXTRACT_METADATA
Extract file metadata.
@ EXTRACT_REGIONS
Extract semantic regions (format-specific)
@ NONE
No special options.
@ FileIO
Filesystem I/O operations.
@ IO
Networking, file handling, streaming.
std::string mime_type
MIME type if applicable (e.g., "audio/wav")
Generic metadata structure for any file type.
Generic region descriptor for any file type.
Backend IO streaming service interface.
Definition IOService.hpp:18