MayaFlux 0.3.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
Scheduler.cpp
Go to the documentation of this file.
1#include "Scheduler.hpp"
2
4
5namespace MayaFlux::Vruta {
6
// Constructs the scheduler: seeds m_clock with the sample rate, sets the
// cleanup threshold to 512 and records both default rates.
// NOTE(review): listing lines 13-14 and 16 are absent from this extract, so
// the body may register more domains than the single MULTI_RATE call visible
// here - confirm against the real source file.
7TaskScheduler::TaskScheduler(uint32_t default_sample_rate, uint32_t default_frame_rate)
8 : m_clock(default_sample_rate)
9 , m_cleanup_threshold(512)
10 , m_registered_sample_rate(default_sample_rate)
11 , m_registered_frame_rate(default_frame_rate)
12{
// The MULTI_RATE domain is created with the *sample* rate, not the frame rate.
15 ensure_domain(ProcessingToken::MULTI_RATE, default_sample_rate);
17}
18
19void TaskScheduler::add_task(const std::shared_ptr<Routine>& routine, const std::string& name, bool initialize)
20{
21 if (!routine) {
22 MF_ERROR(Journal::Component::Vruta, Journal::Context::CoroutineScheduling, "Failed to initiate routine; routine is null. Exiting add_task.");
23 return;
24 }
25
26 std::string task_name = name.empty() ? auto_generate_name(routine) : name;
27 ProcessingToken token = routine->get_processing_token();
28
29 {
30 auto existing_it = find_task_by_name(task_name);
31 if (existing_it != m_tasks.end()) {
32 if (existing_it->routine && existing_it->routine->is_active()) {
33 existing_it->routine->set_should_terminate(true);
34 }
35 m_tasks.erase(existing_it);
36 }
37
38 m_tasks.emplace_back(routine, task_name);
39 }
40
41 if (initialize) {
42 ensure_domain(token);
43 initialize_routine_state(routine, token);
44 } else {
45 ensure_domain(token);
46 }
47}
48
49bool TaskScheduler::cancel_task(const std::string& name)
50{
51 auto it = find_task_by_name(name);
52 if (it != m_tasks.end()) {
53 if (it->routine && it->routine->is_active()) {
54 it->routine->set_should_terminate(true);
55 }
56 m_tasks.erase(it);
57 return true;
58 }
59 return false;
60}
61
62bool TaskScheduler::cancel_task(const std::shared_ptr<Routine>& routine)
63{
64 auto it = find_task_by_routine(routine);
65 if (it != m_tasks.end()) {
66 if (routine && routine->is_active()) {
67 routine->set_should_terminate(true);
68 }
69 m_tasks.erase(it);
70 return true;
71 }
72 return false;
73}
74
75bool TaskScheduler::restart_task(const std::string& name)
76{
77 auto it = find_task_by_name(name);
78 if (it != m_tasks.end()) {
79 if (it->routine && it->routine->is_active()) {
80 it->routine->restart();
81 }
82 }
83 return false;
84}
85
86std::shared_ptr<Routine> TaskScheduler::get_task(const std::string& name) const
87{
88 auto it = find_task_by_name(name);
89 return (it != m_tasks.end()) ? it->routine : nullptr;
90}
91
92std::vector<std::shared_ptr<Routine>> TaskScheduler::get_tasks_for_token(ProcessingToken token) const
93{
94 std::vector<std::shared_ptr<Routine>> result;
95 for (const auto& entry : m_tasks) {
96 if (entry.routine && entry.routine->get_processing_token() == token) {
97 result.push_back(entry.routine);
98 }
99 }
100 return result;
101}
102
// Processes one domain for `processing_units` units.
// If a custom processor is registered for `token` it receives the domain's
// tasks and the unit count; otherwise process_default() runs.
103void TaskScheduler::process_token(ProcessingToken token, uint64_t processing_units)
104{
105 auto processor_it = m_token_processors.find(token);
106 if (processor_it != m_token_processors.end()) {
107 auto tasks = get_tasks_for_token(token);
108 processor_it->second(tasks, processing_units);
109 } else {
110 process_default(token, processing_units);
111 }
112
// Function-local static: the counter is shared by every TaskScheduler
// instance and every token, and is incremented without synchronization.
113 static uint64_t cleanup_counter = 0;
// The multiplication is evaluated in the type of m_cleanup_threshold and only
// then widened to uint64_t.
// NOTE(review): listing line 115 (the body of this `if`) is missing from this
// extract; presumably it triggers cleanup_completed_tasks() - confirm.
114 if (++cleanup_counter % (static_cast<uint64_t>(m_cleanup_threshold * 2)) == 0) {
116 }
117}
118
// NOTE(review): the signature line (listing line 119) is missing from this
// extract; by position and body this is presumably
// TaskScheduler::process_all_tokens() ("Process all active domains") - confirm.
// Calls process_token() for every domain that has a clock, passing 0 units.
120{
121 for (const auto& token : m_token_clocks) {
122 process_token(token.first, 0);
123 }
124
// Separate function-local static counter (distinct from the one in
// process_token); shared across instances and not synchronized.
125 static uint64_t cleanup_counter = 0;
// NOTE(review): listing line 127 (the body of this `if`) is missing from this
// extract; presumably a cleanup call - confirm.
126 if (++cleanup_counter % m_cleanup_threshold == 0) {
128 }
129}
130
// NOTE(review): the signature line (listing line 131) is missing from this
// extract; presumably TaskScheduler::register_token_processor(token, processor)
// ("Register a custom processor for a specific token domain") - confirm.
// Makes sure the domain for `token` exists, then stores `processor` for it,
// replacing any processor previously registered for the same token.
132{
133 ensure_domain(token);
134 m_token_processors[token] = std::move(processor);
135}
136
// NOTE(review): the signature line (listing line 137) is missing from this
// extract; the body reads a `token` parameter, so this looks like a
// token-taking get_clock overload - confirm.
// Returns the clock registered for `token`; when there is none, falls back to
// the SAMPLE_ACCURATE clock. Throws std::runtime_error if neither exists.
138{
139 auto clock_it = m_token_clocks.find(token);
140 if (clock_it != m_token_clocks.end()) {
141 return *clock_it->second;
142 }
143
// Fallback: the caller silently receives the audio-domain clock instead.
144 auto audio_clock_it = m_token_clocks.find(ProcessingToken::SAMPLE_ACCURATE);
145 if (audio_clock_it != m_token_clocks.end()) {
146 return *audio_clock_it->second;
147 }
148
149 throw std::runtime_error("No clocks available in scheduler");
150}
151
152uint64_t TaskScheduler::seconds_to_units(double seconds, ProcessingToken token) const
153{
154 unsigned int rate = get_rate(token);
155 return static_cast<uint64_t>(seconds * rate);
156}
157
158uint64_t TaskScheduler::seconds_to_samples(double seconds) const
159{
160 return static_cast<uint64_t>(seconds * get_rate(ProcessingToken::SAMPLE_ACCURATE));
161}
162
// NOTE(review): the signature line (listing line 163) is missing from this
// extract; presumably TaskScheduler::current_units(token) const
// ("Get current processing units for a domain") - confirm.
// Returns the current position of the clock that get_clock(token) resolves to.
164{
165 const auto& clock = get_clock(token);
166 return clock.current_position();
167}
168
// NOTE(review): the signature line (listing line 169) is missing from this
// extract; presumably TaskScheduler::get_rate(token) const
// ("Get processing rate for a domain") - confirm.
// Returns the rate of the clock registered for `token`, or the default rate
// for that token when no clock has been created for it yet.
170{
171 auto clock_it = m_token_clocks.find(token);
172 if (clock_it != m_token_clocks.end()) {
173 return clock_it->second->rate();
174 }
175
176 return get_default_rate(token);
177}
178
// NOTE(review): the signature line (listing line 179) is missing from this
// extract; presumably TaskScheduler::has_active_tasks(token) const
// ("Check if a processing domain has any active tasks") - confirm.
// True when at least one stored task has a non-null, active routine whose
// processing token equals `token`.
180{
181 return std::ranges::any_of(m_tasks,
182 [token](const TaskEntry& entry) {
183 return entry.routine && entry.routine->is_active() && entry.routine->get_processing_token() == token;
184 });
185}
186
// NOTE(review): the signature line (listing line 187) is missing from this
// extract; presumably TaskScheduler::get_next_task_id() const
// ("Generates a unique task ID for new tasks") - confirm.
// fetch_add returns the value held *before* the increment, so the ID handed
// out is the counter's previous value.
188{
189 return m_next_task_id.fetch_add(1);
190}
191
192std::string TaskScheduler::auto_generate_name(const std::shared_ptr<Routine>& /*routine*/) const
193{
194 return "task_" + std::to_string(get_next_task_id());
195}
196
197std::vector<TaskEntry>::iterator TaskScheduler::find_task_by_name(const std::string& name)
198{
199 return std::ranges::find_if(m_tasks,
200 [&name](const TaskEntry& entry) { return entry.name == name; });
201}
202
203std::vector<TaskEntry>::const_iterator TaskScheduler::find_task_by_name(const std::string& name) const
204{
205 return std::ranges::find_if(m_tasks,
206 [&name](const TaskEntry& entry) { return entry.name == name; });
207}
208
209std::vector<TaskEntry>::iterator TaskScheduler::find_task_by_routine(const std::shared_ptr<Routine>& routine)
210{
211 return std::ranges::find_if(m_tasks,
212 [&routine](const TaskEntry& entry) { return entry.routine == routine; });
213}
214
// NOTE(review): the signature line and most case labels/returns (listing
// lines 215, 218-224, 226, 229) are missing from this extract; presumably
// TaskScheduler::get_default_rate(token) const ("Get the default rate for a
// processing token"). Which token maps to which rate cannot be read from
// the lines that remain - consult the real source file.
216{
217 switch (token) {
225 return 1;
227 return 1000;
228 default:
230 }
231}
232
// Creates the clock for `token` if that domain has none yet; an existing
// domain is left untouched (its rate is NOT updated).
// A `rate` of 0 means "use get_default_rate(token)".
233void TaskScheduler::ensure_domain(ProcessingToken token, unsigned int rate)
234{
235 auto clock_it = m_token_clocks.find(token);
236 if (clock_it == m_token_clocks.end()) {
237 unsigned int domain_rate = (rate > 0) ? rate : get_default_rate(token);
238
// NOTE(review): the case labels (listing lines 240, 243, 244) are missing
// from this extract. At least one token gets a FrameClock; everything that
// reaches `default` gets a SampleClock. Confirm which tokens are which.
239 switch (token) {
241 m_token_clocks[token] = std::make_unique<FrameClock>(domain_rate);
242 break;
245 default:
246 m_token_clocks[token] = std::make_unique<SampleClock>(domain_rate);
247 break;
248 }
249 }
250}
251
252void TaskScheduler::process_default(ProcessingToken token, uint64_t processing_units)
253{
254 auto clock_it = m_token_clocks.find(token);
255 if (clock_it == m_token_clocks.end()) {
256 return;
257 }
258
259 auto tasks = get_tasks_for_token(token);
260 if (tasks.empty()) {
261 auto& clock = *clock_it->second;
262 clock.tick(processing_units);
263 return;
264 }
265
266 auto& clock = *clock_it->second;
267
268 for (uint64_t i = 0; i < processing_units; i++) {
269 uint64_t current_context = clock.current_position();
270
271 for (auto& routine : tasks) {
272 if (routine && routine->is_active()) {
273 if (routine->requires_clock_sync()) {
274 if (current_context >= routine->next_execution()) {
275 routine->try_resume_with_context(current_context, DelayContext::SAMPLE_BASED);
276 }
277 } else {
278 routine->try_resume_with_context(current_context, DelayContext::SAMPLE_BASED);
279 }
280 }
281 }
282
283 clock.tick(1);
284 }
285}
286
// NOTE(review): the signature line (listing line 287) is missing from this
// extract; presumably TaskScheduler::cleanup_completed_tasks() - confirm.
// Removes every entry whose routine is null or no longer active. Entries in
// all domains are considered; no token filter is applied here.
288{
289 m_tasks.erase(
290 std::remove_if(m_tasks.begin(), m_tasks.end(),
291 [](const TaskEntry& entry) {
292 return !entry.routine || !entry.routine->is_active();
293 }),
294 m_tasks.end());
295}
296
297bool TaskScheduler::initialize_routine_state(const std::shared_ptr<Routine>& routine, ProcessingToken token)
298{
299 if (!routine) {
300 return false;
301 }
302
303 auto clock_it = m_token_clocks.find(token);
304 if (clock_it == m_token_clocks.end()) {
305 return false;
306 }
307
308 uint64_t current_context = clock_it->second->current_position();
309 return routine->initialize_state(current_context);
310}
311
// NOTE(review): the signature line (listing line 312) is missing from this
// extract; presumably TaskScheduler::pause_all_tasks()
// ("Pause all active tasks") - confirm.
// For every active routine: remembers its current auto-resume flag under the
// state key "was_auto_resume", then switches auto-resume off.
313{
314 for (auto& entry : m_tasks) {
315 if (entry.routine && entry.routine->is_active()) {
316 bool current_auto_resume = entry.routine->get_auto_resume();
317 entry.routine->set_state<bool>("was_auto_resume", current_auto_resume);
318 entry.routine->set_auto_resume(false);
319 }
320 }
321}
322
// NOTE(review): the signature line (listing line 323) is missing from this
// extract; presumably TaskScheduler::resume_all_tasks()
// ("Resume all previously paused tasks") - confirm.
// For every active routine: restores the auto-resume flag saved under
// "was_auto_resume"; when no saved value is available, auto-resume is
// switched on.
324{
325 for (auto& entry : m_tasks) {
326 if (entry.routine && entry.routine->is_active()) {
327 auto was_auto_resume = entry.routine->get_state<bool>("was_auto_resume");
328 if (was_auto_resume) {
329 entry.routine->set_auto_resume(*was_auto_resume);
330 } else {
// No saved flag (e.g. the task was never paused through the scheduler).
331 entry.routine->set_auto_resume(true);
332 }
333 }
334 }
335}
336
// NOTE(review): the signature line (listing line 337) and the log-macro lines
// (372, 379, 382) are missing from this extract; presumably
// TaskScheduler::terminate_all_tasks() ("Terminate and clear all tasks").
// Sequence: flag every active routine for termination, force-resume them,
// then retry up to MAX_ATTEMPTS times with a 5 ms sleep before each check,
// report the outcome, and finally clear the task list unconditionally.
338{
339 for (auto& entry : m_tasks) {
340 if (entry.routine && entry.routine->is_active()) {
341 entry.routine->set_should_terminate(true);
342 }
343 }
344
// First forced resume so routines can observe the termination flag.
345 for (auto& entry : m_tasks) {
346 if (entry.routine && entry.routine->is_active()) {
347 entry.routine->force_resume();
348 }
349 }
350
351 constexpr int MAX_ATTEMPTS = 3;
352 for (int attempt = 0; attempt < MAX_ATTEMPTS; ++attempt) {
// The sleep precedes the check, so this call blocks for at least 5 ms even
// when every routine has already finished.
353 std::this_thread::sleep_for(std::chrono::milliseconds(5));
354
355 bool any_active = false;
356 for (auto& entry : m_tasks) {
357 if (entry.routine && entry.routine->is_active()) {
358 any_active = true;
359 entry.routine->force_resume();
360 }
361 }
362
363 if (!any_active) {
364 break;
365 }
366 }
367
// Final pass: report each routine that is still active after the retries.
368 bool all_done = true;
369 for (const auto& entry : m_tasks) {
370 if (entry.routine && entry.routine->is_active()) {
371 all_done = false;
373 "Coroutine '{}' stuck after {} attempts - forcing destruction",
374 entry.name, MAX_ATTEMPTS);
375 }
376 }
377
378 if (!all_done) {
380 "Some coroutines did not complete - forcing destruction");
381 } else {
383 "All coroutines terminated successfully");
384 }
385
// Entries are dropped regardless of whether their routines finished.
386 m_tasks.clear();
387}
388
// NOTE(review): the signature line (listing line 389) and two body lines
// (391-392) are missing from this extract, including whatever defines `tasks`
// and updates m_current_buffer_cycle - the function's name and the origin of
// `tasks` cannot be confirmed from here.
// Visible behaviour: each active task is offered a resume with the current
// buffer-cycle count as its context (DelayContext::BUFFER_BASED); clock-synced
// tasks only once the cycle count has reached their next_execution().
390{
393
394 for (auto& task : tasks) {
395 if (task && task->is_active()) {
396 if (task->requires_clock_sync()) {
397 if (m_current_buffer_cycle >= task->next_execution()) {
398 task->try_resume_with_context(m_current_buffer_cycle, DelayContext::BUFFER_BASED);
399 }
400 } else {
401 task->try_resume_with_context(m_current_buffer_cycle, DelayContext::BUFFER_BASED);
402 }
403 }
404 }
405}
406
407}
#define MF_ERROR(comp, ctx,...)
#define MF_PRINT(comp, ctx,...)
#define MF_WARN(comp, ctx,...)
Abstract base interface for all clock types in the multimodal scheduling system.
Definition Clock.hpp:23
std::atomic< uint64_t > m_next_task_id
Task ID counter for unique identification.
void add_task(const std::shared_ptr< Routine > &routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
Definition Scheduler.cpp:19
bool initialize_routine_state(const std::shared_ptr< Routine > &routine, ProcessingToken token)
Initialize a routine's state for a specific domain.
void register_token_processor(ProcessingToken token, token_processing_func_t processor)
Register a custom processor for a specific token domain.
uint64_t seconds_to_units(double seconds, ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Convert seconds to processing units for a specific domain.
unsigned int get_default_rate(ProcessingToken token) const
Get the default rate for a processing token.
std::shared_ptr< Routine > get_task(const std::string &name) const
Get a named task.
Definition Scheduler.cpp:86
void cleanup_completed_tasks()
Clean up completed tasks in a domain.
std::string auto_generate_name(const std::shared_ptr< Routine > &routine) const
Generate automatic name for a routine based on its type.
TaskScheduler(uint32_t default_sample_rate=48000, uint32_t default_frame_rate=60)
Constructs a TaskScheduler with the specified sample rate.
Definition Scheduler.cpp:7
std::vector< TaskEntry > m_tasks
void process_token(ProcessingToken token, uint64_t processing_units=1)
Process all tasks for a specific processing domain.
std::vector< TaskEntry >::iterator find_task_by_name(const std::string &name)
Find task entry by name.
void ensure_domain(ProcessingToken token, unsigned int rate=0)
Initialize a processing domain if it doesn't exist.
void pause_all_tasks()
Pause all active tasks.
uint64_t get_next_task_id() const
Generates a unique task ID for new tasks.
uint32_t m_cleanup_threshold
Threshold for task cleanup.
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
std::vector< std::shared_ptr< Routine > > get_tasks_for_token(ProcessingToken token) const
Get all tasks for a specific processing domain.
Definition Scheduler.cpp:92
std::unordered_map< ProcessingToken, token_processing_func_t > m_token_processors
Custom processors for specific domains.
uint64_t current_units(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get current processing units for a domain.
void process_default(ProcessingToken token, uint64_t processing_units)
Process tasks in a specific domain with default algorithm.
void process_all_tokens()
Process all active domains.
unsigned int get_rate(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get processing rate for a domain.
bool restart_task(const std::string &name)
Restart a named task.
Definition Scheduler.cpp:75
void terminate_all_tasks()
Terminate and clear all tasks.
void resume_all_tasks()
Resume all previously paused tasks.
std::unordered_map< ProcessingToken, std::unique_ptr< IClock > > m_token_clocks
Clock instances for each processing domain.
bool has_active_tasks(ProcessingToken token) const
Check if a processing domain has any active tasks.
const SampleClock & get_clock() const
Gets the primary clock (audio domain for legacy compatibility)
bool cancel_task(const std::shared_ptr< Routine > &routine)
Cancels and removes a task from the scheduler.
Definition Scheduler.cpp:62
std::vector< TaskEntry >::iterator find_task_by_routine(const std::shared_ptr< Routine > &routine)
Find task entry by routine pointer.
void initialize()
Definition main.cpp:11
@ CoroutineScheduling
Coroutine scheduling and temporal coordination (Vruta::TaskScheduler)
@ Vruta
Coroutines, schedulers, clocks, task management.
std::function< void(const std::vector< std::shared_ptr< Routine > > &, uint64_t)> token_processing_func_t
Function type for processing tasks in a specific token domain.
@ MULTI_RATE
Coroutine can handle multiple sample rates. Picks the frame-accurate processing token by default.
@ FRAME_ACCURATE
Coroutine is frame-accurate.
@ SAMPLE_ACCURATE
Coroutine is sample-accurate.
@ ON_DEMAND
Coroutine is executed on demand, not scheduled.
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)
std::shared_ptr< Routine > routine
Definition Scheduler.hpp:9