10 : m_clock(default_sample_rate)
11 , m_cleanup_threshold(512)
22 std::cerr <<
"Failed to initiate routine\n;\t >> Exiting" << std::endl;
31 if (existing_it !=
m_tasks.end()) {
32 if (existing_it->routine && existing_it->routine->is_active()) {
33 existing_it->routine->set_should_terminate(
true);
38 m_tasks.emplace_back(routine, task_name);
53 if (it->routine && it->routine->is_active()) {
54 it->routine->set_should_terminate(
true);
66 if (routine && routine->is_active()) {
67 routine->set_should_terminate(
true);
79 if (it->routine && it->routine->is_active()) {
80 it->routine->restart();
89 return (it !=
m_tasks.end()) ? it->routine :
nullptr;
94 std::vector<std::shared_ptr<Routine>> result;
95 for (
const auto& entry :
m_tasks) {
96 if (entry.routine && entry.routine->get_processing_token() ==
token) {
97 result.push_back(entry.routine);
108 processor_it->second(tasks, processing_units);
113 static uint64_t cleanup_counter = 0;
125 static uint64_t cleanup_counter = 0;
141 return *clock_it->second;
146 return *audio_clock_it->second;
149 throw std::runtime_error(
"No clocks available in scheduler");
166 return clock.current_position();
173 return clock_it->second->rate();
183 return entry.routine && entry.routine->is_active() && entry.routine->get_processing_token() == token;
200 [&name](
const TaskEntry& entry) { return entry.name == name; });
206 [&name](
const TaskEntry& entry) { return entry.name == name; });
212 [&routine](
const TaskEntry& entry) { return entry.routine == routine; });
265 auto& clock = *clock_it->second;
266 clock.tick(processing_units);
270 auto& clock = *clock_it->second;
272 for (uint64_t i = 0; i < processing_units; i++) {
273 uint64_t current_context = clock.current_position();
275 for (
auto& routine : tasks) {
276 if (routine && routine->is_active()) {
277 if (routine->requires_clock_sync()) {
278 if (current_context >= routine->next_execution()) {
296 return !entry.routine || !entry.routine->is_active();
312 uint64_t current_context = clock_it->second->current_position();
313 return routine->initialize_state(current_context);
319 if (entry.routine && entry.routine->is_active()) {
320 bool current_auto_resume = entry.routine->get_auto_resume();
321 entry.routine->set_state<
bool>(
"was_auto_resume", current_auto_resume);
322 entry.routine->set_auto_resume(
false);
330 if (entry.routine && entry.routine->is_active()) {
331 bool* was_auto_resume = entry.routine->get_state<
bool>(
"was_auto_resume");
332 if (was_auto_resume) {
333 entry.routine->set_auto_resume(*was_auto_resume);
335 entry.routine->set_auto_resume(
true);
344 if (entry.routine && entry.routine->is_active()) {
345 entry.routine->set_should_terminate(
true);
350 if (entry.routine && entry.routine->is_active()) {
351 entry.routine->force_resume();
355 constexpr int MAX_ATTEMPTS = 3;
356 for (
int attempt = 0; attempt < MAX_ATTEMPTS; ++attempt) {
357 std::this_thread::sleep_for(std::chrono::milliseconds(5));
359 bool any_active =
false;
361 if (entry.routine && entry.routine->is_active()) {
363 entry.routine->force_resume();
372 bool all_done =
true;
373 for (
const auto& entry :
m_tasks) {
374 if (entry.routine && entry.routine->is_active()) {
377 "Coroutine '{}' stuck after {} attempts - forcing destruction",
378 entry.name, MAX_ATTEMPTS);
384 "Some coroutines did not complete - forcing destruction");
387 "All coroutines terminated successfully");
398 for (
auto& task : tasks) {
399 if (task && task->is_active()) {
400 if (task->requires_clock_sync()) {
#define MF_PRINT(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
static MayaFlux::Nodes::ProcessingToken token
Abstract base interface for all clock types in the multimodal scheduling system.
std::atomic< uint64_t > m_next_task_id
Task ID counter for unique identification.
bool cancel_task(std::shared_ptr< Routine > task)
Cancels and removes a task from the scheduler.
void register_token_processor(ProcessingToken token, token_processing_func_t processor)
Register a custom processor for a specific token domain.
uint64_t seconds_to_units(double seconds, ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Convert seconds to processing units for a specific domain.
std::vector< TaskEntry >::iterator find_task_by_routine(std::shared_ptr< Routine > routine)
Find task entry by routine pointer.
unsigned int get_default_rate(ProcessingToken token) const
Get the default rate for a processing token.
std::shared_ptr< Routine > get_task(const std::string &name) const
Get a named task.
void cleanup_completed_tasks()
Clean up completed tasks held by the scheduler.
void add_task(std::shared_ptr< Routine > routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
void process_buffer_cycle_tasks()
TaskScheduler(uint32_t default_sample_rate=48000, uint32_t default_frame_rate=60)
Constructs a TaskScheduler with the specified default sample rate and default frame rate.
std::vector< TaskEntry > m_tasks
bool initialize_routine_state(std::shared_ptr< Routine > routine, ProcessingToken token)
Initialize a routine's state for a specific domain.
void process_token(ProcessingToken token, uint64_t processing_units=1)
Process all tasks for a specific processing domain.
std::vector< TaskEntry >::iterator find_task_by_name(const std::string &name)
Find task entry by name.
uint64_t m_current_buffer_cycle
void ensure_domain(ProcessingToken token, unsigned int rate=0)
Initialize a processing domain if it doesn't exist.
void pause_all_tasks()
Pause all active tasks.
uint64_t get_next_task_id() const
Generates a unique task ID for new tasks.
uint32_t m_cleanup_threshold
Threshold for task cleanup.
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
std::vector< std::shared_ptr< Routine > > get_tasks_for_token(ProcessingToken token) const
Get all tasks for a specific processing domain.
std::unordered_map< ProcessingToken, token_processing_func_t > m_token_processors
Custom processors for specific domains.
uint64_t current_units(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get current processing units for a domain.
void process_default(ProcessingToken token, uint64_t processing_units)
Process tasks in a specific domain with default algorithm.
void process_all_tokens()
Process all active domains.
std::string auto_generate_name(std::shared_ptr< Routine > routine) const
Generate an automatic name for a routine based on its type.
unsigned int get_rate(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get processing rate for a domain.
bool restart_task(const std::string &name)
Restart a named task.
void terminate_all_tasks()
Terminate and clear all tasks.
void resume_all_tasks()
Resume all previously paused tasks.
std::unordered_map< ProcessingToken, std::unique_ptr< IClock > > m_token_clocks
Clock instances for each processing domain.
bool has_active_tasks(ProcessingToken token) const
Check if a processing domain has any active tasks.
const SampleClock & get_clock() const
Gets the primary clock (the audio-domain clock, kept for legacy compatibility).
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Vruta
Coroutines, schedulers, clocks, task management.
uint64_t seconds_to_units(double seconds, uint32_t rate)
Convert seconds to processing units for any rate.
uint64_t seconds_to_samples(double seconds, uint32_t sample_rate)
Convert seconds to samples at a given sample rate.
std::function< void(const std::vector< std::shared_ptr< Routine > > &, uint64_t)> token_processing_func_t
Function type for processing tasks in a specific token domain.
@ MULTI_RATE
Coroutine can handle multiple sample rates. Picks the frame-accurate processing token by default.
@ FRAME_ACCURATE
Coroutine is frame-accurate.
@ SAMPLE_ACCURATE
Coroutine is sample-accurate.
@ ON_DEMAND
Coroutine is executed on demand, not scheduled.
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)