MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
Scheduler.cpp
Go to the documentation of this file.
1#include "Scheduler.hpp"
2
3#include "MayaFlux/Utils.hpp"
4
5namespace MayaFlux::Vruta {
6
7TaskScheduler::TaskScheduler(uint32_t default_sample_rate, uint32_t default_frame_rate)
8 : m_clock(default_sample_rate)
9 , m_cleanup_threshold(512)
10{
13 ensure_domain(ProcessingToken::MULTI_RATE, default_sample_rate);
15}
16
17void TaskScheduler::add_task(std::shared_ptr<Routine> routine, const std::string& name, bool initialize)
18{
19 if (!routine) {
20 std::cerr << "Failed to initiate routine\n;\t >> Exiting" << std::endl;
21 return;
22 }
23
24 std::string task_name = name.empty() ? auto_generate_name(routine) : name;
25 ProcessingToken token = routine->get_processing_token();
26
27 {
28 auto existing_it = find_task_by_name(task_name);
29 if (existing_it != m_tasks.end()) {
30 if (existing_it->routine && existing_it->routine->is_active()) {
31 existing_it->routine->set_should_terminate(true);
32 }
33 m_tasks.erase(existing_it);
34 }
35
36 m_tasks.emplace_back(routine, task_name);
37 }
38
39 if (initialize) {
42 } else {
44 }
45}
46
47bool TaskScheduler::cancel_task(const std::string& name)
48{
49 auto it = find_task_by_name(name);
50 if (it != m_tasks.end()) {
51 if (it->routine && it->routine->is_active()) {
52 it->routine->set_should_terminate(true);
53 }
54 m_tasks.erase(it);
55 return true;
56 }
57 return false;
58}
59
60bool TaskScheduler::cancel_task(std::shared_ptr<Routine> routine)
61{
62 auto it = find_task_by_routine(routine);
63 if (it != m_tasks.end()) {
64 if (routine && routine->is_active()) {
65 routine->set_should_terminate(true);
66 }
67 m_tasks.erase(it);
68 return true;
69 }
70 return false;
71}
72
73bool TaskScheduler::restart_task(const std::string& name)
74{
75 auto it = find_task_by_name(name);
76 if (it != m_tasks.end()) {
77 if (it->routine && it->routine->is_active()) {
78 it->routine->restart();
79 }
80 }
81 return false;
82}
83
84std::shared_ptr<Routine> TaskScheduler::get_task(const std::string& name) const
85{
86 auto it = find_task_by_name(name);
87 return (it != m_tasks.end()) ? it->routine : nullptr;
88}
89
90std::vector<std::shared_ptr<Routine>> TaskScheduler::get_tasks_for_token(ProcessingToken token) const
91{
92 std::vector<std::shared_ptr<Routine>> result;
93 for (const auto& entry : m_tasks) {
94 if (entry.routine && entry.routine->get_processing_token() == token) {
95 result.push_back(entry.routine);
96 }
97 }
98 return result;
99}
100
101void TaskScheduler::process_token(ProcessingToken token, uint64_t processing_units)
102{
103 auto processor_it = m_token_processors.find(token);
104 if (processor_it != m_token_processors.end()) {
105 auto tasks = get_tasks_for_token(token);
106 processor_it->second(tasks, processing_units);
107 } else {
108 process_default(token, processing_units);
109 }
110
111 static uint64_t cleanup_counter = 0;
112 if (++cleanup_counter % (m_cleanup_threshold * 2) == 0) {
114 }
115}
116
118{
119 for (const auto& token : m_token_clocks) {
120 process_token(token.first, 0);
121 }
122
123 static uint64_t cleanup_counter = 0;
124 if (++cleanup_counter % m_cleanup_threshold == 0) {
126 }
127}
128
134
136{
137 auto clock_it = m_token_clocks.find(token);
138 if (clock_it != m_token_clocks.end()) {
139 return *clock_it->second;
140 }
141
142 auto audio_clock_it = m_token_clocks.find(ProcessingToken::SAMPLE_ACCURATE);
143 if (audio_clock_it != m_token_clocks.end()) {
144 return *audio_clock_it->second;
145 }
146
147 throw std::runtime_error("No clocks available in scheduler");
148}
149
151{
152 unsigned int rate = get_rate(token);
153 return Utils::seconds_to_units(seconds, rate);
154}
155
156uint64_t TaskScheduler::seconds_to_samples(double seconds) const
157{
159}
160
162{
163 const auto& clock = get_clock(token);
164 return clock.current_position();
165}
166
168{
169 auto clock_it = m_token_clocks.find(token);
170 if (clock_it != m_token_clocks.end()) {
171 return clock_it->second->rate();
172 }
173
174 return get_default_rate(token);
175}
176
178{
179 return std::any_of(m_tasks.begin(), m_tasks.end(),
180 [token](const TaskEntry& entry) {
181 return entry.routine && entry.routine->is_active() && entry.routine->get_processing_token() == token;
182 });
183}
184
186{
187 return m_next_task_id.fetch_add(1);
188}
189
190std::string TaskScheduler::auto_generate_name(std::shared_ptr<Routine> routine) const
191{
192 return "task_" + std::to_string(get_next_task_id());
193}
194
195std::vector<TaskEntry>::iterator TaskScheduler::find_task_by_name(const std::string& name)
196{
197 return std::find_if(m_tasks.begin(), m_tasks.end(),
198 [&name](const TaskEntry& entry) { return entry.name == name; });
199}
200
201std::vector<TaskEntry>::const_iterator TaskScheduler::find_task_by_name(const std::string& name) const
202{
203 return std::find_if(m_tasks.begin(), m_tasks.end(),
204 [&name](const TaskEntry& entry) { return entry.name == name; });
205}
206
207std::vector<TaskEntry>::iterator TaskScheduler::find_task_by_routine(std::shared_ptr<Routine> routine)
208{
209 return std::find_if(m_tasks.begin(), m_tasks.end(),
210 [&routine](const TaskEntry& entry) { return entry.routine == routine; });
211}
212
214{
215 switch (token) {
217 return 48000;
219 return 60;
221 return 48000;
223 return 1;
225 return 1000;
226 default:
227 return 48000;
228 }
229}
230
232{
233 auto clock_it = m_token_clocks.find(token);
234 if (clock_it == m_token_clocks.end()) {
235 unsigned int domain_rate = (rate > 0) ? rate : get_default_rate(token);
236
237 switch (token) {
239 m_token_clocks[token] = std::make_unique<SampleClock>(domain_rate);
240 break;
242 m_token_clocks[token] = std::make_unique<SampleClock>(domain_rate);
243 break;
245 m_token_clocks[token] = std::make_unique<SampleClock>(domain_rate);
246 break;
247 default:
248 m_token_clocks[token] = std::make_unique<SampleClock>(domain_rate);
249 break;
250 }
251 }
252}
253
255{
256 auto clock_it = m_token_clocks.find(token);
257 if (clock_it == m_token_clocks.end()) {
258 return;
259 }
260
261 auto tasks = get_tasks_for_token(token);
262 if (tasks.empty()) {
263 auto& clock = *clock_it->second;
264 clock.tick(processing_units);
265 return;
266 }
267
268 auto& clock = *clock_it->second;
269
270 for (uint64_t i = 0; i < processing_units; i++) {
271 uint64_t current_context = clock.current_position();
272
273 for (auto& routine : tasks) {
274 if (routine && routine->is_active()) {
275 if (routine->requires_clock_sync()) {
276 if (current_context >= routine->next_execution()) {
277 routine->try_resume_with_context(current_context, DelayContext::SAMPLE_BASED);
278 }
279 } else {
280 routine->try_resume_with_context(current_context, DelayContext::SAMPLE_BASED);
281 }
282 }
283 }
284
285 clock.tick(1);
286 }
287}
288
290{
291 m_tasks.erase(
292 std::remove_if(m_tasks.begin(), m_tasks.end(),
293 [](const TaskEntry& entry) {
294 return !entry.routine || !entry.routine->is_active();
295 }),
296 m_tasks.end());
297}
298
299bool TaskScheduler::initialize_routine_state(std::shared_ptr<Routine> routine, ProcessingToken token)
300{
301 if (!routine) {
302 return false;
303 }
304
305 auto clock_it = m_token_clocks.find(token);
306 if (clock_it == m_token_clocks.end()) {
307 return false;
308 }
309
310 uint64_t current_context = clock_it->second->current_position();
311 return routine->initialize_state(current_context);
312}
313
315{
316 for (auto& entry : m_tasks) {
317 if (entry.routine && entry.routine->is_active()) {
318 bool current_auto_resume = entry.routine->get_auto_resume();
319 entry.routine->set_state<bool>("was_auto_resume", current_auto_resume);
320 entry.routine->set_auto_resume(false);
321 }
322 }
323}
324
326{
327 for (auto& entry : m_tasks) {
328 if (entry.routine && entry.routine->is_active()) {
329 bool* was_auto_resume = entry.routine->get_state<bool>("was_auto_resume");
330 if (was_auto_resume) {
331 entry.routine->set_auto_resume(*was_auto_resume);
332 } else {
333 entry.routine->set_auto_resume(true);
334 }
335 }
336 }
337}
338
340{
341 for (auto& entry : m_tasks) {
342 if (entry.routine && entry.routine->is_active()) {
343 entry.routine->set_should_terminate(true);
344 entry.routine->set_auto_resume(true);
345 }
346 }
347
348 std::this_thread::sleep_for(std::chrono::milliseconds(10));
349
350 m_tasks.clear();
351}
352
354{
357
358 for (auto& task : tasks) {
359 if (task && task->is_active()) {
360 if (task->requires_clock_sync()) {
361 if (m_current_buffer_cycle >= task->next_execution()) {
362 task->try_resume_with_context(m_current_buffer_cycle, DelayContext::BUFFER_BASED);
363 }
364 } else {
365 task->try_resume_with_context(m_current_buffer_cycle, DelayContext::BUFFER_BASED);
366 }
367 }
368 }
369}
370
371}
static MayaFlux::Nodes::ProcessingToken token
Definition Timers.cpp:8
Abstract base interface for all clock types in the multimodal scheduling system.
Definition Clock.hpp:23
std::atomic< uint64_t > m_next_task_id
Task ID counter for unique identification.
bool cancel_task(std::shared_ptr< Routine > task)
Cancels and removes a task from the scheduler.
Definition Scheduler.cpp:60
void register_token_processor(ProcessingToken token, token_processing_func_t processor)
Register a custom processor for a specific token domain.
uint64_t seconds_to_units(double seconds, ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Convert seconds to processing units for a specific domain.
std::vector< TaskEntry >::iterator find_task_by_routine(std::shared_ptr< Routine > routine)
Find task entry by routine pointer.
unsigned int get_default_rate(ProcessingToken token) const
Get the default rate for a processing token.
std::shared_ptr< Routine > get_task(const std::string &name) const
Get a named task.
Definition Scheduler.cpp:84
void cleanup_completed_tasks()
Clean up completed tasks in a domain.
void add_task(std::shared_ptr< Routine > routine, const std::string &name="", bool initialize=false)
Add a routine to the scheduler based on its processing token.
Definition Scheduler.cpp:17
TaskScheduler(uint32_t default_sample_rate=48000, uint32_t default_frame_rate=60)
Constructs a TaskScheduler with the specified sample rate.
Definition Scheduler.cpp:7
std::vector< TaskEntry > m_tasks
bool initialize_routine_state(std::shared_ptr< Routine > routine, ProcessingToken token)
Initialize a routine's state for a specific domain.
void process_token(ProcessingToken token, uint64_t processing_units=1)
Process all tasks for a specific processing domain.
std::vector< TaskEntry >::iterator find_task_by_name(const std::string &name)
Find task entry by name.
void ensure_domain(ProcessingToken token, unsigned int rate=0)
Initialize a processing domain if it doesn't exist.
void pause_all_tasks()
Pause all active tasks.
uint64_t get_next_task_id() const
Generates a unique task ID for new tasks.
uint32_t m_cleanup_threshold
Threshold for task cleanup.
uint64_t seconds_to_samples(double seconds) const
Converts a time in seconds to a number of samples.
std::vector< std::shared_ptr< Routine > > get_tasks_for_token(ProcessingToken token) const
Get all tasks for a specific processing domain.
Definition Scheduler.cpp:90
std::unordered_map< ProcessingToken, token_processing_func_t > m_token_processors
Custom processors for specific domains.
uint64_t current_units(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get current processing units for a domain.
void process_default(ProcessingToken token, uint64_t processing_units)
Process tasks in a specific domain with default algorithm.
void process_all_tokens()
Process all active domains.
std::string auto_generate_name(std::shared_ptr< Routine > routine) const
Generate automatic name for a routine based on its type.
unsigned int get_rate(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get processing rate for a domain.
bool restart_task(const std::string &name)
Restart a named task.
Definition Scheduler.cpp:73
void terminate_all_tasks()
Terminate and clear all tasks.
void resume_all_tasks()
Resume all previously paused tasks.
std::unordered_map< ProcessingToken, std::unique_ptr< IClock > > m_token_clocks
Clock instances for each processing domain.
bool has_active_tasks(ProcessingToken token) const
Check if a processing domain has any active tasks.
const SampleClock & get_clock() const
Gets the primary clock (audio domain for legacy compatibility)
void initialize()
Definition main.cpp:11
uint64_t seconds_to_units(double seconds, uint32_t rate)
Convert seconds to processing units for any rate.
Definition Utils.hpp:188
uint64_t seconds_to_samples(double seconds, uint32_t sample_rate)
Convert seconds to samples at a given sample rate.
Definition Utils.hpp:166
std::function< void(const std::vector< std::shared_ptr< Routine > > &, uint64_t)> token_processing_func_t
Function type for processing tasks in a specific token domain.
@ MULTI_RATE
Coroutine can handle multiple sample rates. Picks the frame-accurate processing token by default.
@ FRAME_ACCURATE
Coroutine is frame-accurate.
@ SAMPLE_ACCURATE
Coroutine is sample-accurate.
@ ON_DEMAND
Coroutine is executed on demand, not scheduled.
@ SAMPLE_BASED
Sample-accurate delay (audio domain)
@ BUFFER_BASED
Buffer-cycle delay (audio hardware boundary)