MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Scheduler.hpp
1#pragma once
2
3#include "Clock.hpp"
4#include "Routine.hpp"
5
6namespace MayaFlux::Vruta {
7
8struct TaskEntry {
9 std::shared_ptr<Routine> routine;
10 std::string name;
11
12 TaskEntry(std::shared_ptr<Routine> r, const std::string& n)
13 : routine(std::move(r))
14 , name(n) // n is a const reference, so std::move(n) would still copy
15 {
16 }
17};
18
19/** @typedef token_processing_func_t
20 * @brief Function type for processing tasks in a specific token domain
21 *
22 * This function type defines the signature for custom processing functions
23 * that can be registered to handle tasks in a specific processing domain.
24 * The function receives a vector of routines and the number of processing
25 * units (samples, frames, etc.) to advance.
26 */
27using token_processing_func_t = std::function<void(const std::vector<std::shared_ptr<Routine>>&, uint64_t)>;
28
29/**
30 * @class TaskScheduler
31 * @brief Token-based multimodal task scheduling system for unified coroutine processing
32 *
33 * TaskScheduler serves as the central orchestrator for coroutine scheduling in the MayaFlux engine,
34 * implementing a token-based architecture that enables seamless integration of different processing
35 * domains while maintaining proven audio scheduling patterns. This design enables true digital-first,
36 * data-driven multimedia task scheduling where audio, visual, and custom coroutines can coexist
37 * in a unified scheduling system.
38 *
39 * **Core Architecture:**
40 * - **Token-Based Scheduling**: Each processing domain (SAMPLE_ACCURATE, FRAME_ACCURATE, etc.)
41 * has its own dedicated scheduling characteristics and clock synchronization
42 * - **Domain-Specific Clocks**: Audio uses SampleClock, video will use FrameClock, etc.
43 * Each domain maintains its own timing reference for precise scheduling
44 * - **Proven Processing Patterns**: Maintains the existing successful audio scheduling flow
45 * while extending it to other domains through token-based routing
46 * - **Cross-Modal Coordination**: Enables coordination between domains for synchronized effects
47 *
48 * The scheduler creates appropriate clocks and task lists based on processing tokens,
49 * ensuring optimal performance for each domain while enabling cross-domain synchronization.
50 */
51class MAYAFLUX_API TaskScheduler {
52public:
53 /**
54 * @brief Constructs a TaskScheduler with the specified default rates
55 * @param default_sample_rate Samples per second for the audio domain (default: 48000)
56 * @param default_frame_rate Frames per second for the frame-accurate domain (default: 60)
57 *
58 * Creates a new TaskScheduler with an internal SampleClock initialized to the
59 * given sample rate. Each rate relates unit counts to real-time durations in its domain.
60 */
61 TaskScheduler(uint32_t default_sample_rate = 48000, uint32_t default_frame_rate = 60);
62
63 /**
64 * @brief Add a routine to the scheduler based on its processing token
65 * @param routine Routine to add (SoundRoutine, GraphicsRoutine, or ComplexRoutine)
66 * @param name Optional name for the routine (for task management)
67 * @param initialize Whether to initialize the routine's state immediately
68 *
69 * The routine's processing token determines which domain it belongs to and
70 * which clock it synchronizes with. Automatically dispatches to the appropriate
71 * token-specific task list and clock synchronization.
72 */
73 void add_task(std::shared_ptr<Routine> routine, const std::string& name = "", bool initialize = false);
74
75 /**
76 * @brief Get a named task
77 * @param name Task name
78 * @return Shared pointer to routine or nullptr
79 */
80 std::shared_ptr<Routine> get_task(const std::string& name) const;
81
82 /**
83 * @brief Cancels and removes a task from the scheduler
84 * @param task Shared pointer to the task to cancel
85 * @return True if the task was found and cancelled, false otherwise
86 *
87 * This method removes a task from the scheduler, preventing it from
88 * executing further. It's used to stop tasks that are no longer needed
89 * or to clean up before shutting down the engine.
90 */
91 bool cancel_task(std::shared_ptr<Routine> task);
92
93 /**
94 * @brief Cancel a task by name
95 * @param name Task name to cancel
96 * @return True if found and cancelled
97 */
98 bool cancel_task(const std::string& name);
99
100 /**
101 * @brief Restart a named task
102 * @param name Task name to restart
103 * @return True if found and restarted
104 */
105 bool restart_task(const std::string& name);
106
107 /**
108 * @brief Get all tasks for a specific processing domain
109 * @param token Processing domain
110 * @return Vector of tasks in the domain
111 */
112 std::vector<std::shared_ptr<Routine>> get_tasks_for_token(ProcessingToken token) const;
113
114 /**
115 * @brief Process all tasks for a specific processing domain
116 * @param token Processing domain to advance
117 * @param processing_units Number of units to process (samples/frames/etc.)
118 *
119 * Advances the appropriate clock and processes all tasks that are ready
120 * to execute in the specified domain. This is the main entry point for
121 * backend-specific processing loops.
122 */
123 void process_token(ProcessingToken token, uint64_t processing_units = 1);
124
125 /**
126 * @brief Process all active domains
127 *
128 * Processes all domains that have active tasks, advancing each domain's
129 * clock by its default processing unit size. Useful for unified processing
130 * in applications that need to advance all domains simultaneously.
131 */
132 void process_all_tokens();
133
134 /**
135 * @brief Register a custom processor for a specific token domain
136 * @param token Processing domain to handle
137 * @param processor Function that receives tasks and processing units for custom scheduling
138 *
139 * Allows registering domain-specific scheduling algorithms. For example,
140 * a graphics backend might register a processor that batches frame-accurate
141 * tasks for optimal GPU utilization.
142 */
143 void register_token_processor(ProcessingToken token, token_processing_func_t processor);
144
145 /**
146 * @brief Convert seconds to processing units for a specific domain
147 * @param seconds Time in seconds
148 * @param token Processing domain (default: audio)
149 * @return Number of processing units (samples/frames/etc.)
150 */
151 uint64_t seconds_to_units(double seconds, ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const;
152
153 /**
154 * @brief Get current processing units for a domain
155 * @param token Processing domain
156 * @return Current position in the domain's timeline
157 */
158 uint64_t current_units(ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const;
159
160 /**
161 * @brief Get processing rate for a domain
162 * @param token Processing domain
163 * @return Processing rate (sample rate, frame rate, etc.)
164 */
165 unsigned int get_rate(ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const;
166
167 /**
168 * @brief Converts a time in seconds to a number of samples
169 * @param seconds Time duration in seconds
170 * @return Equivalent number of samples at the current sample rate
171 *
172 * Convenience method that uses the audio domain's sample rate for conversion.
173 * For domain-specific conversions, use seconds_to_units() with the appropriate token.
174 */
175 uint64_t seconds_to_samples(double seconds) const;
176
177 /**
178 * @brief Get the audio domain's SampleClock (legacy interface)
179 * @return Reference to the audio domain's SampleClock
180 *
181 * Provides backward compatibility for code that expects direct SampleClock access.
182 * New code should use get_clock(token) or get_typed_clock<SampleClock>().
183 */
184 inline const SampleClock& get_sample_clock() const
185 {
186 return get_typed_clock<SampleClock>(ProcessingToken::SAMPLE_ACCURATE);
187 }
188
189 /**
190 * @brief Gets the primary clock (audio domain for legacy compatibility)
191 * @return Const reference to the audio domain's SampleClock
192 *
193 * Legacy method - returns the audio clock for backward compatibility.
194 * New multimodal code should use get_clock(token) for specific domains.
195 */
196 const SampleClock& get_clock() const { return m_clock; }
197
198 /**
199 * @brief Get the clock for a specific processing domain
200 * @param token Processing domain
201 * @return Reference to the domain's clock, or audio clock if token not found
202 */
203 const IClock& get_clock(ProcessingToken token) const; // no default argument: a default here makes get_clock() ambiguous with the legacy overload
204
205 /**
206 * @brief Get a typed clock for a specific processing domain
207 * @tparam ClockType Type of clock to retrieve (e.g., SampleClock, FrameClock)
208 * @param token Processing domain
209 * @return Const reference to the typed clock
210 *
211 * This method allows retrieving a specific type of clock for the given
212 * processing domain, ensuring type safety and correct clock usage.
213 */
214 template <typename ClockType>
215 const ClockType& get_typed_clock(ProcessingToken token = ProcessingToken::SAMPLE_ACCURATE) const
216 {
217 return dynamic_cast<const ClockType&>(get_clock(token));
218 }
219
220 /**
221 * @brief Update parameters of a named task
222 * @tparam Args Parameter types
223 * @param name Task name
224 * @param args New parameters
225 * @return True if task found and updated
226 */
227 template <typename... Args>
228 bool update_task_params(const std::string& name, Args&&... args)
229 {
230 auto it = find_task_by_name(name);
231 if (it != m_tasks.end() && it->routine && it->routine->is_active()) {
232 it->routine->update_params(std::forward<Args>(args)...);
233 return true;
234 }
235 return false;
236 }
237
238 /**
239 * @brief Get task state value by name and key
240 * @tparam T State value type
241 * @param name Task name
242 * @param state_key State key
243 * @return Pointer to value or nullptr
244 */
245 template <typename T>
246 T* get_task_state(const std::string& name, const std::string& state_key) const
247 {
248 auto it = find_task_by_name(name);
249 if (it != m_tasks.end() && it->routine && it->routine->is_active()) {
250 return it->routine->get_state<T>(state_key);
251 }
252 return nullptr;
253 }
254
255 /**
256 * @brief Create value accessor function for named task
257 * @tparam T Value type
258 * @param name Task name
259 * @param state_key State key
260 * @return Function returning current value
261 */
262 template <typename T>
263 std::function<T()> create_value_accessor(const std::string& name, const std::string& state_key) const
264 {
265 return [this, name, state_key]() -> T {
266 if (auto value = get_task_state<T>(name, state_key)) {
267 return *value;
268 }
269 return T {};
270 };
271 }
272
273 /**
274 * @brief Generates a unique task ID for new tasks
275 * @return A unique task ID
276 */
277 uint64_t get_next_task_id() const;
278
279 /**
280 * @brief Check if a processing domain has any active tasks
281 * @param token Processing domain to check
282 * @return True if domain has active tasks
283 */
284 bool has_active_tasks(ProcessingToken token) const;
285
286 /**
287 * @brief Get all task names for debugging/inspection
288 * @return Vector of all task names
289 */
290 std::vector<std::string> get_task_names() const;
291
292 /**
293 * @brief Pause all active tasks
294 */
295 void pause_all_tasks();
296
297 /**
298 * @brief Resume all previously paused tasks
299 */
300 void resume_all_tasks();
301
302 /**
303 * @brief Terminate and clear all tasks
304 */
305 void terminate_all_tasks();
306
307 /** @brief Get the task cleanup threshold
308 *
309 * This value determines how many processing units must pass before
310 * the scheduler cleans up completed tasks.
311 */
312 inline uint32_t get_cleanup_threshold() const { return m_cleanup_threshold; }
313
314 /** @brief Set the task cleanup threshold
315 * @param threshold New threshold value
316 *
317 * This value determines how many processing units must pass before
318 * the scheduler cleans up completed tasks. Lower values increase
319 * cleanup frequency, while higher values reduce overhead.
320 */
321 inline void set_cleanup_threshold(uint32_t threshold) { m_cleanup_threshold = threshold; }
322
323 /**
324 * @brief Get current buffer cycle for task scheduling.
325 * Updated by AudioSubsystem at each buffer boundary.
326 */
327 uint64_t get_current_buffer_cycle() const
328 {
329 return m_current_buffer_cycle;
330 }
331
332 /**
333 * @brief Increment buffer cycle counter.
334 * Called by AudioSubsystem at the start of each buffer processing pass.
335 */
336 void tick_buffer_cycle()
337 {
338 m_current_buffer_cycle++;
339 }
340
341 void process_buffer_cycle_tasks();
342
343private:
344 /**
345 * @brief Generate automatic name for a routine based on its type
346 * @param routine The routine to name
347 * @return Generated name
348 */
349 std::string auto_generate_name(std::shared_ptr<Routine> routine) const;
350
351 /**
352 * @brief Find task entry by name
353 * @param name Task name to find
354 * @return Iterator to task entry or end()
355 */
356 std::vector<TaskEntry>::iterator find_task_by_name(const std::string& name);
357
358 /**
359 * @brief Find task entry by name (const version)
360 * @param name Task name to find
361 * @return Const iterator to task entry or end()
362 */
363 std::vector<TaskEntry>::const_iterator find_task_by_name(const std::string& name) const;
364
365 /**
366 * @brief Find task entry by routine pointer
367 * @param routine Routine to find
368 * @return Iterator to task entry or end()
369 */
370 std::vector<TaskEntry>::iterator find_task_by_routine(std::shared_ptr<Routine> routine);
371
372 /**
373 * @brief Initialize a processing domain if it doesn't exist
374 * @param token Processing domain to initialize
375 * @param rate Processing rate for the domain
376 */
377 void ensure_domain(ProcessingToken token, unsigned int rate = 0);
378
379 /**
380 * @brief Get the default rate for a processing token
381 * @param token Processing domain
382 * @return Default rate for the domain
383 */
384 unsigned int get_default_rate(ProcessingToken token) const;
385
386 /**
387 * @brief Process tasks in a specific domain with default algorithm
388 * @param token Processing domain
389 * @param processing_units Number of units to process
390 */
391 void process_default(ProcessingToken token, uint64_t processing_units);
392
393 /**
394 * @brief Clean up completed tasks
395 */
396 void cleanup_completed_tasks();
397
398 /**
399 * @brief Initialize a routine's state for a specific domain
400 * @param routine Routine to initialize
401 * @param token Processing domain
402 */
403 bool initialize_routine_state(std::shared_ptr<Routine> routine, ProcessingToken token);
404
405 /**
406 * @brief Clock instances for each processing domain
407 *
408 * Each domain maintains its own clock for precise timing.
409 * Audio uses SampleClock, graphics will use FrameClock, etc.
410 */
411 std::unordered_map<ProcessingToken, std::unique_ptr<IClock>> m_token_clocks;
412
413 /**
414 * @brief Custom processors for specific domains
415 *
416 * Allows registering domain-specific scheduling algorithms that can
417 * optimize task execution for particular backends or use cases.
418 */
419 std::unordered_map<ProcessingToken, token_processing_func_t> m_token_processors;
420
421 /**
422 * @brief Default processing rates for each domain
423 *
424 * Stores the rate (samples/sec, frames/sec, etc.) for each domain
425 * to enable proper time conversions and clock initialization.
426 */
427 std::unordered_map<ProcessingToken, unsigned int> m_token_rates;
428
429 /**
430 * @brief Task ID counter for unique identification
431 */
432 mutable std::atomic<uint64_t> m_next_task_id { 1 };
433
434 std::vector<TaskEntry> m_tasks;
435
436 /**
437 * @brief The master sample clock for the processing engine
438 *
439 * This clock provides the authoritative time reference for all scheduled
440 * tasks, ensuring they execute with sample-accurate timing relative to
441 * the processing stream.
442 */
443 SampleClock m_clock;
444
445 /** @brief Threshold for task cleanup
446 *
447 * This value determines how many processing units must pass before
448 * the scheduler cleans up completed tasks. It helps manage memory
449 * and performance by removing inactive tasks periodically.
450 */
451 uint32_t m_cleanup_threshold;
452
453 uint64_t m_current_buffer_cycle {};
454};
455
456}
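The typed clock lookup in the listing above relies on `dynamic_cast` to a reference type, which throws `std::bad_cast` when the clock stored for a token is not of the requested type. The following is a minimal sketch of that behavior, not the MayaFlux implementation: `ClockHolder`, the `rate()` member, the two simplified clock structs, and the helper `typed_lookup_throws` are hypothetical stand-ins introduced only for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <typeinfo>
#include <unordered_map>

// Hypothetical stand-ins for the real types; only their shape matters here.
enum class ProcessingToken { SAMPLE_ACCURATE, FRAME_ACCURATE };

struct IClock {
    virtual ~IClock() = default;
    virtual uint32_t rate() const = 0; // assumed accessor, not from the header
};

struct SampleClock : IClock {
    explicit SampleClock(uint32_t r) : m_rate(r) {}
    uint32_t rate() const override { return m_rate; }
    uint32_t m_rate;
};

struct FrameClock : IClock {
    explicit FrameClock(uint32_t r) : m_rate(r) {}
    uint32_t rate() const override { return m_rate; }
    uint32_t m_rate;
};

// Hypothetical holder that owns one clock per token, mirroring m_token_clocks.
class ClockHolder {
public:
    ClockHolder(uint32_t sample_rate, uint32_t frame_rate)
    {
        m_token_clocks[ProcessingToken::SAMPLE_ACCURATE] = std::make_unique<SampleClock>(sample_rate);
        m_token_clocks[ProcessingToken::FRAME_ACCURATE] = std::make_unique<FrameClock>(frame_rate);
    }

    const IClock& get_clock(ProcessingToken token) const { return *m_token_clocks.at(token); }

    // Same shape as the header's get_typed_clock: a reference dynamic_cast.
    template <typename ClockType>
    const ClockType& get_typed_clock(ProcessingToken token) const
    {
        return dynamic_cast<const ClockType&>(get_clock(token));
    }

private:
    std::unordered_map<ProcessingToken, std::unique_ptr<IClock>> m_token_clocks;
};

// Returns true when requesting ClockType for `token` fails with std::bad_cast.
template <typename ClockType>
bool typed_lookup_throws(const ClockHolder& holder, ProcessingToken token)
{
    try {
        (void)holder.get_typed_clock<ClockType>(token);
        return false;
    } catch (const std::bad_cast&) {
        return true;
    }
}
```

Callers that cannot guarantee the clock type for a token may prefer to catch `std::bad_cast`, or to work through the `IClock` interface returned by `get_clock(token)` instead.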
Abstract base interface for all clock types in the multimodal scheduling system.
Definition Clock.hpp:23
Sample-accurate timing system for audio processing domain.
Definition Clock.hpp:90
std::function< T()> create_value_accessor(const std::string &name, const std::string &state_key) const
Create value accessor function for named task.
uint64_t get_current_buffer_cycle() const
Get current buffer cycle for task scheduling Updated by AudioSubsystem at each buffer boundary.
SampleClock m_clock
The master sample clock for the processing engine.
uint32_t get_cleanup_threshold() const
Get the task cleanup threshold.
std::unordered_map< ProcessingToken, unsigned int > m_token_rates
Default processing rates for each domain.
std::vector< TaskEntry > m_tasks
void tick_buffer_cycle()
Increment buffer cycle counter Called by AudioSubsystem at start of each buffer processing.
const SampleClock & get_sample_clock() const
Get the audio domain's SampleClock (legacy interface)
uint32_t m_cleanup_threshold
Threshold for task cleanup.
std::unordered_map< ProcessingToken, token_processing_func_t > m_token_processors
Custom processors for specific domains.
bool update_task_params(const std::string &name, Args &&... args)
Update parameters of a named task.
T * get_task_state(const std::string &name, const std::string &state_key) const
Get task state value by name and key.
const ClockType & get_typed_clock(ProcessingToken token=ProcessingToken::SAMPLE_ACCURATE) const
Get a typed clock for a specific processing domain.
std::vector< std::string > get_task_names() const
Get all task names for debugging/inspection.
void set_cleanup_threshold(uint32_t threshold)
Set the task cleanup threshold.
std::unordered_map< ProcessingToken, std::unique_ptr< IClock > > m_token_clocks
Clock instances for each processing domain.
const SampleClock & get_clock() const
Gets the primary clock (audio domain for legacy compatibility)
Token-based multimodal task scheduling system for unified coroutine processing.
Definition Scheduler.hpp:51
std::function< void(const std::vector< std::shared_ptr< Routine > > &, uint64_t)> token_processing_func_t
Function type for processing tasks in a specific token domain.
bool restart_task(const std::string &name)
Restarts a scheduled task.
Definition Chronie.cpp:107
bool cancel_task(const std::string &name)
Cancels a scheduled task.
Definition Chronie.cpp:102
std::shared_ptr< Routine > routine
Definition Scheduler.hpp:9
TaskEntry(std::shared_ptr< Routine > r, const std::string &n)
Definition Scheduler.hpp:12
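To illustrate how a `token_processing_func_t` might be used, here is a rough sketch of per-token dispatch: a registered processor receives the domain's routines and the unit count, and a fallback loop runs when none is registered. Everything except the `token_processing_func_t` alias and the method names is an assumption made for the example: `MiniScheduler`, the simplified `Routine` with its `advance()` member, the per-token task storage, and the fallback behavior are all hypothetical and do not describe the actual TaskScheduler internals.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
#include <vector>

enum class ProcessingToken { SAMPLE_ACCURATE, FRAME_ACCURATE };

// Hypothetical minimal routine: it only records how far it was advanced.
struct Routine {
    uint64_t advanced = 0;
    void advance(uint64_t units) { advanced += units; }
};

// Same alias as in the header.
using token_processing_func_t = std::function<void(const std::vector<std::shared_ptr<Routine>>&, uint64_t)>;

// Hypothetical scheduler sketch; storage layout is an assumption.
class MiniScheduler {
public:
    void add_task(ProcessingToken token, std::shared_ptr<Routine> routine)
    {
        m_tasks[token].push_back(std::move(routine));
    }

    void register_token_processor(ProcessingToken token, token_processing_func_t processor)
    {
        m_token_processors[token] = std::move(processor);
    }

    void process_token(ProcessingToken token, uint64_t processing_units = 1)
    {
        const auto& tasks = m_tasks[token];
        auto it = m_token_processors.find(token);
        if (it != m_token_processors.end()) {
            // A custom processor decides how the domain's tasks are advanced.
            it->second(tasks, processing_units);
            return;
        }
        // Assumed default: advance every task by the requested unit count.
        for (const auto& task : tasks) {
            task->advance(processing_units);
        }
    }

private:
    std::unordered_map<ProcessingToken, std::vector<std::shared_ptr<Routine>>> m_tasks;
    std::unordered_map<ProcessingToken, token_processing_func_t> m_token_processors;
};
```

A graphics backend could, for instance, register a lambda for `ProcessingToken::FRAME_ACCURATE` that batches the routines before advancing them, while the audio domain keeps using the default path.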