MayaFlux 0.3.0
Digital-First Multimedia Processing Framework
BufferOperation.hpp
1#pragma once
2
3#include "Capture.hpp"
4
7
8namespace MayaFlux {
9
10namespace Buffers {
11 class BufferProcessor;
12 class BufferManager;
13}
14
15namespace Kakshya {
16 class DynamicSoundStream;
17}
18
19namespace IO {
20 class IOManager;
21}
22
23namespace Kriya {
24
25 using TransformVectorFunction = std::function<Kakshya::DataVariant(std::vector<Kakshya::DataVariant>&, uint32_t)>;
26
27 /**
28 * @enum ExecutionStrategy
29 * @brief Defines how operations in a pipeline are coordinated and executed
30 */
31 enum class ExecutionStrategy : uint8_t {
32 /**
33 * PHASED: Traditional phased execution (default)
34 * - All CAPTURE operations complete first (capture phase)
35 * - Then all processing operations execute (process phase)
36 * - Best for: accumulation, windowed analysis, batch processing
37 * - Predictable data availability, clear phase boundaries
38 */
39 PHASED,
40
41 /**
42 * STREAMING: Immediate flow-through execution
43 * - Each capture iteration flows immediately through dependent operations
44 * - Minimal latency, data processed as it arrives
45 * - Best for: real-time effects, low-latency processing, modify_buffer chains
46 * - Natural for operations that modify state continuously
47 */
48 STREAMING,
49
50 /**
51 * PARALLEL: Concurrent capture with synchronization
52 * - Multiple capture operations can run concurrently
53 * - Explicit synchronization points coordinate data flow
54 * - Best for: multi-source capture, independent data streams
55 * - Requires CycleCoordinator for proper synchronization
56 */
57 PARALLEL,
58
59 /**
60 * REACTIVE: Data-driven reactive execution
61 * - Operations execute when input data becomes available
62 * - Dynamic dependency resolution
63 * - Best for: event-driven workflows, complex dependencies
64 * - Non-deterministic execution order
65 */
66 REACTIVE
67 };
68
69 class BufferPipeline;
70 class CycleCoordinator;
71
72 /**
73 * @class BufferOperation
74 * @brief Fundamental unit of operation in buffer processing pipelines.
75 *
76 * BufferOperation encapsulates discrete processing steps that can be composed
77 * into complex data flow pipelines. Each operation represents a specific action
78 * such as capturing data, transforming it, routing to destinations, or applying
79 * conditional logic. Operations are designed to be chainable and support
80 * sophisticated scheduling and priority management.
81 *
82 * **Operation Types:**
83 * - **CAPTURE**: Extract data from AudioBuffer using configurable capture strategies
84 * - **TRANSFORM**: Apply functional transformations to data variants
85 * - **ROUTE**: Direct data to AudioBuffer or DynamicSoundStream destinations
86 * - **LOAD**: Read data from containers into buffers with position control
87 * - **SYNC**: Coordinate timing and synchronization across pipeline stages
88 * - **CONDITION**: Apply conditional logic and branching to data flow
89 * - **DISPATCH**: Send data to external handlers and callback systems
90 * - **FUSE**: Combine multiple data sources using custom fusion functions
91 *
92 * **Example Usage:**
93 * ```cpp
94 * // Capture audio with windowed analysis
95 * auto capture_op = BufferOperation::capture_from(input_buffer)
96 * .with_window(512, 0.5f)
97 * .on_data_ready([](const auto& data, uint32_t cycle) {
98 * analyze_spectrum(data);
99 * });
100 *
101 * // Transform and route to output
102 * auto pipeline = BufferPipeline()
103 * >> capture_op
104 * >> BufferOperation::transform([](const auto& data, uint32_t cycle) {
105 * return apply_reverb(data);
106 * })
107 * >> BufferOperation::route_to_container(output_stream);
108 * ```
109 * @class BufferCapture
110 * ...existing intro...
111 *
112 * **Cycle Behavior:**
113 * The `for_cycles(N)` configuration controls how many times the capture operation
114 * executes within a single pipeline cycle. When a capture has `.for_cycles(20)`,
115 * the operation will capture 20 times sequentially, with each capture receiving
116 * incrementing cycle numbers (0, 1, 2... 19) and calling `on_data_ready()` for
117 * each iteration.
118 *
119 * This is distinct from pipeline-level cycle control:
120 * - `.for_cycles(20)` on capture → operation executes 20 times per pipeline cycle
121 * - `execute_scheduled(5, ...)` → pipeline runs 5 times total
122 * - Combined: 5 × 20 = 100 total capture executions
123 *
124 * **Example:**
125 * ```cpp
126 * auto pipeline = BufferPipeline::create(*scheduler);
127 * pipeline >> BufferOperation::capture_from(buffer)
128 * .for_cycles(10) // Capture 10 times per pipeline invocation
129 * .on_data_ready([](const auto& data, uint32_t cycle) {
130 * std::cout << "Capture #" << cycle << '\n'; // Prints 0-9
131 * });
132 * pipeline->execute_scheduled(3, 512); // Runs pipeline 3 times → 30 total captures
133 * ```
134 *
135 *
136 * @see BufferPipeline For pipeline construction and execution
137 * @see BufferCapture For flexible data capture strategies
138 * @see CycleCoordinator For cross-pipeline synchronization
139 */
140 class MAYAFLUX_API BufferOperation {
141 private:
142 enum class ExecutionPhase : uint8_t {
143 AUTO, // Automatically determined by operation type
144 CAPTURE, // Explicitly runs in capture phase
145 PROCESS // Explicitly runs in process phase
146 };
147
148 public:
149 /**
150 * @enum OpType
151 * @brief Defines the fundamental operation types in the processing pipeline.
152 */
153 enum class OpType : uint8_t {
154 CAPTURE, ///< Capture data from source buffer using BufferCapture strategy
155 TRANSFORM, ///< Apply transformation function to data variants
156 ROUTE, ///< Route data to destination (buffer or container)
157 LOAD, ///< Load data from container to buffer with position control
158 SYNC, ///< Synchronize with timing/cycles for coordination
159 CONDITION, ///< Conditional operation for branching logic
160 BRANCH, ///< Branch to sub-pipeline based on conditions
161 DISPATCH, ///< Dispatch to external handler for custom processing
162 FUSE, ///< Fuse multiple sources using custom fusion functions
163 MODIFY ///< Modify buffer data in place using a custom processing function
164 };
165
166 /**
167 * @brief Create a capture operation using BufferCapture configuration.
168 * @param capture Configured BufferCapture with desired capture strategy
169 * @return BufferOperation configured for data capture
170 */
171 static BufferOperation capture(BufferCapture capture)
172 {
173 return { OpType::CAPTURE, std::move(capture) };
174 }
175
176 /**
177 * @brief Create capture operation from input channel using convenience API.
178 * Creates input buffer automatically and returns configured capture operation.
179 * @param buffer_manager System buffer manager
180 * @param input_channel Input channel to capture from
181 * @param mode Capture mode (default: ACCUMULATE)
182 * @param cycle_count Number of cycles (0 = continuous, default)
183 * @return BufferOperation configured for input capture with default settings
184 */
185 static BufferOperation capture_input(
186 const std::shared_ptr<Buffers::BufferManager>& buffer_manager,
187 uint32_t input_channel,
188 BufferCapture::CaptureMode mode = BufferCapture::CaptureMode::ACCUMULATE,
189 uint32_t cycle_count = 0);
190
191 /**
192 * @brief Create CaptureBuilder for input channel with fluent configuration.
193 * Uses the existing CaptureBuilder pattern but with input buffer creation.
194 * @param buffer_manager System buffer manager
195 * @param input_channel Input channel to capture from
196 * @return CaptureBuilder for fluent configuration
197 */
198 static CaptureBuilder capture_input_from(
199 const std::shared_ptr<Buffers::BufferManager>& buffer_manager,
200 uint32_t input_channel);
201
202 /**
203 * @brief Create a file capture operation that reads from file and stores in stream.
204 * @param io_manager IOManager for file loading
205 * @param filepath Path to audio file
206 * @param channel Channel index (default: 0)
207 * @param cycle_count Number of cycles to capture (0 = continuous)
208 * @return BufferOperation configured for file capture
209 */
210 static BufferOperation capture_file(
211 const std::shared_ptr<IO::IOManager>& io_manager,
212 const std::string& filepath,
213 uint32_t channel = 0,
214 uint32_t cycle_count = 0);
215
216 /**
217 * @brief Create CaptureBuilder for file with fluent configuration.
218 * @param io_manager IOManager for file loading
219 * @param filepath Path to audio file
220 * @param channel Channel index (default: 0)
221 * @return CaptureBuilder for fluent configuration
222 */
223 static CaptureBuilder capture_file_from(
224 const std::shared_ptr<IO::IOManager>& io_manager,
225 const std::string& filepath,
226 uint32_t channel = 0);
227
228 /**
229 * @brief Create operation to route file data to DynamicSoundStream.
230 * @param io_manager IOManager for file loading
231 * @param filepath Path to audio file
232 * @param target_stream Target DynamicSoundStream
233 * @param cycle_count Number of cycles to read (0 = entire file)
234 * @return BufferOperation configured for file to stream routing
235 */
236 static BufferOperation file_to_stream(
237 const std::shared_ptr<IO::IOManager>& io_manager,
238 const std::string& filepath,
239 std::shared_ptr<Kakshya::DynamicSoundStream> target_stream,
240 uint32_t cycle_count = 0);
241
242 /**
243 * @brief Create a transform operation with custom transformation function.
244 * @param transformer Function that transforms DataVariant with cycle information
245 * @return BufferOperation configured for data transformation
246 */
247 static BufferOperation transform(TransformationFunction transformer);
248
249 /**
250 * @brief Create a routing operation to AudioBuffer destination.
251 * @param target Target AudioBuffer to receive data
252 * @return BufferOperation configured for buffer routing
253 */
254 static BufferOperation route_to_buffer(std::shared_ptr<Buffers::AudioBuffer> target);
255
256 /**
257 * @brief Create a routing operation to DynamicSoundStream destination.
258 * @param target Target container to receive data
259 * @return BufferOperation configured for container routing
260 */
261 static BufferOperation route_to_container(std::shared_ptr<Kakshya::DynamicSoundStream> target);
262
263 /**
264 * @brief Create a load operation from container to buffer.
265 * @param source Source container to read from
266 * @param target Target buffer to write to
267 * @param start_frame Starting frame position (default: 0)
268 * @param length Number of frames to load (default: 0 = all)
269 * @return BufferOperation configured for container loading
270 */
271 static BufferOperation load_from_container(std::shared_ptr<Kakshya::DynamicSoundStream> source,
272 std::shared_ptr<Buffers::AudioBuffer> target,
273 uint64_t start_frame = 0,
274 uint32_t length = 0);
275
276 /**
277 * @brief Create a conditional operation for pipeline branching.
278 * @param condition Function that returns true when condition is met
279 * @return BufferOperation configured for conditional execution
280 */
281 static BufferOperation when(std::function<bool(uint32_t)> condition);
282
283 /**
284 * @brief Create a dispatch operation for external processing.
285 * @param handler Function to handle data with cycle information
286 * @return BufferOperation configured for external dispatch
287 */
288 static BufferOperation dispatch_to(OperationFunction handler);
289
290 /**
291 * @brief Create a modify operation for direct buffer manipulation.
292 * @param buffer AudioBuffer to modify in-place
293 * @param modifier Function that modifies buffer data directly
294 * @return BufferOperation configured for buffer modification
295 *
296 * Unlike TRANSFORM which works on data copies, MODIFY attaches a processor
297 * to the buffer that modifies it in-place during buffer processing.
298 * The processor is automatically managed based on pipeline lifecycle.
299 */
300 static BufferOperation modify_buffer(
301 std::shared_ptr<Buffers::AudioBuffer> buffer,
302 Buffers::AudioProcessingFunction modifier);
303
304 /**
305 * @brief Create a fusion operation for multiple AudioBuffer sources.
306 * @param sources Vector of source buffers to fuse
307 * @param fusion_func Function that combines multiple DataVariants
308 * @param target Target buffer for fused result
309 * @return BufferOperation configured for buffer fusion
310 */
311 static BufferOperation fuse_data(std::vector<std::shared_ptr<Buffers::AudioBuffer>> sources,
312 TransformVectorFunction fusion_func,
313 std::shared_ptr<Buffers::AudioBuffer> target);
314
315 /**
316 * @brief Create a fusion operation for multiple DynamicSoundStream sources.
317 * @param sources Vector of source containers to fuse
318 * @param fusion_func Function that combines multiple DataVariants
319 * @param target Target container for fused result
320 * @return BufferOperation configured for container fusion
321 */
322 static BufferOperation fuse_containers(std::vector<std::shared_ptr<Kakshya::DynamicSoundStream>> sources,
323 TransformVectorFunction fusion_func,
324 std::shared_ptr<Kakshya::DynamicSoundStream> target);
325
326 /**
327 * @brief Create a CaptureBuilder for fluent capture configuration.
328 * @param buffer AudioBuffer to capture from (must be registered with BufferManager if using AUTOMATIC processing)
329 * @return CaptureBuilder for fluent operation construction
330 *
331 * @note If the buffer uses ProcessingControl::AUTOMATIC, ensure it's registered with
332 * the BufferManager via add_audio_buffer() before pipeline execution.
333 */
334 static CaptureBuilder capture_from(std::shared_ptr<Buffers::AudioBuffer> buffer);
335
336 /**
337 * @brief Set execution priority for scheduler ordering.
338 * @param priority Priority value (0=highest, 255=lowest, default=128)
339 * @return Reference to this operation for chaining
340 */
341 BufferOperation& with_priority(uint8_t priority);
342
343 /**
344 * @brief Set processing token for execution context.
345 * @param token Processing token indicating execution context
346 * @return Reference to this operation for chaining
347 */
349
350 /**
351 * @brief Set cycle interval for periodic execution.
352 * @param n Execute every n cycles (default: 1)
353 * @return Reference to this operation for chaining
354 */
355 BufferOperation& every_n_cycles(uint32_t n);
356
357 /**
358 * @brief Assign identification tag.
359 * @param tag String identifier for debugging and organization
360 * @return Reference to this operation for chaining
361 */
362 BufferOperation& with_tag(const std::string& tag);
363
364 BufferOperation& for_cycles(uint32_t count);
365
366 /**
367 * @brief Get the operation type (read-only)
368 */
369 inline OpType get_type() const { return m_type; }
370
371 /**
372 * @brief Get the execution priority (0 = highest, 255 = lowest)
373 */
374 inline uint8_t get_priority() const { return m_priority; }
375
376 /**
377 * @brief Get the processing token
378 */
379 inline Buffers::ProcessingToken get_token() const { return m_token; }
380
381 /**
382 * @brief Get the user-defined tag
383 */
384 inline const std::string& get_tag() const { return m_tag; }
385
386 /**
387 * @brief Hint that this operation should execute in capture phase
388 * @return Reference to this operation for chaining
389 */
390 BufferOperation& as_capture_phase()
391 {
392 m_execution_phase = ExecutionPhase::CAPTURE;
393 return *this;
394 }
395
396 /**
397 * @brief Hint that this operation should execute in process phase
398 * @return Reference to this operation for chaining
399 */
400 BufferOperation& as_process_phase();
401
402 /**
403 * @brief Mark this operation as streaming (executes continuously)
404 * Useful for modify_buffer and similar stateful operations
405 * @return Reference to this operation for chaining
406 */
407 BufferOperation& as_streaming();
408
409 /**
410 * @brief Check if this operation is a streaming operation
411 */
412 inline bool is_streaming() const { return m_is_streaming; }
413
414 /**
415 * @brief Get the execution phase hint for this operation
416 */
417 ExecutionPhase get_execution_phase() const { return m_execution_phase; }
418
419 BufferOperation(OpType type, BufferCapture capture);
420
421 explicit BufferOperation(OpType type);
422
423 static bool is_capture_phase_operation(const BufferOperation& op);
424
425 static bool is_process_phase_operation(const BufferOperation& op);
426
427 private:
428 ExecutionPhase m_execution_phase { ExecutionPhase::AUTO };
431 uint32_t m_modify_cycle_count {};
432 bool m_is_streaming {};
433
434 TransformationFunction m_transformer;
435 Buffers::AudioProcessingFunction m_buffer_modifier;
436
437 std::shared_ptr<Buffers::AudioBuffer> m_target_buffer;
438 std::shared_ptr<Kakshya::DynamicSoundStream> m_target_container;
439
440 std::shared_ptr<Buffers::BufferProcessor> m_attached_processor;
441
442 std::shared_ptr<Kakshya::DynamicSoundStream> m_source_container;
443 uint64_t m_start_frame {};
444 uint32_t m_load_length {};
445
446 std::function<bool(uint32_t)> m_condition;
448
449 std::vector<std::shared_ptr<Buffers::AudioBuffer>> m_source_buffers;
450 std::vector<std::shared_ptr<Kakshya::DynamicSoundStream>> m_source_containers;
451 TransformVectorFunction m_fusion_function;
452
453 uint8_t m_priority = 128;
454 Buffers::ProcessingToken m_token = Buffers::ProcessingToken::AUDIO_BACKEND;
455 uint32_t m_cycle_interval = 1;
456 std::string m_tag;
457
458 friend class BufferPipeline;
459 };
460
461} // namespace Kriya
462
463} // namespace MayaFlux
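
The cycle arithmetic described in the BufferOperation comments (`for_cycles(N)` captures per pipeline run, multiplied by the number of pipeline runs) can be illustrated with a small standalone sketch. It does not use MayaFlux; `run_capture_sketch` and its parameters are hypothetical names introduced only to reproduce the counting, on the assumption that each pipeline run restarts the capture cycle index at 0, as the documented example output ("Prints 0-9") suggests.

```cpp
#include <cstdint>
#include <functional>

// Hypothetical stand-in for the nesting of pipeline runs and per-run captures.
// Not MayaFlux code: it only mirrors the counting described in the comments.
std::uint32_t run_capture_sketch(std::uint32_t pipeline_runs,
                                 std::uint32_t captures_per_run,
                                 const std::function<void(std::uint32_t)>& on_data_ready)
{
    std::uint32_t total_captures = 0;
    for (std::uint32_t run = 0; run < pipeline_runs; ++run) {
        // Assumption: the capture cycle index restarts at 0 on every pipeline run.
        for (std::uint32_t cycle = 0; cycle < captures_per_run; ++cycle) {
            on_data_ready(cycle);
            ++total_captures;
        }
    }
    return total_captures;
}
```

Under these assumptions, `run_capture_sketch(3, 10, callback)` invokes the callback 30 times with cycle indices 0 through 9 on each run, matching the "3 pipeline runs → 30 total captures" example, and `run_capture_sketch(5, 20, callback)` gives the 5 × 20 = 100 case.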
Referenced symbols

BufferOperation members:
  static BufferOperation capture(BufferCapture capture): Create a capture operation using BufferCapture configuration.
  BufferOperation& as_capture_phase(): Hint that this operation should execute in capture phase.
  TransformationFunction m_transformer
  Buffers::AudioProcessingFunction m_buffer_modifier
  TransformVectorFunction m_fusion_function

Types:
  BufferCapture: Flexible data capture interface for buffer-based processing pipelines. Definition Capture.hpp:52
  CaptureMode: Defines the data capture and retention strategy. Definition Capture.hpp:58
  CaptureBuilder: Fluent builder interface for constructing BufferCapture configurations. Definition Capture.hpp:231
  BufferPipeline: Coroutine-based execution engine for composable, multi-strategy buffer processing.
  ProcessingToken: Bitfield enum defining processing characteristics and backend requirements for buffer operations.
  AudioProcessingFunction: std::function<void(const std::shared_ptr<AudioBuffer>&)>. Audio processing function, receives a correctly typed AudioBuffer.
  TransformationFunction: std::function<Kakshya::DataVariant(Kakshya::DataVariant&, uint32_t)>. Definition Capture.hpp:16
  OperationFunction: std::function<void(Kakshya::DataVariant&, uint32_t)>. Definition Capture.hpp:15
  DataVariant: std::variant<std::vector<double>, std::vector<float>, std::vector<uint8_t>, std::vector<uint16_t>, std::vector<uint32_t>, std::vector<std::complex<float>>, std::vector<std::complex<double>>, std::vector<glm::vec2>, std::vector<glm::vec3>, std::vector<glm::vec4>, std::vector<glm::mat4>>. Multi-type data storage for different precision needs. Definition NDData.hpp:73

Namespaces:
  MayaFlux: Main namespace for the Maya Flux audio engine. Definition LiveAid.hpp:6
  Buffers: Buffers, managers, processors and processing chains.
  Kakshya: Containers (signal source, stream, file), regions, data processors.
  IO: Networking, file handling, streaming.
  Kriya: Automatable tasks and fluent scheduling API for nodes and buffers.
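
A fusion function of the `TransformVectorFunction` shape receives all source data as a vector of variants plus the current cycle number, and returns a single variant. The following is a minimal sketch of such a function, not MayaFlux's own implementation. To stay self-contained it declares a simplified stand-in `DataVariant` with only two of the real alternatives, and `average_sources` is a hypothetical name. The policy of skipping non-`double` sources and truncating to the shortest source is likewise an assumption made for illustration.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <variant>
#include <vector>

// Simplified stand-in for Kakshya::DataVariant (the real variant has more alternatives).
using DataVariant = std::variant<std::vector<double>, std::vector<float>>;

// Same shape as Kriya::TransformVectorFunction, expressed with the stand-in variant.
using TransformVectorFunction = std::function<DataVariant(std::vector<DataVariant>&, std::uint32_t)>;

// Hypothetical fusion function: element-wise mean of all std::vector<double> sources.
// Sources holding another alternative are skipped; the result is truncated to the
// length of the shortest contributing source.
DataVariant average_sources(std::vector<DataVariant>& sources, std::uint32_t /*cycle*/)
{
    std::vector<const std::vector<double>*> inputs;
    for (const auto& source : sources) {
        if (const auto* samples = std::get_if<std::vector<double>>(&source)) {
            inputs.push_back(samples);
        }
    }
    if (inputs.empty()) {
        return std::vector<double> {};
    }

    std::size_t length = inputs.front()->size();
    for (const auto* samples : inputs) {
        length = std::min(length, samples->size());
    }

    // Sum the contributing sources, then divide by their count.
    std::vector<double> mixed(length, 0.0);
    for (const auto* samples : inputs) {
        for (std::size_t i = 0; i < length; ++i) {
            mixed[i] += (*samples)[i];
        }
    }
    for (auto& value : mixed) {
        value /= static_cast<double>(inputs.size());
    }
    return mixed;
}
```

A function with this signature is what `fuse_data` and `fuse_containers` accept as their `fusion_func` argument. How the real pipeline fills the source vector and writes the result to the target is not shown in this header, so the sketch covers only the callable itself.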