MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches

◆ capture_operation()

void MayaFlux::Kriya::BufferPipeline::capture_operation ( BufferOperation op,
uint64_t  cycle 
)
private

Definition at line 308 of file BufferPipeline.cpp.

309{
310 bool should_process = op.m_capture.get_processing_control() == BufferCapture::ProcessingControl::ON_CAPTURE;
311 auto buffer_data = extract_buffer_data(op.m_capture.get_buffer(), should_process);
312
313 if (op.m_capture.m_data_ready_callback) {
314 op.m_capture.m_data_ready_callback(buffer_data, cycle);
315 }
316
317 auto capture_mode = op.m_capture.get_mode();
318
319 switch (capture_mode) {
321 m_operation_data[&op] = buffer_data;
322 break;
323
325 auto it = m_operation_data.find(&op);
326 if (it == m_operation_data.end()) {
327 m_operation_data[&op] = buffer_data;
328 } else {
329 try {
330 auto& existing = std::get<std::vector<double>>(it->second);
331 const auto& new_data = std::get<std::vector<double>>(buffer_data);
332 existing.insert(existing.end(), new_data.begin(), new_data.end());
333
334 } catch (const std::bad_variant_access& e) {
337 "Data type mismatch during ACCUMULATE capture: {}",
338 e.what());
339 m_operation_data[&op] = buffer_data;
340 }
341 }
342 break;
343 }
345 uint32_t circular_size = op.m_capture.get_circular_size();
346 if (circular_size == 0) {
347 circular_size = 4096;
348 }
349
350 auto it = m_operation_data.find(&op);
351 if (it == m_operation_data.end()) {
352 m_operation_data[&op] = buffer_data;
353 } else {
354 try {
355 auto& circular = std::get<std::vector<double>>(it->second);
356 const auto& new_data = std::get<std::vector<double>>(buffer_data);
357
358 circular.insert(circular.end(), new_data.begin(), new_data.end());
359
360 if (circular.size() > circular_size) {
361 circular.erase(circular.begin(),
362 circular.begin() + static_cast<int64_t>(circular.size() - circular_size));
363 }
364
365 } catch (const std::bad_variant_access& e) {
368 "Data type mismatch during CIRCULAR capture: {}",
369 e.what());
370 m_operation_data[&op] = buffer_data;
371 } catch (std::exception& e) {
374 std::source_location::current(),
375 "Error during CIRCULAR capture: {}",
376 e.what());
377 m_operation_data[&op] = buffer_data;
378 }
379 }
380 break;
381 }
383 uint32_t window_size = op.m_capture.get_window_size();
384 float overlap_ratio = op.m_capture.get_overlap_ratio();
385
386 if (window_size == 0) {
387 window_size = 512;
388 }
389
390 auto hop_size = static_cast<uint32_t>((float)window_size * (1.0F - overlap_ratio));
391 if (hop_size == 0)
392 hop_size = 1;
393
394 auto it = m_operation_data.find(&op);
395 if (it == m_operation_data.end()) {
396 m_operation_data[&op] = buffer_data;
397 } else {
398 try {
399 auto& windowed = std::get<std::vector<double>>(it->second);
400 const auto& new_data = std::get<std::vector<double>>(buffer_data);
401
402 if (windowed.size() >= window_size) {
403 if (hop_size >= windowed.size()) {
404 windowed = std::get<std::vector<double>>(buffer_data);
405 } else {
406 windowed.erase(windowed.begin(),
407 windowed.begin() + hop_size);
408
409 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
410
411 if (windowed.size() > window_size) {
412 size_t excess = windowed.size() - window_size;
413 windowed.erase(windowed.begin(),
414 windowed.begin() + excess);
415 }
416 }
417 } else {
418 windowed.insert(windowed.end(), new_data.begin(), new_data.end());
419
420 if (windowed.size() > window_size) {
421 size_t excess = windowed.size() - window_size;
422 windowed.erase(windowed.begin(),
423 windowed.begin() + excess);
424 }
425 }
426
427 } catch (const std::bad_variant_access& e) {
429 "Data type mismatch during WINDOWED capture: {}", e.what());
430 m_operation_data[&op] = buffer_data;
431 }
432 }
433 break;
434 }
435
437 if (op.m_capture.m_stop_condition && op.m_capture.m_stop_condition()) {
438 m_operation_data[&op] = buffer_data;
439 }
440 break;
441 }
442
443 default:
444 m_operation_data[&op] = buffer_data;
445 break;
446 }
447
448 if (has_immediate_routing(op)) {
449 auto current_it = std::ranges::find_if(m_operations,
450 [&op](const BufferOperation& o) { return &o == &op; });
451
452 if (current_it != m_operations.end()) {
453 auto next_it = std::next(current_it);
454 if (next_it != m_operations.end() && next_it->get_type() == BufferOperation::OpType::ROUTE) {
455
456 if (next_it->m_target_buffer) {
457 if (!m_buffer_manager) {
458 error<std::invalid_argument>(Journal::Component::Kriya,
460 std::source_location::current(),
461 "BufferPipeline has no BufferManager for immediate ROUTE-to-buffer");
462 }
463
464 if (!next_it->m_attached_processor) {
465 auto writer = std::make_shared<Buffers::AudioWriteProcessor>();
466 m_buffer_manager->add_processor(writer, next_it->m_target_buffer,
468 next_it->m_attached_processor = writer;
469 }
470
471 std::static_pointer_cast<Buffers::AudioWriteProcessor>(next_it->m_attached_processor)
472 ->set_data(buffer_data);
473 } else if (next_it->m_target_container) {
474 write_to_container(next_it->m_target_container, buffer_data);
475 }
476
477 size_t route_index = std::distance(m_operations.begin(), next_it);
478 if (route_index < m_data_states.size()) {
479 m_data_states[route_index] = DataState::CONSUMED;
480 }
481 }
482 }
483 }
484}
#define MF_ERROR(comp, ctx,...)
Cycle Behavior: The for_cycles(N) configuration controls how many times the capture operation executes...
@ TRANSIENT
Single cycle capture (default) - data expires after 1 cycle.
@ CIRCULAR
Circular buffer with overwrite.
@ ACCUMULATE
Accumulate over multiple cycles in container.
@ WINDOWED
Rolling window capture with overlap.
@ TRIGGERED
Capture only when condition met.
@ ROUTE
Route data to destination (buffer or container).
std::unordered_map< BufferOperation *, Kakshya::DataVariant > m_operation_data
static void write_to_container(const std::shared_ptr< Kakshya::DynamicSoundStream > &container, const Kakshya::DataVariant &data)
std::vector< BufferOperation > m_operations
std::vector< DataState > m_data_states
std::shared_ptr< Buffers::BufferManager > m_buffer_manager
bool has_immediate_routing(const BufferOperation &op) const
static Kakshya::DataVariant extract_buffer_data(const std::shared_ptr< Buffers::AudioBuffer > &buffer, bool should_process=false)
@ AUDIO_BACKEND
Standard audio processing backend configuration.
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
void error_rethrow(Component component, Context context, std::source_location location=std::source_location::current(), std::string_view additional_context="")
Catch and log an exception, then rethrow it.
@ Kriya
Automatable tasks and fluent scheduling API for Nodes and Buffers.

