10 : m_clock(default_sample_rate)
11 , m_cleanup_threshold(512)
12 , m_registered_sample_rate(default_sample_rate)
13 , m_registered_frame_rate(default_frame_rate)
39 routine->set_auto_resume(
true);
41 bool expected =
false;
42 if (op.active.compare_exchange_strong(expected,
true,
43 std::memory_order_relaxed, std::memory_order_relaxed)) {
44 op.is_addition =
true;
45 op.entry = { routine, task_name };
52 "Conditional pending queue full, could not add task '{}'", task_name);
57 bool expected =
false;
58 if (op.active.compare_exchange_strong(expected,
true,
59 std::memory_order_acquire, std::memory_order_relaxed)) {
60 op.entry = { routine, task_name };
61 op.is_addition =
true;
68 "Pending task queue full, could not add task '{}'", task_name);
74 bool expected =
false;
75 if (op.active.compare_exchange_strong(expected,
true,
76 std::memory_order_acquire, std::memory_order_relaxed)) {
77 op.entry = {
nullptr, name };
78 op.is_addition =
false;
85 bool expected =
false;
86 if (op.active.compare_exchange_strong(expected,
true,
87 std::memory_order_acquire, std::memory_order_relaxed)) {
88 op.entry = {
nullptr, name };
89 op.is_addition =
false;
96 "Pending task queue full, could not cancel task '{}'", name);
114 const std::string name = (it !=
m_tasks.end()) ? it->name :
"";
117 bool expected =
false;
118 if (op.active.compare_exchange_strong(expected,
true,
119 std::memory_order_acquire, std::memory_order_relaxed)) {
120 op.entry = { routine, name };
121 op.is_addition =
false;
128 "Pending task queue full, could not cancel task '{}'", name);
137 if (cit->routine && cit->routine->is_active())
138 cit->routine->restart();
144 if (it->routine && it->routine->is_active()) {
145 it->routine->restart();
159 return (it !=
m_tasks.end()) ? it->routine :
nullptr;
165 std::vector<std::shared_ptr<Routine>> result;
168 result.push_back(e.routine);
173 std::vector<std::shared_ptr<Routine>> result;
174 for (
const auto& entry :
m_tasks) {
175 if (entry.routine && entry.routine->get_processing_token() == token) {
176 result.push_back(entry.routine);
184 std::vector<std::string> names;
185 for (
const auto& entry :
m_tasks) {
187 names.push_back(entry.name);
195 std::vector<std::string> names;
198 names.push_back(e.name);
203 std::vector<std::string> names;
204 for (
const auto& entry :
m_tasks) {
205 if (entry.routine && entry.routine->get_processing_token() == token)
206 names.push_back(entry.name);
224 processor_it->second(tasks, processing_units);
235 static uint64_t cleanup_counter = 0;
247 static uint64_t cleanup_counter = 0;
269 return *clock_it->second;
274 return *audio_clock_it->second;
278 "No clock found for token {}, and no sample-accurate clock available as fallback",
static_cast<int>(token));
283 unsigned int rate =
get_rate(token);
284 return static_cast<uint64_t
>(seconds * rate);
295 return clock.current_position();
302 return clock_it->second->rate();
310 return std::ranges::any_of(
m_tasks,
312 return entry.
routine && entry.
routine->is_active() && entry.
routine->get_processing_token() == token;
328 return std::ranges::find_if(
m_tasks,
329 [&name](
const TaskEntry& entry) {
return entry.
name == name; });
334 return std::ranges::find_if(
m_tasks,
335 [&name](
const TaskEntry& entry) {
return entry.
name == name; });
340 return std::ranges::find_if(
m_tasks,
375 m_token_clocks[token] = std::make_shared<FrameClock>(domain_rate);
380 m_token_clocks[token] = std::make_shared<SampleClock>(domain_rate);
395 auto& clock = *clock_it->second;
396 clock.tick(processing_units);
400 auto& clock = *clock_it->second;
402 for (uint64_t i = 0; i < processing_units; i++) {
403 uint64_t current_context = clock.current_position();
405 for (
auto& routine : tasks) {
406 if (routine && routine->is_active()) {
407 if (routine->requires_clock_sync()) {
408 if (current_context >= routine->next_execution()) {
439 uint64_t current_context = clock_it->second->current_position();
440 return routine->initialize_state(current_context);
448#if MAYAFLUX_USE_JTHREAD
450 while (!st.stop_requested())
465 if (entry.routine && entry.routine->is_active()) {
466 bool current_auto_resume = entry.routine->get_auto_resume();
467 entry.routine->set_state<
bool>(
"was_auto_resume", current_auto_resume);
468 entry.routine->set_auto_resume(
false);
473 if (entry.routine && entry.routine->is_active()) {
474 bool current_auto_resume = entry.routine->get_auto_resume();
475 entry.routine->set_state<
bool>(
"was_auto_resume", current_auto_resume);
476 entry.routine->set_auto_resume(
false);
484 if (entry.routine && entry.routine->is_active()) {
485 auto was_auto_resume = entry.routine->get_state<
bool>(
"was_auto_resume");
486 if (was_auto_resume) {
487 entry.routine->set_auto_resume(*was_auto_resume);
489 entry.routine->set_auto_resume(
true);
495 if (entry.routine && entry.routine->is_active()) {
496 auto was_auto_resume = entry.routine->get_state<
bool>(
"was_auto_resume");
497 if (was_auto_resume) {
498 entry.routine->set_auto_resume(*was_auto_resume);
500 entry.routine->set_auto_resume(
true);
511 if (entry.routine && entry.routine->is_active()) {
512 entry.routine->set_should_terminate(
true);
517 if (entry.routine && entry.routine->is_active()) {
518 entry.routine->force_resume();
522 constexpr int MAX_ATTEMPTS = 3;
523 for (
int attempt = 0; attempt < MAX_ATTEMPTS; ++attempt) {
524 std::this_thread::sleep_for(std::chrono::milliseconds(5));
526 bool any_active =
false;
528 if (entry.routine && entry.routine->is_active()) {
530 entry.routine->force_resume();
539 bool all_done =
true;
540 for (
const auto& entry :
m_tasks) {
541 if (entry.routine && entry.routine->is_active()) {
544 "Coroutine '{}' stuck after {} attempts - forcing destruction",
545 entry.name, MAX_ATTEMPTS);
551 "Some coroutines did not complete - forcing destruction");
554 "All coroutines terminated successfully");
559 if (entry.routine && entry.routine->is_active())
560 entry.routine->set_should_terminate(
true);
563 if (entry.routine && entry.routine->is_active())
564 entry.routine->force_resume();
578 for (
auto& task : tasks) {
579 if (task && task->is_active()) {
580 if (task->requires_clock_sync()) {
597 if (!op.active.load(std::memory_order_acquire))
600 if (op.is_addition) {
602 if (existing_it !=
m_tasks.end()) {
603 if (existing_it->routine && existing_it->routine->is_active())
604 existing_it->routine->set_should_terminate(
true);
607 m_tasks.push_back(std::move(op.entry));
611 if (it->routine && it->routine->is_active())
612 it->routine->set_should_terminate(
true);
617 op.entry = {
nullptr,
"" };
618 op.active.store(
false, std::memory_order_release);
629 if (!op.active.load(std::memory_order_acquire))
632 if (op.is_addition) {
634 [&](
const TaskEntry& e) {
return e.
name == op.entry.name; });
636 if (it->routine && it->routine->is_active())
637 it->routine->set_should_terminate(
true);
643 [&](
const TaskEntry& e) {
return e.
name == op.entry.name; });
645 if (it->routine && it->routine->is_active())
646 it->routine->set_should_terminate(
true);
651 op.entry = {
nullptr,
"" };
652 op.active.store(
false, std::memory_order_release);
660 if (cross_tasks.empty()) {
669 if (processing_units == 0) {
670 processing_units = 1;
673 uint64_t base = clock_it->second->current_position();
675 for (uint64_t i = 0; i < processing_units; ++i) {
676 uint64_t pos = base + i;
677 for (
auto& routine : cross_tasks) {
678 if (routine && routine->is_active()) {
679 routine->try_resume_with_context(pos, context);
690 std::this_thread::yield();
699 if (entry.routine && entry.routine->is_active())
700 entry.routine->try_resume(0);
#define MF_LOG(comp, ctx,...)
#define MF_ERROR(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
Abstract base interface for all clock types in the multimodal scheduling system.
std::atomic< uint64_t > m_next_task_id
Task ID counter for unique identification.
void add_task(const std::shared_ptr< Routine > &routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
bool initialize_routine_state(const std::shared_ptr< Routine > &routine, ProcessingToken token)
Initialize a routine's state for a specific domain.
void register_token_processor(ProcessingToken token, token_processing_func_t processor)
Register a custom processor for a specific token domain.
std::atomic< uint32_t > m_pending_count
uint64_t seconds_to_units(double seconds, ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Convert seconds to processing units for a specific domain.
unsigned int get_default_rate(ProcessingToken token) const
Get the default rate for a processing token.
std::shared_ptr< Routine > get_task(const std::string &name) const
Get a named task.
void cleanup_completed_tasks()
Clean up completed tasks in a domain.
std::atomic< uint32_t > m_conditional_pending_count
void process_buffer_cycle_tasks()
void start_conditional_thread()
Start the conditional task processing thread if not already running.
uint64_t m_registered_sample_rate
std::unordered_map< ProcessingToken, std::shared_ptr< IClock > > m_token_clocks
Clock instances for each processing domain.
uint32_t m_registered_frame_rate
std::string auto_generate_name(const std::shared_ptr< Routine > &routine) const
Generate automatic name for a routine based on its type.
void pump_conditional()
Pump the conditional task list in a separate thread.
TaskScheduler(uint32_t default_sample_rate=48000, uint32_t default_frame_rate=60)
Constructs a TaskScheduler with the specified default sample rate and default frame rate.
std::vector< TaskEntry > m_tasks
const std::vector< TaskEntry > & get_all_tasks()
Get all tasks for inspection/debugging.
PendingTaskOp m_pending_ops[MAX_PENDING_TASKS]
void process_token(ProcessingToken token, uint64_t processing_units=1)
Process all tasks for a specific processing domain.
std::vector< TaskEntry >::iterator find_task_by_name(const std::string &name)
Find task entry by name.
uint64_t m_current_buffer_cycle
void ensure_domain(ProcessingToken token, unsigned int rate=0)
Initialize a processing domain if it doesn't exist.
void pause_all_tasks()
Pause all active tasks.
std::vector< TaskEntry > m_conditional_tasks
uint64_t get_next_task_id() const
Generates a unique task ID for new tasks.
uint32_t m_cleanup_threshold
Threshold for task cleanup.
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
std::thread m_conditional_thread
std::vector< std::shared_ptr< Routine > > get_tasks_for_token(ProcessingToken token) const
Get all tasks for a specific processing domain.
std::unordered_map< ProcessingToken, token_processing_func_t > m_token_processors
Custom processors for specific domains.
void drain_pending_tasks()
Drain pending task operations (additions/removals) before processing.
void pump_cross(DelayContext context, ProcessingToken clock_token, uint64_t processing_units)
Pump the MULTI_RATE list from one clock's context.
uint64_t current_units(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get current processing units for a domain.
void register_clock(ProcessingToken token, std::shared_ptr< IClock > clock)
Register an externally-owned clock as the authoritative source for a token domain.
void process_default(ProcessingToken token, uint64_t processing_units)
Process tasks in a specific domain with default algorithm.
void process_all_tokens()
Process all active domains.
unsigned int get_rate(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get processing rate for a domain.
bool restart_task(const std::string &name)
Restart a named task.
PendingTaskOp m_conditional_pending_ops[MAX_PENDING_CONDITIONAL]
std::vector< std::string > get_task_names() const
Get all task names for debugging/inspection.
std::atomic< bool > m_conditional_stop
void terminate_all_tasks()
Terminate and clear all tasks.
void resume_all_tasks()
Resume all previously paused tasks.
bool has_active_tasks(ProcessingToken token) const
Check if a processing domain has any active tasks.
const SampleClock & get_clock() const
Gets the primary clock (audio domain for legacy compatibility)
bool cancel_task(const std::shared_ptr< Routine > &routine)
Cancels and removes a task from the scheduler.
void drain_conditional_pending()
Drain pending conditional task operations before processing.
std::vector< TaskEntry >::iterator find_task_by_routine(const std::shared_ptr< Routine > &routine)
Find task entry by routine pointer.
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Vruta
Coroutines, schedulers, clocks, task management.
std::function< void(const std::vector< std::shared_ptr< Routine > > &, uint64_t)> token_processing_func_t
Function type for processing tasks in a specific token domain.
uint32_t s_registered_frame_rate
@ MULTI_RATE
Coroutine can handle multiple sample rates. Picks the frame-accurate processing token by default.
@ CONDITIONAL
Condition-driven execution - resume when a caller-supplied predicate returns true.
@ FRAME_ACCURATE
Coroutine is frame-accurate.
@ SAMPLE_ACCURATE
Coroutine is sample-accurate.
@ ON_DEMAND
Coroutine is executed on demand, not scheduled.
uint32_t s_registered_sample_rate
DelayContext
Discriminator for different temporal delay mechanisms.
@ FRAME_BASED
Frame-rate delay (Graphics domain)
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)
std::shared_ptr< Routine > routine