MayaFlux 0.2.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferProcessingChain.cpp
Go to the documentation of this file.
2
3#include "Buffer.hpp"
4
6
7namespace MayaFlux::Buffers {
8
9bool BufferProcessingChain::add_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason)
10{
11 if (m_is_processing.load(std::memory_order_acquire) || processor->m_active_processing.load(std::memory_order_acquire) > 0) {
12 return queue_pending_processor_op(processor, buffer, true, rejection_reason);
13 }
14
15 return add_processor_direct(processor, buffer, rejection_reason);
16}
17
// Adds `processor` to `buffer`'s pipeline immediately (no deferral).
//
// The processor's token is first checked according to m_enforcement_strategy;
// two of the branches reject the processor outright, two merely record it in a
// side set, and one does nothing. After that, a processor already present in
// the buffer's list is rejected; otherwise it is appended and on_attach(buffer)
// is called on it.
//
// @param processor        Processor to add (dereferenced; must not be null).
// @param buffer           Buffer whose pipeline receives the processor.
// @param rejection_reason Optional; receives a human-readable message on rejection.
// @return true if the processor was appended, false if it was rejected.
//
// NOTE(review): the embedded line numbering skips 23, 32, 41, 47 and 53, i.e.
// the five `case` labels of the switch are not present in this listing. The
// labels named in the comments below are inferred, not visible - confirm
// against the real file.
18bool BufferProcessingChain::add_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason)
19{
20 auto processor_token = processor->get_processing_token();
21
22 switch (m_enforcement_strategy) {
// Branch 1 (label missing; the message text says STRICT): token must equal the chain's mask exactly.
24 if (processor_token != m_token_filter_mask) {
25 if (rejection_reason) {
26 *rejection_reason = "Processor token (" + std::to_string(static_cast<uint32_t>(processor_token)) + ") does not exactly match chain's preferred token (" + std::to_string(static_cast<uint32_t>(m_token_filter_mask)) + ") in STRICT mode";
27 }
28 return false;
29 }
30 break;
31
// Branch 2 (label missing; the message text says FILTERED): token must pass are_tokens_compatible().
33 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
34 if (rejection_reason) {
35 *rejection_reason = "Processor token (" + std::to_string(static_cast<uint32_t>(processor_token)) + ") is not compatible with chain's preferred token (" + std::to_string(static_cast<uint32_t>(m_token_filter_mask)) + ") in FILTERED mode";
36 }
37 return false;
38 }
39 break;
40
// Branch 3 (label missing; presumably OVERRIDE_SKIP - TODO confirm): an incompatible
// processor is still added below, but is also recorded in m_conditional_processors.
42 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
43 m_conditional_processors[buffer].insert(processor);
44 }
45 break;
46
// Branch 4 (label missing; presumably OVERRIDE_REJECT - TODO confirm): an incompatible
// processor is still added below, but is also recorded in m_pending_removal.
48 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
49 m_pending_removal[buffer].insert(processor);
50 }
51 break;
52
// Branch 5 (label missing; presumably IGNORE or default - TODO confirm): no token check.
54 break;
55 }
56
// operator[] creates an empty processor list for a buffer seen for the first time.
57 auto& processors = m_buffer_processors[buffer];
58
59 auto it = std::ranges::find(processors, processor);
60
// The same processor instance may appear only once per buffer.
61 if (it != processors.end()) {
62 if (rejection_reason) {
63 *rejection_reason = "Processor already exists in chain for this buffer";
64 }
65 return false;
66 }
67
68 processors.push_back(processor);
69 processor->on_attach(buffer);
70
71 return true;
72}
73
74void BufferProcessingChain::remove_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
75{
76 if (m_is_processing.load(std::memory_order_acquire)) {
77 queue_pending_processor_op(processor, buffer, false);
78 return;
79 }
80
81 remove_processor_direct(processor, buffer);
82}
83
84void BufferProcessingChain::remove_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
85{
86 auto& processors = m_buffer_processors[buffer];
87 auto it = std::ranges::find(processors, processor);
88
89 if (it != processors.end()) {
90 processor->on_detach(buffer);
91 processors.erase(it);
92
93 m_conditional_processors[buffer].erase(processor);
94 m_pending_removal[buffer].erase(processor);
95 }
96}
97
// Runs every processor registered for `buffer` through its
// process_non_owning() entry point.
//
// Unlike process(), no visible line of this function reads or writes
// m_is_processing.
//
// NOTE(review): the embedded line numbering skips 101, 112, 122 and 123, so
// several statements are missing from this listing (see the notes below).
// Confirm against the real file before relying on this description.
98void BufferProcessingChain::process_non_owning(const std::shared_ptr<Buffer>& buffer)
99{
// Runs only when at least one pending add/remove operation is counted; the body
// (line 101) is missing - presumably it applies the pending operations, TODO confirm.
100 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
102 }
103
// Nothing to do for a buffer with no (or an empty) processor list.
104 auto it = m_buffer_processors.find(buffer);
105 if (it == m_buffer_processors.end() || it->second.empty()) {
106 return;
107 }
108
109 for (auto& processor : it->second) {
110 bool should_process = true;
111
// The opening of this block (line 112) is missing, so the condition under which
// the token compatibility check overrides `should_process` is not visible.
113 auto processor_token = processor->get_processing_token();
114 should_process = are_tokens_compatible(m_token_filter_mask, processor_token);
115 }
116
117 if (should_process) {
118 processor->process_non_owning(buffer);
119 }
120 }
121
// Lines 122-123 (a block opening and its body) are missing; only the closing brace survives.
124 }
125}
126
// Runs every processor registered for `buffer` through its process() entry
// point, guarded by the m_is_processing flag.
//
// The flag is claimed with a compare-exchange; if it is already set the call
// returns immediately without processing anything. Both visible exits after
// that point clear the flag again.
//
// NOTE(review): no visible line clears m_is_processing if a processor's
// process() throws; in that case the flag would stay set. Verify whether
// processors are allowed to throw.
//
// NOTE(review): the embedded line numbering skips 136, 148, 158 and 159, so
// several statements are missing from this listing (see the notes below).
127void BufferProcessingChain::process(const std::shared_ptr<Buffer>& buffer)
128{
// Claim the processing flag; bail out if another process() call already holds it.
129 bool expected = false;
130 if (!m_is_processing.compare_exchange_strong(expected, true,
131 std::memory_order_acquire, std::memory_order_relaxed)) {
132 return;
133 }
134
// Runs only when at least one pending add/remove operation is counted; the body
// (line 136) is missing - presumably it applies the pending operations, TODO confirm.
135 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
137 }
138
// Nothing to do for a buffer with no (or an empty) processor list: release the flag and return.
139 auto it = m_buffer_processors.find(buffer);
140 if (it == m_buffer_processors.end() || it->second.empty()) {
141 m_is_processing.store(false, std::memory_order_release);
142 return;
143 }
144
145 for (auto& processor : it->second) {
146 bool should_process = true;
147
// The opening of this block (line 148) is missing, so the condition under which
// the token compatibility check overrides `should_process` is not visible.
149 auto processor_token = processor->get_processing_token();
150 should_process = are_tokens_compatible(m_token_filter_mask, processor_token);
151 }
152
153 if (should_process) {
154 processor->process(buffer);
155 }
156 }
157
// Lines 158-159 (a block opening and its body) are missing; only the closing brace survives.
160 }
161
// Release the processing flag so later calls (and direct add/remove) can proceed.
162 m_is_processing.store(false, std::memory_order_release);
163}
164
165void BufferProcessingChain::process_complete(const std::shared_ptr<Buffer>& buffer)
166{
167 preprocess(buffer);
168 process(buffer);
169 postprocess(buffer);
170 process_final(buffer);
171}
172
// Applies every queued add/remove operation and frees its slot.
//
// NOTE(review): the function signature (embedded line 173) is missing from
// this listing. The declaration index further down the page lists
// `void process_pending_processor_operations()`, which presumably belongs to
// this body - confirm against the real file.
//
// For each slot in m_pending_ops whose `active` flag is set, the operation is
// applied through add_processor_direct() or remove_processor_direct(), the
// slot's shared_ptr members are reset, the slot is marked inactive and
// m_pending_count is decremented. The return value of add_processor_direct()
// is not inspected, so a deferred addition that is rejected goes unreported.
174{
175 for (auto& m_pending_op : m_pending_ops) {
176 if (m_pending_op.active.load(std::memory_order_acquire)) {
177 auto& op = m_pending_op;
178
179 if (op.is_addition) {
180 add_processor_direct(op.processor, op.buffer);
181 } else {
182 remove_processor_direct(op.processor, op.buffer);
183 }
184
// Drop the slot's references before publishing it as free again.
185 op.processor.reset();
186 op.buffer.reset();
187 op.active.store(false, std::memory_order_release);
188 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
189 }
190 }
191}
192
193void BufferProcessingChain::add_preprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
194{
195 processor->on_attach(buffer);
196 m_preprocessors[buffer] = processor;
197}
198
199void BufferProcessingChain::add_postprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
200{
201 processor->on_attach(buffer);
202 m_postprocessors[buffer] = processor;
203}
204
205void BufferProcessingChain::add_final_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
206{
207 processor->on_attach(buffer);
208 m_final_processors[buffer] = processor;
209}
210
211bool BufferProcessingChain::has_processors(const std::shared_ptr<Buffer>& buffer) const
212{
213 auto it = m_buffer_processors.find(buffer);
214 return it != m_buffer_processors.end() && !it->second.empty();
215}
216
217const std::vector<std::shared_ptr<BufferProcessor>>& BufferProcessingChain::get_processors(const std::shared_ptr<Buffer>& buffer) const
218{
219 static const std::vector<std::shared_ptr<BufferProcessor>> empty_vector;
220
221 auto it = m_buffer_processors.find(buffer);
222 if (it != m_buffer_processors.end()) {
223 return it->second;
224 }
225
226 return empty_vector;
227}
228
229void BufferProcessingChain::preprocess(const std::shared_ptr<Buffer>& buffer)
230{
231 auto pre_it = m_preprocessors.find(buffer);
232 if (pre_it != m_preprocessors.end()) {
233 pre_it->second->process(buffer);
234 }
235}
236
237void BufferProcessingChain::postprocess(const std::shared_ptr<Buffer>& buffer)
238{
239 auto post_it = m_postprocessors.find(buffer);
240 if (post_it != m_postprocessors.end()) {
241 post_it->second->process(buffer);
242 }
243}
244
245void BufferProcessingChain::process_final(const std::shared_ptr<Buffer>& buffer)
246{
247 auto final_it = m_final_processors.find(buffer);
248 if (final_it != m_final_processors.end()) {
249 final_it->second->process(buffer);
250 }
251}
252
253void BufferProcessingChain::merge_chain(const std::shared_ptr<BufferProcessingChain>& other)
254{
255 for (const auto& [buffer, processors] : other->get_chain()) {
256 if (!m_buffer_processors.count(buffer)) {
257 m_buffer_processors.try_emplace(buffer, processors);
258 } else {
259 auto& target_processors = m_buffer_processors[buffer];
260 target_processors.reserve(target_processors.size() + processors.size());
261
262 for (const auto& processor : processors) {
263 if (std::ranges::find(target_processors, processor) == target_processors.end()) {
264 target_processors.push_back(processor);
265 }
266 }
267 }
268 }
269}
270
// Reorders `buffer`'s processor list so that processors whose token is
// compatible with m_token_filter_mask come first, keeping the relative order
// inside each group.
//
// NOTE(review): the embedded line numbering skips 294, so the opening of the
// block around the second insert() is missing. As listed, the incompatible
// processors are re-appended only inside that block; if its condition is
// false they would be dropped from the list. Confirm against the real file.
271void BufferProcessingChain::optimize_for_tokens(const std::shared_ptr<Buffer>& buffer)
272{
// operator[] creates an empty list for a buffer the chain has not seen before.
273 auto& processors = m_buffer_processors[buffer];
274 if (processors.empty()) {
275 return;
276 }
277
278 std::vector<std::shared_ptr<BufferProcessor>> compatible_processors;
279 std::vector<std::shared_ptr<BufferProcessor>> incompatible_processors;
280
// Split the current list into the two groups, preserving encounter order.
281 for (auto& processor : processors) {
282 auto processor_token = processor->get_processing_token();
283 if (are_tokens_compatible(m_token_filter_mask, processor_token)) {
284 compatible_processors.push_back(processor);
285 } else {
286 incompatible_processors.push_back(processor);
287 }
288 }
289
// Rebuild the list: compatible processors first.
290 processors.clear();
291
292 processors.insert(processors.end(), compatible_processors.begin(), compatible_processors.end());
293
// Block opening (line 294) missing from the listing - condition not visible.
295 processors.insert(processors.end(), incompatible_processors.begin(), incompatible_processors.end());
296 }
297}
298
299void BufferProcessingChain::cleanup_rejected_processors(const std::shared_ptr<Buffer>& buffer)
300{
301 auto& processors = m_buffer_processors[buffer];
302
303 processors.erase(
304 std::remove_if(processors.begin(), processors.end(),
305 [this](const std::shared_ptr<BufferProcessor>& processor) {
306 auto processor_token = processor->get_processing_token();
307 return !are_tokens_compatible(m_token_filter_mask, processor_token);
308 }),
309 processors.end());
310
311 m_pending_removal[buffer].clear();
312}
313
// Builds one TokenCompatibilityReport per buffer in m_buffer_processors, each
// carrying one entry per processor in that buffer's list, and returns them.
//
// NOTE(review): the embedded line numbering skips 319, 321-322, 325 and
// 328-330, so the declarations of `report` and `info` and several field
// assignments are missing from this listing. Only the assignments shown below
// are confirmed; confirm the rest against the real file.
314std::vector<TokenCompatibilityReport> BufferProcessingChain::analyze_token_compatibility() const
315{
316 std::vector<TokenCompatibilityReport> reports;
317
318 for (const auto& [buffer, processors] : m_buffer_processors) {
// `report` is declared on a line missing from this listing (319).
320 report.buffer = buffer;
323
324 for (const auto& processor : processors) {
// `info` is declared on a line missing from this listing (325).
326 info.processor = processor;
327 info.processor_token = processor->get_processing_token();
331
332 report.processor_infos.push_back(info);
333 }
334
335 reports.push_back(report);
336 }
337
338 return reports;
339}
340
341bool BufferProcessingChain::validate_all_processors(std::vector<std::string>* incompatibility_reasons) const
342{
343 bool all_compatible = true;
344
345 for (const auto& [buffer, processors] : m_buffer_processors) {
346 for (const auto& processor : processors) {
347 auto processor_token = processor->get_processing_token();
348 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
349 all_compatible = false;
350 if (incompatibility_reasons) {
351 incompatibility_reasons->push_back(
352 "Processor with token " + std::to_string(static_cast<uint32_t>(processor_token)) + " incompatible with chain preferred token " + std::to_string(static_cast<uint32_t>(m_token_filter_mask)));
353 }
354 }
355 }
356 }
357
358 return all_compatible;
359}
360
// Sets the chain's token (m_token_filter_mask) on every processor whose own
// token differs from it but is compatible with it. Processors that already
// carry the chain's token, or whose token is incompatible, are left alone.
//
// NOTE(review): the function signature (embedded line 361) is missing from
// this listing. The declaration index further down the page lists
// `void enforce_chain_token_on_processors()`, which presumably belongs to
// this body - confirm against the real file.
362{
363 for (const auto& [buffer, processors] : m_buffer_processors) {
364 for (auto& processor : processors) {
365 auto processor_token = processor->get_processing_token();
366 if (processor_token != m_token_filter_mask && are_tokens_compatible(m_token_filter_mask, processor_token)) {
367
// A std::exception thrown by set_processing_token() is caught per processor,
// so the loop carries on with the remaining processors.
368 try {
369 processor->set_processing_token(m_token_filter_mask);
370 } catch (const std::exception& e) {
// NOTE(review): line 371 (the start of the call these two arguments belong to)
// is missing from the listing - presumably an error-reporting call, TODO confirm.
372 std::source_location::current(),
373 "Failed to enforce chain token on processor: " + std::string(e.what()));
374 }
375 }
376 }
377 }
378}
379
380bool BufferProcessingChain::queue_pending_processor_op(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, bool is_addition, std::string* rejection_reason)
381{
382 for (auto& m_pending_op : m_pending_ops) {
383 bool expected = false;
384 if (m_pending_op.active.compare_exchange_strong(
385 expected, true,
386 std::memory_order_acquire,
387 std::memory_order_relaxed)) {
388
389 m_pending_op.processor = processor;
390 m_pending_op.buffer = buffer;
391 m_pending_op.is_addition = is_addition;
392 m_pending_count.fetch_add(1, std::memory_order_relaxed);
393 return true;
394 }
395 }
396
397 if (rejection_reason && is_addition) {
398 *rejection_reason = "Processor operation queue full";
399 }
400 return false;
401}
402
403}
void process_complete(const std::shared_ptr< Buffer > &buffer)
Applies preprocessors, processing chain, post processors and final processors sequentially to a buffer.
void add_final_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a special processor to be applied after the main pipeline.
void process(const std::shared_ptr< Buffer > &buffer)
Applies the transformation pipeline to a buffer with intelligent execution.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_final_processors
Map of buffers to their final processors.
void add_preprocessor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a preprocessor to be applied before the main pipeline.
void add_postprocessor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a postprocessor to be applied after the main pipeline.
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_pending_removal
Map of buffers to processors pending removal.
std::vector< TokenCompatibilityReport > analyze_token_compatibility() const
Analyzes token compatibility across all processors in the chain.
ProcessingToken m_token_filter_mask
Preferred processing token for this chain.
std::unordered_map< std::shared_ptr< Buffer >, std::vector< std::shared_ptr< BufferProcessor > > > m_buffer_processors
Map of buffers to their processor sequences.
void process_non_owning(const std::shared_ptr< Buffer > &buffer)
Internal processing method for non-owning buffer contexts.
void preprocess(const std::shared_ptr< Buffer > &buffer)
Applies the preprocessor to a buffer.
bool validate_all_processors(std::vector< std::string > *incompatibility_reasons=nullptr) const
Validates all processors in the chain against the preferred processing token.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_preprocessors
Map of buffers to their preprocessors.
const std::vector< std::shared_ptr< BufferProcessor > > & get_processors(const std::shared_ptr< Buffer > &buffer) const
Gets all processors in a buffer's transformation pipeline.
bool add_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
Adds a processor to the transformation pipeline for a specific buffer.
void remove_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Removes a processor from the pipeline for a specific buffer.
void cleanup_rejected_processors(const std::shared_ptr< Buffer > &buffer)
Removes processors whose token is incompatible with the chain's preferred token from the buffer's pipeline and clears the buffer's pending-removal set.
void merge_chain(const std::shared_ptr< BufferProcessingChain > &other)
Combines another processing pipeline into this one with optimization.
void process_pending_processor_operations()
Process pending processor operations.
void enforce_chain_token_on_processors()
Enforces the chain's preferred processing token on all processors.
TokenEnforcementStrategy m_enforcement_strategy
Token enforcement strategy for this chain.
void remove_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
void postprocess(const std::shared_ptr< Buffer > &buffer)
Applies the postprocessor to a buffer.
bool queue_pending_processor_op(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, bool is_addition, std::string *rejection_reason=nullptr)
void optimize_for_tokens(const std::shared_ptr< Buffer > &buffer)
Optimizes the processing pipeline for improved performance.
PendingProcessorOp m_pending_ops[MAX_PENDING_PROCESSORS]
bool has_processors(const std::shared_ptr< Buffer > &buffer) const
Checks if a buffer has any processors in its pipeline.
void process_final(const std::shared_ptr< Buffer > &buffer)
Applies the final processor to a buffer with guaranteed execution.
bool add_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_conditional_processors
Map of buffers to processors that are conditionally applied.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_postprocessors
Map of buffers to their postprocessors.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
@ OVERRIDE_SKIP
Allows token overrides but skips processing for incompatible operations.
@ STRICT
Strictly enforces token assignment with no cross-token sharing.
@ OVERRIDE_REJECT
Allows token overrides but rejects incompatible processors from chains.
@ FILTERED
Filters processors through token enumeration, allowing compatible combinations.
@ IGNORE
Ignores token assignments completely, allowing any processing combination.
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, Managers, processors and processing chains.
std::shared_ptr< BufferProcessor > processor
Holds information about a processor's compatibility with a buffer's processing token.
std::vector< ProcessorTokenInfo > processor_infos
Holds the results of token compatibility analysis for a buffer processing chain.