MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
Scheduler.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "Clock.hpp"
4#include "Routine.hpp"
5
6namespace MayaFlux::Vruta {
7
8struct TaskEntry {
9 std::shared_ptr<Routine> routine;
10 std::string name;
11
12 TaskEntry(const std::shared_ptr<Routine>& r, std::string n)
13 : routine(r)
14 , name(std::move(n))
15 {
16 }
17};
18
19/** @typedef token_processing_func_t
20 * @brief Function type for processing tasks in a specific token domain
21 *
22 * This function type defines the signature for custom processing functions
23 * that can be registered to handle tasks in a specific processing domain.
24 * The function receives a vector of routines and the number of processing
25 * units (samples, frames, etc.) to advance.
26 */
27using token_processing_func_t = std::function<void(const std::vector<std::shared_ptr<Routine>>&, uint64_t)>;
28
29/**
30 * @class TaskScheduler
31 * @brief Token-based multimodal task scheduling system for unified coroutine processing
32 *
33 * TaskScheduler serves as the central orchestrator for coroutine scheduling in the MayaFlux engine,
34 * implementing a token-based architecture that enables seamless integration of different processing
35 * domains while maintaining proven audio scheduling patterns. This design enables true digital-first,
36 * data-driven multimedia task scheduling where audio, visual, and custom coroutines can coexist
37 * in a unified scheduling system.
38 *
39 * **Core Architecture:**
40 * - **Token-Based Scheduling**: Each processing domain (SAMPLE_ACCURATE, FRAME_ACCURATE, etc.)
41 * has its own dedicated scheduling characteristics and clock synchronization
42 * - **Domain-Specific Clocks**: Audio uses SampleClock, video will use FrameClock, etc.
43 * Each domain maintains its own timing reference for precise scheduling
44 * - **Proven Processing Patterns**: Maintains the existing successful audio scheduling flow
45 * while extending it to other domains through token-based routing
46 * - **Cross-Modal Coordination**: Enables coordination between domains for synchronized effects
47 *
48 * The scheduler creates appropriate clocks and task lists based on processing tokens,
49 * ensuring optimal performance for each domain while enabling cross-domain synchronization.
50 */
51class MAYAFLUX_API TaskScheduler {
52public:
53 /**
54 * @brief Constructs a TaskScheduler with the specified sample rate
55 * @param sample_rate The number of samples per second (default: 48000)
56 *
57 * Creates a new TaskScheduler with an internal SampleClock initialized
58 * to the given sample rate. The sample rate determines the relationship
59 * between sample counts and real-time durations for all scheduled tasks.
60 */
61 TaskScheduler(uint32_t default_sample_rate = 48000, uint32_t default_frame_rate = 60);
62
63 /**
64 * @brief Add a routine to the scheduler based on its processing token
65 * @param routine Routine to add (SoundRoutine, GraphicsRoutine, or ComplexRoutine)
66 * @param name Optional name for the routine (for task management)
67 * @param initialize Whether to initialize the routine's state immediately
68 *
69 * The routine's processing token determines which domain it belongs to and
70 * which clock it synchronizes with. Automatically dispatches to the appropriate
71 * token-specific task list and clock synchronization.
72 */
73 void add_task(const std::shared_ptr<Routine>& routine, const std::string& name = "", bool initialize = false);
74
75 /**
76 * @brief Get a named task
77 * @param name Task name
78 * @return Shared pointer to routine or nullptr
79 */
80 std::shared_ptr<Routine> get_task(const std::string& name) const;
81
82 /**
83 * @brief Cancels and removes a task from the scheduler
84 * @param routine Shared pointer to the task to cancel
85 * @return True if the task was found and cancelled, false otherwise
86 *
87 * This method removes a task from the scheduler, preventing it from
88 * executing further. It's used to stop tasks that are no longer needed
89 * or to clean up before shutting down the engine.
90 */
91 bool cancel_task(const std::shared_ptr<Routine>& routine);
92
93 /**
94 * @brief Cancel a task by name
95 * @param name Task name to cancel
96 * @return True if found and cancelled
97 */
98 bool cancel_task(const std::string& name);
99
100 /**
101 * @brief Restart a named task
102 * @param name Task name to restart
103 * @return True if found and restarted
104 */
105 bool restart_task(const std::string& name);
106
107 /**
108 * @brief Get all tasks for a specific processing domain
109 * @param token Processing domain
110 * @return Vector of tasks in the domain
111 */
112 std::vector<std::shared_ptr<Routine>> get_tasks_for_token(ProcessingToken token) const;
113
114 /**
115 * @brief Process all tasks for a specific processing domain
116 * @param token Processing domain to advance
117 * @param processing_units Number of units to process (samples/frames/etc.)
118 *
119 * Advances the appropriate clock and processes all tasks that are ready
120 * to execute in the specified domain. This is the main entry point for
121 * backend-specific processing loops.
122 */
123 void process_token(ProcessingToken token, uint64_t processing_units = 1);
124
125 /**
126 * @brief Process all active domains
127 *
128 * Processes all domains that have active tasks, advancing each domain's
129 * clock by its default processing unit size. Useful for unified processing
130 * in applications that need to advance all domains simultaneously.
131 */
132 void process_all_tokens();
133
134 /**
135 * @brief Register a custom processor for a specific token domain
136 * @param token Processing domain to handle
137 * @param processor Function that receives tasks and processing units for custom scheduling
138 *
139 * Allows registering domain-specific scheduling algorithms. For example,
140 * a graphics backend might register a processor that batches frame-accurate
141 * tasks for optimal GPU utilization.
142 */
143 void register_token_processor(ProcessingToken token, token_processing_func_t processor);
144
145 /**
146 * @brief Register an externally-owned clock as the authoritative source for a token domain.
147 *
148 * Replaces any clock ensure_domain created for this token. The caller retains
149 * shared ownership; the scheduler reads position and rate from it but does not
150 * tick it. Intended for subsystems that self-drive their own clock (e.g.
151 * GraphicsSubsystem and its FrameClock).
152 *
153 * @param token Processing domain whose clock is being replaced.
154 * @param clock Externally-owned clock instance.
155 */
156 void register_clock(ProcessingToken token, std::shared_ptr<IClock> clock);
157
158 /**
159 * @brief Convert seconds to processing units for a specific domain
160 * @param seconds Time in seconds
161 * @param token Processing domain (default: audio)
162 * @return Number of processing units (samples/frames/etc.)
163 */
164 uint64_t seconds_to_units(double seconds, ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const;
165
166 /**
167 * @brief Get current processing units for a domain
168 * @param token Processing domain
169 * @return Current position in the domain's timeline
170 */
171 uint64_t current_units(ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const;
172
173 /**
174 * @brief Get processing rate for a domain
175 * @param token Processing domain
176 * @return Processing rate (sample rate, frame rate, etc.)
177 */
178 unsigned int get_rate(ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const;
179
180 /**
181 * @brief Converts a time in seconds to a number of samples
182 * @param seconds Time duration in seconds
183 * @return Equivalent number of samples at the current sample rate
184 *
185 * Convenience method that uses the audio domain's sample rate for conversion.
186 * For domain-specific conversions, use seconds_to_units() with the appropriate token.
187 */
188 uint64_t seconds_to_samples(double seconds) const;
189
190 /**
191 * @brief Get the audio domain's SampleClock (legacy interface)
192 * @return Reference to the audio domain's SampleClock
193 *
194 * Provides backward compatibility for code that expects direct SampleClock access.
195 * New code should use get_clock() or get_typed_clock<SampleClock>().
196 */
197 inline const SampleClock& get_sample_clock() const
198 {
199 return get_typed_clock<SampleClock>(ProcessingToken::SAMPLE_ACCURATE);
200 }
201
202 /**
203 * @brief Gets the primary clock (audio domain for legacy compatibility)
204 * @return Const reference to the audio domain's SampleClock
205 *
206 * Legacy method - returns the audio clock for backward compatibility.
207 * New multimodal code should use get_clock(token) for specific domains.
208 */
209 const SampleClock& get_clock() const { return m_clock; }
210
211 /**
212 * @brief Get the clock for a specific processing domain
213 * @param token Processing domain
214 * @return Reference to the domain's clock, or audio clock if token not found
215 */
216 const IClock& get_clock(ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const;
217
218 /**
219 * @brief Get a typed clock for a specific processing domain
220 * @tparam ClockType Type of clock to retrieve (e.g., SampleClock, FrameClock)
221 * @param token Processing domain
222 * @return Const reference to the typed clock
223 *
224 * This method allows retrieving a specific type of clock for the given
225 * processing domain, ensuring type safety and correct clock usage.
226 */
227 template <typename ClockType>
228 const ClockType& get_typed_clock(ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const
229 {
230 return dynamic_cast<const ClockType&>(get_clock(token));
231 }
232
233 /**
234 * @brief Update parameters of a named task
235 * @tparam Args Parameter types
236 * @param name Task name
237 * @param args New parameters
238 * @return True if task found and updated
239 */
240 template <typename... Args>
241 bool update_task_params(const std::string& name, Args&&... args)
242 {
243 auto it = find_task_by_name(name);
244 if (it != m_tasks.end() && it->routine && it->routine->is_active()) {
245 it->routine->update_params(std::forward<Args>(args)...);
246 return true;
247 }
248 return false;
249 }
250
251 /**
252 * @brief Get task state value by name and key
253 * @tparam T State value type
254 * @param name Task name
255 * @param state_key State key
256 * @return Pointer to value or nullptr
257 */
258 template <typename T>
259 T* get_task_state(const std::string& name, const std::string& state_key) const
260 {
261 auto it = find_task_by_name(name);
262 if (it != m_tasks.end() && it->routine && it->routine->is_active()) {
263 return it->routine->get_state<T>(state_key);
264 }
265 return nullptr;
266 }
267
268 /**
269 * @brief Create value accessor function for named task
270 * @tparam T Value type
271 * @param name Task name
272 * @param state_key State key
273 * @return Function returning current value
274 */
275 template <typename T>
276 std::function<T()> create_value_accessor(const std::string& name, const std::string& state_key) const
277 {
278 return [this, name, state_key]() -> T {
279 if (auto value = get_task_state<T>(name, state_key)) {
280 return *value;
281 }
282 return T {};
283 };
284 }
285
286 /**
287 * @brief Generates a unique task ID for new tasks
288 * @return A unique task ID
289 */
290 uint64_t get_next_task_id() const;
291
292 /**
293 * @brief Check if a processing domain has any active tasks
294 * @param token Processing domain to check
295 * @return True if domain has active tasks
296 */
297 bool has_active_tasks(ProcessingToken token) const;
298
299 /**
300 * @brief Get all task names for debugging/inspection
301 * @return Vector of all task names
302 */
303 std::vector<std::string> get_task_names() const;
304
305 /**
306 * @brief Get task names for a specific processing domain.
307 * @param token Processing domain to filter by.
308 * @return Vector of task names active in that domain.
309 */
310 std::vector<std::string> get_task_names(ProcessingToken token) const;
311
312 /**
313 * @brief Get all tasks for inspection/debugging
314 * @return Vector of all TaskEntry objects
315 */
316 [[nodiscard]] const std::vector<TaskEntry>& get_all_tasks();
317
318 /**
319 * @brief Pause all active tasks
320 */
321 void pause_all_tasks();
322
323 /**
324 * @brief Resume all previously paused tasks
325 */
326 void resume_all_tasks();
327
328 /**
329 * @brief Terminate and clear all tasks
330 */
331 void terminate_all_tasks();
332
333 /** @brief Get the task cleanup threshold
334 *
335 * This value determines how many processing units must pass before
336 * the scheduler cleans up completed tasks.
337 */
338 inline uint32_t get_cleanup_threshold() const { return m_cleanup_threshold; }
339
340 /** @brief Set the task cleanup threshold
341 * @param threshold New threshold value
342 *
343 * This value determines how many processing units must pass before
344 * the scheduler cleans up completed tasks. Lower values increase
345 * cleanup frequency, while higher values reduce overhead.
346 */
347 inline void set_cleanup_threshold(uint32_t threshold) { m_cleanup_threshold = threshold; }
348
349 /**
350 * @brief Get current buffer cycle for task scheduling
351 * Updated by AudioSubsystem at each buffer boundary
352 */
354 {
355 return m_current_buffer_cycle;
356 }
357
358 /**
359 * @brief Increment buffer cycle counter
360 * Called by AudioSubsystem at start of each buffer processing
361 */
363 {
364 m_current_buffer_cycle++;
365 }
366
367 void process_buffer_cycle_tasks();
368
369private:
370 /**
371 * @brief Generate automatic name for a routine based on its type
372 * @param routine The routine to name
373 * @return Generated name
374 */
375 std::string auto_generate_name(const std::shared_ptr<Routine>& routine) const;
376
377 /**
378 * @brief Find task entry by name
379 * @param name Task name to find
380 * @return Iterator to task entry or end()
381 */
382 std::vector<TaskEntry>::iterator find_task_by_name(const std::string& name);
383
384 /**
385 * @brief Find task entry by name (const version)
386 * @param name Task name to find
387 * @return Const iterator to task entry or end()
388 */
389 std::vector<TaskEntry>::const_iterator find_task_by_name(const std::string& name) const;
390
391 /**
392 * @brief Find task entry by routine pointer
393 * @param routine Routine to find
394 * @return Iterator to task entry or end()
395 */
396 std::vector<TaskEntry>::iterator find_task_by_routine(const std::shared_ptr<Routine>& routine);
397
398 /**
399 * @brief Initialize a processing domain if it doesn't exist
400 * @param token Processing domain to initialize
401 * @param rate Processing rate for the domain
402 */
403 void ensure_domain(ProcessingToken token, unsigned int rate = 0);
404
405 /**
406 * @brief Get the default rate for a processing token
407 * @param token Processing domain
408 * @return Default rate for the domain
409 */
410 unsigned int get_default_rate(ProcessingToken token) const;
411
412 /**
413 * @brief Process tasks in a specific domain with default algorithm
414 * @param token Processing domain
415 * @param processing_units Number of units to process
416 */
417 void process_default(ProcessingToken token, uint64_t processing_units);
418
419 /**
420 * @brief Clean up completed tasks in a domain
421 */
422 void cleanup_completed_tasks();
423
424 /**
425 * @brief Drain pending task operations (additions/removals) before processing
426 *
427 * This method ensures that any tasks that were added or removed while processing
428 * are properly handled before the next processing cycle.
429 */
430 void drain_pending_tasks();
431
432 /**
433 * @brief Drain pending conditional task operations before processing
434 *
435 * Similar to drain_pending_tasks, but specifically for conditional tasks that may
436 * have been added or removed while the conditional processing thread is running.
437 */
438 void drain_conditional_pending();
439
440 /**
441 * @brief Initialize a routine's state for a specific domain
442 * @param routine Routine to initialize
443 * @param token Processing domain
444 */
445 bool initialize_routine_state(const std::shared_ptr<Routine>& routine, ProcessingToken token);
446
447 /**
448 * @brief Start the conditional task processing thread if not already running
449 *
450 * This method ensures that the thread responsible for processing conditional tasks
451 * is started when the first conditional task is added. It checks if the thread is
452 * already running to avoid starting multiple threads.
453 */
454 void start_conditional_thread();
455
456 /**
457 * @brief Pump the MULTI_RATE list from one clock's context.
458 *
459 * Called as a fall-through from process_token when the driving token is
460 * SAMPLE_ACCURATE or FRAME_ACCURATE. Reads the driving clock's position and
461 * offers each cross routine a resume under the given context. The
462 * CrossRoutine gate decides eligibility and claims the resume atomically, so
463 * both threads may call this concurrently on the same list.
464 *
465 * @param context SAMPLE_BASED from the audio thread, FRAME_BASED from graphics.
466 * @param clock_token The token whose clock position drives this pump.
467 * @param processing_units Units to advance, matching the driving token's call.
468 */
469 void pump_cross(DelayContext context, ProcessingToken clock_token, uint64_t processing_units);
470
471 /**
472 * @brief Pump the conditional task list in a separate thread
473 *
474 * This method runs in a dedicated thread and continuously checks for conditional tasks
475 * that are ready to execute. It processes these tasks based on their conditions and
476 * ensures they are executed as soon as their conditions are met.
477 */
478 void pump_conditional();
479
480 /**
481 * @brief Clock instances for each processing domain
482 *
483 * Each domain maintains its own clock for precise timing.
484 * Audio uses SampleClock, graphics will use FrameClock, etc.
485 */
486 std::unordered_map<ProcessingToken, std::shared_ptr<IClock>> m_token_clocks;
487
488 /**
489 * @brief Custom processors for specific domains
490 *
491 * Allows registering domain-specific scheduling algorithms that can
492 * optimize task execution for particular backends or use cases.
493 */
494 std::unordered_map<ProcessingToken, token_processing_func_t> m_token_processors;
495
496 /**
497 * @brief Default processing rates for each domain
498 *
499 * Stores the rate (samples/sec, frames/sec, etc.) for each domain
500 * to enable proper time conversions and clock initialization.
501 */
502 std::unordered_map<ProcessingToken, unsigned int> m_token_rates;
503
504 /**
505 * @brief Task ID counter for unique identification
506 */
507 mutable std::atomic<uint64_t> m_next_task_id { 1 };
508
509 std::vector<TaskEntry> m_tasks;
510
511 std::vector<TaskEntry> m_conditional_tasks;
512
513 /**
514 * @brief The master sample clock for the processing engine
515 *
516 * This clock provides the authoritative time reference for all scheduled
517 * tasks, ensuring they execute with sample-accurate timing relative to
518 * the processing stream.
519 */
521
522 /** @brief Threshold for task cleanup
523 *
524 * This value determines how many processing units must pass before
525 * the scheduler cleans up completed tasks. It helps manage memory
526 * and performance by removing inactive tasks periodically.
527 */
529
530 uint64_t m_current_buffer_cycle {};
531
532 uint64_t m_registered_sample_rate { 48000 };
533 uint32_t m_registered_frame_rate { 60 };
534
535 static constexpr size_t MAX_PENDING_TASKS = 256;
536
538 std::atomic<bool> active { false };
539 TaskEntry entry { nullptr, "" };
540 bool is_addition { true };
541 };
542
543 std::atomic<uint32_t> m_pending_count { 0 };
544 PendingTaskOp m_pending_ops[MAX_PENDING_TASKS];
545
546 static constexpr size_t MAX_PENDING_CONDITIONAL = 64;
547
548 std::atomic<uint32_t> m_conditional_pending_count { 0 };
549 PendingTaskOp m_conditional_pending_ops[MAX_PENDING_CONDITIONAL];
550
551#if MAYAFLUX_USE_JTHREAD
552 std::jthread m_conditional_thread;
553#else
555 std::atomic<bool> m_conditional_stop { false };
556#endif
557};
558
559}
Abstract base interface for all clock types in the multimodal scheduling system.
Definition Clock.hpp:23
Sample-accurate timing system for audio processing domain.
Definition Clock.hpp:90
std::function< T()> create_value_accessor(const std::string &name, const std::string &state_key) const
Create value accessor function for named task.
uint64_t get_current_buffer_cycle() const
Get current buffer cycle for task scheduling Updated by AudioSubsystem at each buffer boundary.
SampleClock m_clock
The master sample clock for the processing engine.
uint32_t get_cleanup_threshold() const
Get the task cleanup threshold.
std::unordered_map< ProcessingToken, std::shared_ptr< IClock > > m_token_clocks
Clock instances for each processing domain.
std::unordered_map< ProcessingToken, unsigned int > m_token_rates
Default processing rates for each domain.
std::vector< TaskEntry > m_tasks
void tick_buffer_cycle()
Increment buffer cycle counter Called by AudioSubsystem at start of each buffer processing.
const SampleClock & get_sample_clock() const
Get the audio domain's SampleClock (legacy interface)
std::vector< TaskEntry > m_conditional_tasks
uint32_t m_cleanup_threshold
Threshold for task cleanup.
std::unordered_map< ProcessingToken, token_processing_func_t > m_token_processors
Custom processors for specific domains.
bool update_task_params(const std::string &name, Args &&... args)
Update parameters of a named task.
T * get_task_state(const std::string &name, const std::string &state_key) const
Get task state value by name and key.
const ClockType & get_typed_clock(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get a typed clock for a specific processing domain.
void set_cleanup_threshold(uint32_t threshold)
Set the task cleanup threshold.
const SampleClock & get_clock() const
Gets the primary clock (audio domain for legacy compatibility)
Token-based multimodal task scheduling system for unified coroutine processing.
Definition Scheduler.hpp:51
void initialize()
Definition main.cpp:11
std::function< void(const std::vector< std::shared_ptr< Routine > > &, uint64_t)> token_processing_func_t
Function type for processing tasks in a specific token domain.
uint64_t seconds_to_units(double seconds, uint32_t rate)
Convert seconds to processing units for any rate.
DelayContext
Discriminator for different temporal delay mechanisms.
bool restart_task(const std::string &name)
Restarts a scheduled task.
Definition Chronie.cpp:123
uint64_t seconds_to_samples(double seconds)
Converts a time duration in seconds to the equivalent number of audio samples.
Definition Chronie.cpp:271
bool cancel_task(const std::string &name)
Cancels a scheduled task.
Definition Chronie.cpp:118
std::shared_ptr< Routine > routine
Definition Scheduler.hpp:9
TaskEntry(const std::shared_ptr< Routine > &r, std::string n)
Definition Scheduler.hpp:12