MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferProcessingChain.cpp
Go to the documentation of this file.
2
3#include "Buffer.hpp"
4
6
7namespace MayaFlux::Buffers {
8
9bool BufferProcessingChain::add_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason)
10{
11 if (m_is_processing.load(std::memory_order_acquire) || processor->m_active_processing.load(std::memory_order_acquire) > 0) {
12 return queue_pending_processor_op(processor, buffer, true, rejection_reason);
13 }
14
15 return add_processor_direct(processor, buffer, rejection_reason);
16}
17
18bool BufferProcessingChain::add_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason)
19{
20 auto processor_token = processor->get_processing_token();
21
22 switch (m_enforcement_strategy) {
24 if (processor_token != m_token_filter_mask) {
25 if (rejection_reason) {
26 *rejection_reason = "Processor token (" + std::to_string(static_cast<uint32_t>(processor_token)) + ") does not exactly match chain's preferred token (" + std::to_string(static_cast<uint32_t>(m_token_filter_mask)) + ") in STRICT mode";
27 }
28 return false;
29 }
30 break;
31
33 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
34 if (rejection_reason) {
35 *rejection_reason = "Processor token (" + std::to_string(static_cast<uint32_t>(processor_token)) + ") is not compatible with chain's preferred token (" + std::to_string(static_cast<uint32_t>(m_token_filter_mask)) + ") in FILTERED mode";
36 }
37 return false;
38 }
39 break;
40
42 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
43 m_conditional_processors[buffer].insert(processor);
44 }
45 break;
46
48 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
49 m_pending_removal[buffer].insert(processor);
50 }
51 break;
52
54 break;
55 }
56
57 auto& processors = m_buffer_processors[buffer];
58
59 auto it = std::ranges::find(processors, processor);
60
61 if (it != processors.end()) {
62 if (rejection_reason) {
63 *rejection_reason = "Processor already exists in chain for this buffer";
64 }
65 return false;
66 }
67
68 processors.push_back(processor);
69 processor->on_attach(buffer);
70
71 return true;
72}
73
74void BufferProcessingChain::remove_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
75{
76 if (m_is_processing.load(std::memory_order_acquire)) {
77 queue_pending_processor_op(processor, buffer, false);
78 return;
79 }
80
81 remove_processor_direct(processor, buffer);
82}
83
84void BufferProcessingChain::remove_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
85{
86 auto& processors = m_buffer_processors[buffer];
87 auto it = std::ranges::find(processors, processor);
88
89 if (it != processors.end()) {
90 processor->on_detach(buffer);
91 processors.erase(it);
92
93 m_conditional_processors[buffer].erase(processor);
94 m_pending_removal[buffer].erase(processor);
95 }
96}
97
98void BufferProcessingChain::process_non_owning(const std::shared_ptr<Buffer>& buffer)
99{
100 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
102 }
103
104 auto it = m_buffer_processors.find(buffer);
105 if (it == m_buffer_processors.end() || it->second.empty()) {
106 return;
107 }
108
109 for (auto& processor : it->second) {
110 bool should_process = true;
111
113 auto processor_token = processor->get_processing_token();
114 should_process = are_tokens_compatible(m_token_filter_mask, processor_token);
115 }
116
117 if (should_process) {
118 processor->process_non_owning(buffer);
119 }
120 }
121
124 }
125}
126
127void BufferProcessingChain::process(const std::shared_ptr<Buffer>& buffer)
128{
129 bool expected = false;
130 if (!m_is_processing.compare_exchange_strong(expected, true,
131 std::memory_order_acquire, std::memory_order_relaxed)) {
132 return;
133 }
134
135 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
137 }
138
139 auto it = m_buffer_processors.find(buffer);
140 if (it == m_buffer_processors.end() || it->second.empty()) {
141 m_is_processing.store(false, std::memory_order_release);
142 return;
143 }
144
145 for (auto& processor : it->second) {
146 bool should_process = true;
147
149 auto processor_token = processor->get_processing_token();
150 should_process = are_tokens_compatible(m_token_filter_mask, processor_token);
151 }
152
153 if (should_process) {
154 processor->process(buffer);
155 }
156 }
157
160 }
161
162 m_is_processing.store(false, std::memory_order_release);
163}
164
165void BufferProcessingChain::process_complete(const std::shared_ptr<Buffer>& buffer)
166{
167 preprocess(buffer);
168 process(buffer);
169 postprocess(buffer);
170 process_final(buffer);
171}
172
174{
175 for (auto& m_pending_op : m_pending_ops) {
176 if (m_pending_op.active.load(std::memory_order_acquire)) {
177 auto& op = m_pending_op;
178
179 if (op.is_addition) {
180 add_processor_direct(op.processor, op.buffer);
181 } else {
182 remove_processor_direct(op.processor, op.buffer);
183 }
184
185 op.processor.reset();
186 op.buffer.reset();
187 op.active.store(false, std::memory_order_release);
188 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
189 }
190 }
191}
192
193void BufferProcessingChain::add_preprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
194{
195 processor->on_attach(buffer);
196 m_preprocessors[buffer] = processor;
197}
198
199void BufferProcessingChain::add_postprocessor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
200{
201 processor->on_attach(buffer);
202 m_postprocessors[buffer] = processor;
203}
204
205void BufferProcessingChain::add_final_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
206{
207 processor->on_attach(buffer);
208 m_final_processors[buffer] = processor;
209}
210
211bool BufferProcessingChain::has_processors(const std::shared_ptr<Buffer>& buffer) const
212{
213 auto it = m_buffer_processors.find(buffer);
214 return it != m_buffer_processors.end() && !it->second.empty();
215}
216
217const std::vector<std::shared_ptr<BufferProcessor>>& BufferProcessingChain::get_processors(const std::shared_ptr<Buffer>& buffer) const
218{
219 static const std::vector<std::shared_ptr<BufferProcessor>> empty_vector;
220
221 auto it = m_buffer_processors.find(buffer);
222 if (it != m_buffer_processors.end()) {
223 return it->second;
224 }
225
226 return empty_vector;
227}
228
229std::shared_ptr<BufferProcessor>
230BufferProcessingChain::get_preprocessor(const std::shared_ptr<Buffer>& buffer) const
231{
232 auto it = m_preprocessors.find(buffer);
233 return it != m_preprocessors.end() ? it->second : nullptr;
234}
235
236std::shared_ptr<BufferProcessor>
237BufferProcessingChain::get_postprocessor(const std::shared_ptr<Buffer>& buffer) const
238{
239 auto it = m_postprocessors.find(buffer);
240 return it != m_postprocessors.end() ? it->second : nullptr;
241}
242
243std::shared_ptr<BufferProcessor>
244BufferProcessingChain::get_final_processor(const std::shared_ptr<Buffer>& buffer) const
245{
246 auto it = m_final_processors.find(buffer);
247 return it != m_final_processors.end() ? it->second : nullptr;
248}
249
250void BufferProcessingChain::preprocess(const std::shared_ptr<Buffer>& buffer)
251{
252 auto pre_it = m_preprocessors.find(buffer);
253 if (pre_it != m_preprocessors.end()) {
254 pre_it->second->process(buffer);
255 }
256}
257
258void BufferProcessingChain::postprocess(const std::shared_ptr<Buffer>& buffer)
259{
260 auto post_it = m_postprocessors.find(buffer);
261 if (post_it != m_postprocessors.end()) {
262 post_it->second->process(buffer);
263 }
264}
265
266void BufferProcessingChain::process_final(const std::shared_ptr<Buffer>& buffer)
267{
268 auto final_it = m_final_processors.find(buffer);
269 if (final_it != m_final_processors.end()) {
270 final_it->second->process(buffer);
271 }
272}
273
274void BufferProcessingChain::merge_chain(const std::shared_ptr<BufferProcessingChain>& other)
275{
276 for (const auto& [buffer, processors] : other->get_chain()) {
277 if (!m_buffer_processors.count(buffer)) {
278 m_buffer_processors.try_emplace(buffer, processors);
279 } else {
280 auto& target_processors = m_buffer_processors[buffer];
281 target_processors.reserve(target_processors.size() + processors.size());
282
283 for (const auto& processor : processors) {
284 if (std::ranges::find(target_processors, processor) == target_processors.end()) {
285 target_processors.push_back(processor);
286 }
287 }
288 }
289 }
290}
291
292void BufferProcessingChain::optimize_for_tokens(const std::shared_ptr<Buffer>& buffer)
293{
294 auto& processors = m_buffer_processors[buffer];
295 if (processors.empty()) {
296 return;
297 }
298
299 std::vector<std::shared_ptr<BufferProcessor>> compatible_processors;
300 std::vector<std::shared_ptr<BufferProcessor>> incompatible_processors;
301
302 for (auto& processor : processors) {
303 auto processor_token = processor->get_processing_token();
304 if (are_tokens_compatible(m_token_filter_mask, processor_token)) {
305 compatible_processors.push_back(processor);
306 } else {
307 incompatible_processors.push_back(processor);
308 }
309 }
310
311 processors.clear();
312
313 processors.insert(processors.end(), compatible_processors.begin(), compatible_processors.end());
314
316 processors.insert(processors.end(), incompatible_processors.begin(), incompatible_processors.end());
317 }
318}
319
320void BufferProcessingChain::cleanup_rejected_processors(const std::shared_ptr<Buffer>& buffer)
321{
322 auto& processors = m_buffer_processors[buffer];
323
324 std::erase_if(processors, [this](const std::shared_ptr<BufferProcessor>& processor) {
325 auto processor_token = processor->get_processing_token();
326 return !are_tokens_compatible(m_token_filter_mask, processor_token);
327 });
328
329 m_pending_removal[buffer].clear();
330}
331
332std::vector<TokenCompatibilityReport> BufferProcessingChain::analyze_token_compatibility() const
333{
334 std::vector<TokenCompatibilityReport> reports;
335
336 for (const auto& [buffer, processors] : m_buffer_processors) {
338 report.buffer = buffer;
341
342 for (const auto& processor : processors) {
344 info.processor = processor;
345 info.processor_token = processor->get_processing_token();
349
350 report.processor_infos.push_back(info);
351 }
352
353 reports.push_back(report);
354 }
355
356 return reports;
357}
358
359bool BufferProcessingChain::validate_all_processors(std::vector<std::string>* incompatibility_reasons) const
360{
361 bool all_compatible = true;
362
363 for (const auto& [buffer, processors] : m_buffer_processors) {
364 for (const auto& processor : processors) {
365 auto processor_token = processor->get_processing_token();
366 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
367 all_compatible = false;
368 if (incompatibility_reasons) {
369 incompatibility_reasons->push_back(
370 "Processor with token " + std::to_string(static_cast<uint32_t>(processor_token)) + " incompatible with chain preferred token " + std::to_string(static_cast<uint32_t>(m_token_filter_mask)));
371 }
372 }
373 }
374 }
375
376 return all_compatible;
377}
378
380{
381 for (const auto& [buffer, processors] : m_buffer_processors) {
382 for (auto& processor : processors) {
383 auto processor_token = processor->get_processing_token();
384 if (processor_token != m_token_filter_mask && are_tokens_compatible(m_token_filter_mask, processor_token)) {
385
386 try {
387 processor->set_processing_token(m_token_filter_mask);
388 } catch (const std::exception& e) {
390 std::source_location::current(),
391 "Failed to enforce chain token on processor: " + std::string(e.what()));
392 }
393 }
394 }
395 }
396}
397
398bool BufferProcessingChain::queue_pending_processor_op(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, bool is_addition, std::string* rejection_reason)
399{
400 for (auto& m_pending_op : m_pending_ops) {
401 bool expected = false;
402 if (m_pending_op.active.compare_exchange_strong(
403 expected, true,
404 std::memory_order_acquire,
405 std::memory_order_relaxed)) {
406
407 m_pending_op.processor = processor;
408 m_pending_op.buffer = buffer;
409 m_pending_op.is_addition = is_addition;
410 m_pending_count.fetch_add(1, std::memory_order_relaxed);
411 return true;
412 }
413 }
414
415 if (rejection_reason && is_addition) {
416 *rejection_reason = "Processor operation queue full";
417 }
418 return false;
419}
420
421}
std::shared_ptr< BufferProcessor > get_preprocessor(const std::shared_ptr< Buffer > &buffer) const
Returns the preprocessor for a buffer, or nullptr if none is set.
void process_complete(const std::shared_ptr< Buffer > &buffer)
Applies preprocessors, processing chain, post processors and final processors sequentially to a buffe...
void add_final_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a special processor to be applied after the main pipeline.
void process(const std::shared_ptr< Buffer > &buffer)
Applies the transformation pipeline to a buffer with intelligent execution.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_final_processors
Map of buffers to their final processors.
void add_preprocessor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a preprocessor to be applied before the main pipeline.
void add_postprocessor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a postprocessor to be applied after the main pipeline.
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_pending_removal
Map of buffers to processors pending removal.
std::vector< TokenCompatibilityReport > analyze_token_compatibility() const
Analyzes token compatibility across all processors in the chain.
ProcessingToken m_token_filter_mask
Preferred processing token for this chain.
std::unordered_map< std::shared_ptr< Buffer >, std::vector< std::shared_ptr< BufferProcessor > > > m_buffer_processors
Map of buffers to their processor sequences.
void process_non_owning(const std::shared_ptr< Buffer > &buffer)
Internal processing method for non-owning buffer contexts.
void preprocess(const std::shared_ptr< Buffer > &buffer)
Applies the preprocessor to a buffer.
std::shared_ptr< BufferProcessor > get_postprocessor(const std::shared_ptr< Buffer > &buffer) const
Returns the postprocessor for a buffer, or nullptr if none is set.
bool validate_all_processors(std::vector< std::string > *incompatibility_reasons=nullptr) const
Validates all processors in the chain against the preferred processing token.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_preprocessors
Map of buffers to their preprocessors.
const std::vector< std::shared_ptr< BufferProcessor > > & get_processors(const std::shared_ptr< Buffer > &buffer) const
Gets all processors in a buffer's transformation pipeline.
bool add_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
Adds a processor to the transformation pipeline for a specific buffer.
void remove_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Removes a processor from the pipeline for a specific buffer.
void cleanup_rejected_processors(const std::shared_ptr< Buffer > &buffer)
Validates the processing token against the chain's preferred token.
void merge_chain(const std::shared_ptr< BufferProcessingChain > &other)
Combines another processing pipeline into this one with optimization.
void process_pending_processor_operations()
Process pending processor operations.
void enforce_chain_token_on_processors()
Enforces the chain's preferred processing token on all processors.
std::shared_ptr< BufferProcessor > get_final_processor(const std::shared_ptr< Buffer > &buffer) const
Returns the final processor for a buffer, or nullptr if none is set.
TokenEnforcementStrategy m_enforcement_strategy
Token enforcement strategy for this chain.
void remove_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
void postprocess(const std::shared_ptr< Buffer > &buffer)
Applies the postprocessor to a buffer.
bool queue_pending_processor_op(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, bool is_addition, std::string *rejection_reason=nullptr)
void optimize_for_tokens(const std::shared_ptr< Buffer > &buffer)
Optimizes the processing pipeline for improved performance.
PendingProcessorOp m_pending_ops[MAX_PENDING_PROCESSORS]
bool has_processors(const std::shared_ptr< Buffer > &buffer) const
Checks if a buffer has any processors in its pipeline.
void process_final(const std::shared_ptr< Buffer > &buffer)
Applies the final processor to a buffer with guaranteed execution.
bool add_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_conditional_processors
Map of buffers to processors that are conditionally applied.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_postprocessors
Map of buffers to their postprocessors.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
@ OVERRIDE_SKIP
Allows token overrides but skips processing for incompatible operations.
@ STRICT
Strictly enforces token assignment with no cross-token sharing.
@ OVERRIDE_REJECT
Allows token overrides but rejects incompatible processors from chains.
@ FILTERED
Filters processors through token enumeration, allowing compatible combinations.
@ IGNORE
Ignores token assignments completely, allowing any processing combination.
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, Managers, processors and processing chains.
std::shared_ptr< BufferProcessor > processor
Holds information about a processor's compatibility with a buffer's processing token.
std::vector< ProcessorTokenInfo > processor_infos
Holds the results of token compatibility analysis for a buffer processing chain.