References MayaFlux::Kriya::BufferCapture::ACCUMULATE, MayaFlux::Buffers::AUDIO_BACKEND, MayaFlux::Kriya::BufferCapture::CIRCULAR, CONSUMED, MayaFlux::Journal::CoroutineScheduling, extract_buffer_data(), MayaFlux::Kriya::BufferCapture::get_buffer(), MayaFlux::Kriya::BufferCapture::get_circular_size(), MayaFlux::Kriya::BufferCapture::get_mode(), MayaFlux::Kriya::BufferCapture::get_overlap_ratio(), MayaFlux::Kriya::BufferCapture::get_processing_control(), MayaFlux::Kriya::BufferCapture::get_window_size(), has_immediate_routing(), MayaFlux::Journal::Kriya, m_buffer_manager, MayaFlux::Kriya::BufferOperation::m_capture, MayaFlux::Kriya::BufferCapture::m_data_ready_callback, m_data_states, m_operation_data, m_operations, MayaFlux::Kriya::BufferCapture::m_stop_condition, MF_ERROR, MayaFlux::Kriya::BufferCapture::ON_CAPTURE, MayaFlux::Kriya::BufferOperation::ROUTE, MayaFlux::Kriya::BufferCapture::TRANSIENT, MayaFlux::Kriya::BufferCapture::TRIGGERED, MayaFlux::Kriya::BufferCapture::WINDOWED, and write_to_container().

Referenced by process_operation().

+ Here is the call graph for this function:
+ Here is the caller graph for this function: