MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferOperation.hpp
Go to the documentation of this file.
1#pragma once
2
3#include "Capture.hpp"
4
7
8namespace MayaFlux {
9
10namespace Buffers {
11 class BufferProcessor;
12 class BufferManager;
13}
14
15namespace Kakshya {
16 class DynamicSoundStream;
17}
18
19namespace Kriya {
20
21 using TransformVectorFunction = std::function<Kakshya::DataVariant(std::vector<Kakshya::DataVariant>&, uint32_t)>;
22
23 /**
24 * @enum ExecutionStrategy
25 * @brief Defines how operations in a pipeline are coordinated and executed
26 */
27 enum class ExecutionStrategy : uint8_t {
28 /**
29 * PHASED: Traditional phased execution (default)
30 * - All CAPTURE operations complete first (capture phase)
31 * - Then all processing operations execute (process phase)
32 * - Best for: accumulation, windowed analysis, batch processing
33 * - Predictable data availability, clear phase boundaries
34 */
35 PHASED,
36
37 /**
38 * STREAMING: Immediate flow-through execution
39 * - Each capture iteration flows immediately through dependent operations
40 * - Minimal latency, data processed as it arrives
41 * - Best for: real-time effects, low-latency processing, modify_buffer chains
42 * - Natural for operations that modify state continuously
43 */
45
46 /**
47 * PARALLEL: Concurrent capture with synchronization
48 * - Multiple capture operations can run concurrently
49 * - Explicit synchronization points coordinate data flow
50 * - Best for: multi-source capture, independent data streams
51 * - Requires CycleCoordinator for proper synchronization
52 */
54
55 /**
56 * REACTIVE: Data-driven reactive execution
57 * - Operations execute when input data becomes available
58 * - Dynamic dependency resolution
59 * - Best for: event-driven workflows, complex dependencies
60 * - Non-deterministic execution order
61 */
63 };
64
65 class BufferPipeline;
66 class CycleCoordinator;
67
68 /**
69 * @class BufferOperation
70 * @brief Fundamental unit of operation in buffer processing pipelines.
71 *
72 * BufferOperation encapsulates discrete processing steps that can be composed
73 * into complex data flow pipelines. Each operation represents a specific action
74 * such as capturing data, transforming it, routing to destinations, or applying
75 * conditional logic. Operations are designed to be chainable and support
76 * sophisticated scheduling and priority management.
77 *
78 * **Operation Types:**
79 * - **CAPTURE**: Extract data from AudioBuffer using configurable capture strategies
80 * - **TRANSFORM**: Apply functional transformations to data variants
81 * - **ROUTE**: Direct data to AudioBuffer or DynamicSoundStream destinations
82 * - **LOAD**: Read data from containers into buffers with position control
83 * - **SYNC**: Coordinate timing and synchronization across pipeline stages
84 * - **CONDITION**: Apply conditional logic and branching to data flow
85 * - **DISPATCH**: Send data to external handlers and callback systems
86 * - **FUSE**: Combine multiple data sources using custom fusion functions
87 *
88 * **Example Usage:**
89 * ```cpp
90 * // Capture audio with windowed analysis
91 * auto capture_op = BufferOperation::capture_from(input_buffer)
92 * .with_window(512, 0.5f)
93 * .on_data_ready([](const auto& data, uint32_t cycle) {
94 * analyze_spectrum(data);
95 * });
96 *
97 * // Transform and route to output
98 * auto pipeline = BufferPipeline()
99 * >> capture_op
100 * >> BufferOperation::transform([](const auto& data, uint32_t cycle) {
101 * return apply_reverb(data);
102 * })
103 * >> BufferOperation::route_to_container(output_stream);
104 * ```
105 * @class BufferCapture
106 * ...existing intro...
107 *
108 * **Cycle Behavior:**
109 * The `for_cycles(N)` configuration controls how many times the capture operation
110 * executes within a single pipeline cycle. When a capture has `.for_cycles(20)`,
111 * the operation will capture 20 times sequentially, with each capture receiving
112 * incrementing cycle numbers (0, 1, 2... 19) and calling `on_data_ready()` for
113 * each iteration.
114 *
115 * This is distinct from pipeline-level cycle control:
116 * - `.for_cycles(20)` on capture → operation executes 20 times per pipeline cycle
117 * - `execute_scheduled(5, ...)` → pipeline runs 5 times total
118 * - Combined: 5 × 20 = 100 total capture executions
119 *
120 * **Example:**
121 * ```cpp
122 * auto pipeline = BufferPipeline::create(*scheduler);
123 * pipeline >> BufferOperation::capture_from(buffer)
124 * .for_cycles(10) // Capture 10 times per pipeline invocation
125 * .on_data_ready([](const auto& data, uint32_t cycle) {
126 * std::cout << "Capture #" << cycle << '\n'; // Prints 0-9
127 * });
128 * pipeline->execute_scheduled(3, 512); // Runs pipeline 3 times → 30 total captures
129 * ```
130 *
131 *
132 * @see BufferPipeline For pipeline construction and execution
133 * @see BufferCapture For flexible data capture strategies
134 * @see CycleCoordinator For cross-pipeline synchronization
135 */
136 class MAYAFLUX_API BufferOperation {
137 private:
138 enum class ExecutionPhase : uint8_t {
139 AUTO, // Automatically determined by operation type
140 CAPTURE, // Explicitly runs in capture phase
141 PROCESS // Explicitly runs in process phase
142 };
143
144 public:
145 /**
146 * @enum OpType
147 * @brief Defines the fundamental operation types in the processing pipeline.
148 */
149 enum class OpType : uint8_t {
150 CAPTURE, ///< Capture data from source buffer using BufferCapture strategy
151 TRANSFORM, ///< Apply transformation function to data variants
152 ROUTE, ///< Route data to destination (buffer or container)
153 LOAD, ///< Load data from container to buffer with position control
154 SYNC, ///< Synchronize with timing/cycles for coordination
155 CONDITION, ///< Conditional operation for branching logic
156 BRANCH, ///< Branch to sub-pipeline based on conditions
157 DISPATCH, ///< Dispatch to external handler for custom processing
158 FUSE, ///< Fuse multiple sources using custom fusion functions
159 MODIFY ///< Modify Buffer Data using custom quick process
160 };
161
162 /**
163 * @brief Create a capture operation using BufferCapture configuration.
164 * @param capture Configured BufferCapture with desired capture strategy
165 * @return BufferOperation configured for data capture
166 */
168 {
169 return { OpType::CAPTURE, std::move(capture) };
170 }
171
172 /**
173 * @brief Create capture operation from input channel using convenience API.
174 * Creates input buffer automatically and returns configured capture operation.
175 * @param buffer_manager System buffer manager
176 * @param input_channel Input channel to capture from
177 * @param mode Capture mode (default: ACCUMULATE)
178 * @param cycle_count Number of cycles (0 = continuous, default)
179 * @return BufferOperation configured for input capture with default settings
180 */
181 static BufferOperation capture_input(
182 const std::shared_ptr<Buffers::BufferManager>& buffer_manager,
183 uint32_t input_channel,
184 BufferCapture::CaptureMode mode = BufferCapture::CaptureMode::ACCUMULATE,
185 uint32_t cycle_count = 0);
186
187 /**
188 * @brief Create CaptureBuilder for input channel with fluent configuration.
189 * Uses the existing CaptureBuilder pattern but with input buffer creation.
190 * @param buffer_manager System buffer manager
191 * @param input_channel Input channel to capture from
192 * @return CaptureBuilder for fluent configuration
193 */
194 static CaptureBuilder capture_input_from(
195 const std::shared_ptr<Buffers::BufferManager>& buffer_manager,
196 uint32_t input_channel);
197
198 /**
199 * @brief Create a file capture operation that reads from file and stores in stream.
200 * @param filepath Path to audio file
201 * @param channel Channel index (default: 0)
202 * @param cycle_count Number of cycles to capture (0 = continuous)
203 * @return BufferOperation configured for file capture
204 */
205 static BufferOperation capture_file(
206 const std::string& filepath,
207 uint32_t channel = 0,
208 uint32_t cycle_count = 0);
209
210 /**
211 * @brief Create CaptureBuilder for file with fluent configuration.
212 * @param filepath Path to audio file
213 * @param channel Channel index (default: 0)
214 * @return CaptureBuilder for fluent configuration
215 */
216 static CaptureBuilder capture_file_from(
217 const std::string& filepath,
218 uint32_t channel = 0);
219
220 /**
221 * @brief Create operation to route file data to DynamicSoundStream.
222 * @param filepath Path to audio file
223 * @param target_stream Target DynamicSoundStream
224 * @param cycle_count Number of cycles to read (0 = entire file)
225 * @return BufferOperation configured for file to stream routing
226 */
227 static BufferOperation file_to_stream(
228 const std::string& filepath,
229 std::shared_ptr<Kakshya::DynamicSoundStream> target_stream,
230 uint32_t cycle_count = 0);
231
232 /**
233 * @brief Create a transform operation with custom transformation function.
234 * @param transformer Function that transforms DataVariant with cycle information
235 * @return BufferOperation configured for data transformation
236 */
237 static BufferOperation transform(TransformationFunction transformer);
238
239 /**
240 * @brief Create a routing operation to AudioBuffer destination.
241 * @param target Target AudioBuffer to receive data
242 * @return BufferOperation configured for buffer routing
243 */
244 static BufferOperation route_to_buffer(std::shared_ptr<Buffers::AudioBuffer> target);
245
246 /**
247 * @brief Create a routing operation to DynamicSoundStream destination.
248 * @param target Target container to receive data
249 * @return BufferOperation configured for container routing
250 */
251 static BufferOperation route_to_container(std::shared_ptr<Kakshya::DynamicSoundStream> target);
252
253 /**
254 * @brief Create a load operation from container to buffer.
255 * @param source Source container to read from
256 * @param target Target buffer to write to
257 * @param start_frame Starting frame position (default: 0)
258 * @param length Number of frames to load (default: 0 = all)
259 * @return BufferOperation configured for container loading
260 */
261 static BufferOperation load_from_container(std::shared_ptr<Kakshya::DynamicSoundStream> source,
262 std::shared_ptr<Buffers::AudioBuffer> target,
263 uint64_t start_frame = 0,
264 uint32_t length = 0);
265
266 /**
267 * @brief Create a conditional operation for pipeline branching.
268 * @param condition Function that returns true when condition is met
269 * @return BufferOperation configured for conditional execution
270 */
271 static BufferOperation when(std::function<bool(uint32_t)> condition);
272
273 /**
274 * @brief Create a dispatch operation for external processing.
275 * @param handler Function to handle data with cycle information
276 * @return BufferOperation configured for external dispatch
277 */
278 static BufferOperation dispatch_to(OperationFunction handler);
279
280 /**
281 * @brief Create a modify operation for direct buffer manipulation.
282 * @param buffer AudioBuffer to modify in-place
283 * @param modifier Function that modifies buffer data directly
284 * @return BufferOperation configured for buffer modification
285 *
286 * Unlike TRANSFORM which works on data copies, MODIFY attaches a processor
287 * to the buffer that modifies it in-place during buffer processing.
288 * The processor is automatically managed based on pipeline lifecycle.
289 */
290 static BufferOperation modify_buffer(
291 std::shared_ptr<Buffers::AudioBuffer> buffer,
293
294 /**
295 * @brief Create a fusion operation for multiple AudioBuffer sources.
296 * @param sources Vector of source buffers to fuse
297 * @param fusion_func Function that combines multiple DataVariants
298 * @param target Target buffer for fused result
299 * @return BufferOperation configured for buffer fusion
300 */
301 static BufferOperation fuse_data(std::vector<std::shared_ptr<Buffers::AudioBuffer>> sources,
302 TransformVectorFunction fusion_func,
303 std::shared_ptr<Buffers::AudioBuffer> target);
304
305 /**
306 * @brief Create a fusion operation for multiple DynamicSoundStream sources.
307 * @param sources Vector of source containers to fuse
308 * @param fusion_func Function that combines multiple DataVariants
309 * @param target Target container for fused result
310 * @return BufferOperation configured for container fusion
311 */
312 static BufferOperation fuse_containers(std::vector<std::shared_ptr<Kakshya::DynamicSoundStream>> sources,
313 TransformVectorFunction fusion_func,
314 std::shared_ptr<Kakshya::DynamicSoundStream> target);
315
316 /**
317 * @brief Create a CaptureBuilder for fluent capture configuration.
318 * @param buffer AudioBuffer to capture from (must be registered with BufferManager if using AUTOMATIC processing)
319 * @return CaptureBuilder for fluent operation construction
320 *
321 * @note If the buffer uses ProcessingControl::AUTOMATIC, ensure it's registered with
322 * the BufferManager via add_audio_buffer() before pipeline execution.
323 */
324 static CaptureBuilder capture_from(std::shared_ptr<Buffers::AudioBuffer> buffer);
325
326 /**
327 * @brief Set execution priority for scheduler ordering.
328 * @param priority Priority value (0=highest, 255=lowest, default=128)
329 * @return Reference to this operation for chaining
330 */
331 BufferOperation& with_priority(uint8_t priority);
332
333 /**
334 * @brief Set processing token for execution context.
335 * @param token Processing token indicating execution context
336 * @return Reference to this operation for chaining
337 */
339
340 /**
341 * @brief Set cycle interval for periodic execution.
342 * @param n Execute every n cycles (default: 1)
343 * @return Reference to this operation for chaining
344 */
345 BufferOperation& every_n_cycles(uint32_t n);
346
347 /**
348 * @brief Assign identification tag.
349 * @param tag String identifier for debugging and organization
350 * @return Reference to this operation for chaining
351 */
352 BufferOperation& with_tag(const std::string& tag);
353
354 BufferOperation& for_cycles(uint32_t count);
355
356 /**
357 * @brief Getters for internal state (read-only)
358 */
359 inline OpType get_type() const { return m_type; }
360
361 /**
362 * @brief Getters for internal state (read-only)
363 */
364 inline uint8_t get_priority() const { return m_priority; }
365
366 /**
367 * @brief Getters for processing token
368 */
369 inline Buffers::ProcessingToken get_token() const { return m_token; }
370
371 /**
372 * @brief Getters for user defined tag
373 */
374 inline const std::string& get_tag() const { return m_tag; }
375
376 /**
377 * @brief Hint that this operation should execute in capture phase
378 * @return Reference to this operation for chaining
379 */
381 {
382 m_execution_phase = ExecutionPhase::CAPTURE;
383 return *this;
384 }
385
386 /**
387 * @brief Hint that this operation should execute in process phase
388 * @return Reference to this operation for chaining
389 */
390 BufferOperation& as_process_phase();
391
392 /**
393 * @brief Mark this operation as streaming (executes continuously)
394 * Useful for modify_buffer and similar stateful operations
395 * @return Reference to this operation for chaining
396 */
397 BufferOperation& as_streaming();
398
399 /**
400 * @brief Check if this operation is a streaming operation
401 */
402 inline bool is_streaming() const { return m_is_streaming; }
403
404 /**
405 * @brief Get the execution phase hint for this operation
406 */
407 ExecutionPhase get_execution_phase() const { return m_execution_phase; }
408
409 BufferOperation(OpType type, BufferCapture capture);
410
411 explicit BufferOperation(OpType type);
412
413 static bool is_capture_phase_operation(const BufferOperation& op);
414
415 static bool is_process_phase_operation(const BufferOperation& op);
416
417 private:
418 ExecutionPhase m_execution_phase { ExecutionPhase::AUTO };
421 uint32_t m_modify_cycle_count {};
422 bool m_is_streaming {};
423
426
427 std::shared_ptr<Buffers::AudioBuffer> m_target_buffer;
428 std::shared_ptr<Kakshya::DynamicSoundStream> m_target_container;
429
430 std::shared_ptr<Buffers::BufferProcessor> m_attached_processor;
431
432 std::shared_ptr<Kakshya::DynamicSoundStream> m_source_container;
433 uint64_t m_start_frame {};
434 uint32_t m_load_length {};
435
436 std::function<bool(uint32_t)> m_condition;
438
439 std::vector<std::shared_ptr<Buffers::AudioBuffer>> m_source_buffers;
440 std::vector<std::shared_ptr<Kakshya::DynamicSoundStream>> m_source_containers;
442
443 uint8_t m_priority = 128;
444 Buffers::ProcessingToken m_token = Buffers::ProcessingToken::AUDIO_BACKEND;
445 uint32_t m_cycle_interval = 1;
446 std::string m_tag;
447
448 friend class BufferPipeline;
449 };
450
451} // namespace Kriya
452
453} // namespace MayaFlux
static MayaFlux::Nodes::ProcessingToken token
Definition Timers.cpp:8
CaptureMode
Defines the data capture and retention strategy.
Definition Capture.hpp:58
Flexible data capture interface for buffer-based processing pipelines.
Definition Capture.hpp:52
std::function< bool(uint32_t)> m_condition
std::vector< std::shared_ptr< Buffers::AudioBuffer > > m_source_buffers
uint8_t get_priority() const
Getters for internal state (read-only)
std::shared_ptr< Buffers::AudioBuffer > m_target_buffer
std::vector< std::shared_ptr< Kakshya::DynamicSoundStream > > m_source_containers
Buffers::BufferProcessingFunction m_buffer_modifier
std::shared_ptr< Kakshya::DynamicSoundStream > m_target_container
Buffers::ProcessingToken get_token() const
Getters for processing token.
static BufferOperation capture(BufferCapture capture)
Create a capture operation using BufferCapture configuration.
bool is_streaming() const
Check if this operation is a streaming operation.
TransformVectorFunction m_fusion_function
std::shared_ptr< Buffers::BufferProcessor > m_attached_processor
OpType get_type() const
Getters for internal state (read-only)
const std::string & get_tag() const
Getters for user defined tag.
std::shared_ptr< Kakshya::DynamicSoundStream > m_source_container
TransformationFunction m_transformer
OpType
Defines the fundamental operation types in the processing pipeline.
BufferOperation & as_capture_phase()
Hint that this operation should execute in capture phase.
ExecutionPhase get_execution_phase() const
Get the execution phase hint for this operation.
Fundamental unit of operation in buffer processing pipelines.
Coroutine-based execution engine for composable, multi-strategy buffer processing.
Fluent builder interface for constructing BufferCapture configurations.
Definition Capture.hpp:231
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
std::variant< AudioProcessingFunction, GraphicsProcessingFunction > BufferProcessingFunction
@ Buffers
Buffers, Managers, processors and processing chains.
@ Kakshya
Containers[Signalsource, Stream, File], Regions, DataProcessors.
@ Kriya
Automatable tasks and fluent scheduling api for Nodes and Buffers.
std::variant< std::vector< double >, std::vector< float >, std::vector< uint8_t >, std::vector< uint16_t >, std::vector< uint32_t >, std::vector< std::complex< float > >, std::vector< std::complex< double > >, std::vector< glm::vec2 >, std::vector< glm::vec3 >, std::vector< glm::vec4 >, std::vector< glm::mat4 > > DataVariant
Multi-type data storage for different precision needs.
Definition NDData.hpp:73
std::function< Kakshya::DataVariant(std::vector< Kakshya::DataVariant > &, uint32_t)> TransformVectorFunction
ExecutionStrategy
Defines how operations in a pipeline are coordinated and executed.
@ PHASED
PHASED: Traditional phased execution (default)
@ STREAMING
STREAMING: Immediate flow-through execution.
@ PARALLEL
PARALLEL: Concurrent capture with synchronization.
@ REACTIVE
REACTIVE: Data-driven reactive execution.
std::function< Kakshya::DataVariant(Kakshya::DataVariant &, uint32_t)> TransformationFunction
Definition Capture.hpp:16
std::function< void(Kakshya::DataVariant &, uint32_t)> OperationFunction
Definition Capture.hpp:15
Main namespace for the Maya Flux audio engine.
Definition LiveAid.hpp:6