MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
BufferProcessingChain.cpp
Go to the documentation of this file.
2
3#include "Buffer.hpp"
4
6
7namespace MayaFlux::Buffers {
8
9bool BufferProcessingChain::add_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason)
10{
11 if (m_is_processing.load(std::memory_order_acquire) || processor->m_active_processing.load(std::memory_order_acquire) > 0) {
12 return queue_pending_processor_op(processor, buffer, true, rejection_reason);
13 }
14
15 return add_processor_direct(processor, buffer, rejection_reason);
16}
17
18bool BufferProcessingChain::add_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, std::string* rejection_reason)
19{
20 auto processor_token = processor->get_processing_token();
21
22 switch (m_enforcement_strategy) {
24 if (processor_token != m_token_filter_mask) {
25 if (rejection_reason) {
26 *rejection_reason = "Processor token (" + std::to_string(static_cast<uint32_t>(processor_token)) + ") does not exactly match chain's preferred token (" + std::to_string(static_cast<uint32_t>(m_token_filter_mask)) + ") in STRICT mode";
27 }
28 return false;
29 }
30 break;
31
33 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
34 if (rejection_reason) {
35 *rejection_reason = "Processor token (" + std::to_string(static_cast<uint32_t>(processor_token)) + ") is not compatible with chain's preferred token (" + std::to_string(static_cast<uint32_t>(m_token_filter_mask)) + ") in FILTERED mode";
36 }
37 return false;
38 }
39 break;
40
42 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
43 m_conditional_processors[buffer].insert(processor);
44 }
45 break;
46
48 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
49 m_pending_removal[buffer].insert(processor);
50 }
51 break;
52
54 break;
55 }
56
57 auto& processors = m_buffer_processors[buffer];
58
59 auto it = std::ranges::find(processors, processor);
60
61 if (it != processors.end()) {
62 if (rejection_reason) {
63 *rejection_reason = "Processor already exists in chain for this buffer";
64 }
65 return false;
66 }
67
68 processors.push_back(processor);
69 processor->on_attach(buffer);
70
71 return true;
72}
73
74void BufferProcessingChain::remove_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
75{
76 if (m_is_processing.load(std::memory_order_acquire)) {
77 queue_pending_processor_op(processor, buffer, false);
78 return;
79 }
80
81 remove_processor_direct(processor, buffer);
82}
83
84void BufferProcessingChain::remove_processor_direct(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
85{
86 auto& processors = m_buffer_processors[buffer];
87 auto it = std::ranges::find(processors, processor);
88
89 if (it != processors.end()) {
90 processor->on_detach(buffer);
91 processors.erase(it);
92
93 m_conditional_processors[buffer].erase(processor);
94 m_pending_removal[buffer].erase(processor);
95 }
96}
97
98void BufferProcessingChain::process_non_owning(const std::shared_ptr<Buffer>& buffer)
99{
100 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
102 }
103
104 auto it = m_buffer_processors.find(buffer);
105 if (it == m_buffer_processors.end() || it->second.empty()) {
106 return;
107 }
108
109 for (auto& processor : it->second) {
110 bool should_process = true;
111
113 auto processor_token = processor->get_processing_token();
114 should_process = are_tokens_compatible(m_token_filter_mask, processor_token);
115 }
116
117 if (should_process) {
118 processor->process_non_owning(buffer); // non-owning calls non-owning
119 }
120 }
121
124 }
125}
126
127void BufferProcessingChain::process(const std::shared_ptr<Buffer>& buffer)
128{
129 bool expected = false;
130 if (!m_is_processing.compare_exchange_strong(expected, true,
131 std::memory_order_acquire, std::memory_order_relaxed)) {
132 return;
133 }
134
135 if (m_pending_count.load(std::memory_order_relaxed) > 0) {
137 }
138
139 auto it = m_buffer_processors.find(buffer);
140 if (it == m_buffer_processors.end() || it->second.empty()) {
141 m_is_processing.store(false, std::memory_order_release);
142 return;
143 }
144
145 for (auto& processor : it->second) {
146 bool should_process = true;
147
149 auto processor_token = processor->get_processing_token();
150 should_process = are_tokens_compatible(m_token_filter_mask, processor_token);
151 }
152
153 if (should_process) {
154 processor->process(buffer);
155 }
156 }
157
160 }
161
162 m_is_processing.store(false, std::memory_order_release);
163}
164
166{
167 for (auto& m_pending_op : m_pending_ops) {
168 if (m_pending_op.active.load(std::memory_order_acquire)) {
169 auto& op = m_pending_op;
170
171 if (op.is_addition) {
172 add_processor_direct(op.processor, op.buffer);
173 } else {
174 remove_processor_direct(op.processor, op.buffer);
175 }
176
177 op.processor.reset();
178 op.buffer.reset();
179 op.active.store(false, std::memory_order_release);
180 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
181 }
182 }
183}
184
185void BufferProcessingChain::add_final_processor(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer)
186{
187 processor->on_attach(buffer);
188 m_final_processors[buffer] = processor;
189}
190
191bool BufferProcessingChain::has_processors(const std::shared_ptr<Buffer>& buffer) const
192{
193 auto it = m_buffer_processors.find(buffer);
194 return it != m_buffer_processors.end() && !it->second.empty();
195}
196
197const std::vector<std::shared_ptr<BufferProcessor>>& BufferProcessingChain::get_processors(const std::shared_ptr<Buffer>& buffer) const
198{
199 static const std::vector<std::shared_ptr<BufferProcessor>> empty_vector;
200
201 auto it = m_buffer_processors.find(buffer);
202 if (it != m_buffer_processors.end()) {
203 return it->second;
204 }
205
206 return empty_vector;
207}
208
209void BufferProcessingChain::process_final(const std::shared_ptr<Buffer>& buffer)
210{
211 auto final_it = m_final_processors.find(buffer);
212 if (final_it != m_final_processors.end()) {
213 final_it->second->process(buffer);
214 }
215}
216
217void BufferProcessingChain::merge_chain(const std::shared_ptr<BufferProcessingChain>& other)
218{
219 for (const auto& [buffer, processors] : other->get_chain()) {
220 if (!m_buffer_processors.count(buffer)) {
221 m_buffer_processors.try_emplace(buffer, processors);
222 } else {
223 auto& target_processors = m_buffer_processors[buffer];
224 target_processors.reserve(target_processors.size() + processors.size());
225
226 for (const auto& processor : processors) {
227 if (std::ranges::find(target_processors, processor) == target_processors.end()) {
228 target_processors.push_back(processor);
229 }
230 }
231 }
232 }
233}
234
235void BufferProcessingChain::optimize_for_tokens(const std::shared_ptr<Buffer>& buffer)
236{
237 auto& processors = m_buffer_processors[buffer];
238 if (processors.empty()) {
239 return;
240 }
241
242 std::vector<std::shared_ptr<BufferProcessor>> compatible_processors;
243 std::vector<std::shared_ptr<BufferProcessor>> incompatible_processors;
244
245 for (auto& processor : processors) {
246 auto processor_token = processor->get_processing_token();
247 if (are_tokens_compatible(m_token_filter_mask, processor_token)) {
248 compatible_processors.push_back(processor);
249 } else {
250 incompatible_processors.push_back(processor);
251 }
252 }
253
254 processors.clear();
255
256 processors.insert(processors.end(), compatible_processors.begin(), compatible_processors.end());
257
259 processors.insert(processors.end(), incompatible_processors.begin(), incompatible_processors.end());
260 }
261}
262
263void BufferProcessingChain::cleanup_rejected_processors(const std::shared_ptr<Buffer>& buffer)
264{
265 auto& processors = m_buffer_processors[buffer];
266
267 processors.erase(
268 std::remove_if(processors.begin(), processors.end(),
269 [this](const std::shared_ptr<BufferProcessor>& processor) {
270 auto processor_token = processor->get_processing_token();
271 return !are_tokens_compatible(m_token_filter_mask, processor_token);
272 }),
273 processors.end());
274
275 m_pending_removal[buffer].clear();
276}
277
278std::vector<TokenCompatibilityReport> BufferProcessingChain::analyze_token_compatibility() const
279{
280 std::vector<TokenCompatibilityReport> reports;
281
282 for (const auto& [buffer, processors] : m_buffer_processors) {
284 report.buffer = buffer;
287
288 for (const auto& processor : processors) {
290 info.processor = processor;
291 info.processor_token = processor->get_processing_token();
295
296 report.processor_infos.push_back(info);
297 }
298
299 reports.push_back(report);
300 }
301
302 return reports;
303}
304
305bool BufferProcessingChain::validate_all_processors(std::vector<std::string>* incompatibility_reasons) const
306{
307 bool all_compatible = true;
308
309 for (const auto& [buffer, processors] : m_buffer_processors) {
310 for (const auto& processor : processors) {
311 auto processor_token = processor->get_processing_token();
312 if (!are_tokens_compatible(m_token_filter_mask, processor_token)) {
313 all_compatible = false;
314 if (incompatibility_reasons) {
315 incompatibility_reasons->push_back(
316 "Processor with token " + std::to_string(static_cast<uint32_t>(processor_token)) + " incompatible with chain preferred token " + std::to_string(static_cast<uint32_t>(m_token_filter_mask)));
317 }
318 }
319 }
320 }
321
322 return all_compatible;
323}
324
326{
327 for (const auto& [buffer, processors] : m_buffer_processors) {
328 for (auto& processor : processors) {
329 auto processor_token = processor->get_processing_token();
330 if (processor_token != m_token_filter_mask && are_tokens_compatible(m_token_filter_mask, processor_token)) {
331
332 // Try to set the processor to the chain's preferred token
333 try {
334 processor->set_processing_token(m_token_filter_mask);
335 } catch (const std::exception& e) {
337 std::source_location::current(),
338 "Failed to enforce chain token on processor: " + std::string(e.what()));
339 }
340 }
341 }
342 }
343}
344
345bool BufferProcessingChain::queue_pending_processor_op(const std::shared_ptr<BufferProcessor>& processor, const std::shared_ptr<Buffer>& buffer, bool is_addition, std::string* rejection_reason)
346{
347 for (auto& m_pending_op : m_pending_ops) {
348 bool expected = false;
349 if (m_pending_op.active.compare_exchange_strong(
350 expected, true,
351 std::memory_order_acquire,
352 std::memory_order_relaxed)) {
353
354 m_pending_op.processor = processor;
355 m_pending_op.buffer = buffer;
356 m_pending_op.is_addition = is_addition;
357 m_pending_count.fetch_add(1, std::memory_order_relaxed);
358 return true;
359 }
360 }
361
362 // Queue full - drop operation (true lock-free behavior)
363 if (rejection_reason && is_addition) {
364 *rejection_reason = "Processor operation queue full";
365 }
366 return false;
367}
368
369}
void add_final_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a special processor to be applied after the main pipeline.
void process(const std::shared_ptr< Buffer > &buffer)
Applies the transformation pipeline to a buffer with intelligent execution.
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_final_processors
Map of buffers to their final processors.
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_pending_removal
Map of buffers to processors pending removal.
std::vector< TokenCompatibilityReport > analyze_token_compatibility() const
Analyzes token compatibility across all processors in the chain.
ProcessingToken m_token_filter_mask
Preferred processing token for this chain.
std::unordered_map< std::shared_ptr< Buffer >, std::vector< std::shared_ptr< BufferProcessor > > > m_buffer_processors
Map of buffers to their processor sequences.
void process_non_owning(const std::shared_ptr< Buffer > &buffer)
Internal processing method for non-owning buffer contexts.
bool validate_all_processors(std::vector< std::string > *incompatibility_reasons=nullptr) const
Validates all processors in the chain against the preferred processing token.
const std::vector< std::shared_ptr< BufferProcessor > > & get_processors(const std::shared_ptr< Buffer > &buffer) const
Gets all processors in a buffer's transformation pipeline.
bool add_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
Adds a processor to the transformation pipeline for a specific buffer.
void remove_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Removes a processor from the pipeline for a specific buffer.
void cleanup_rejected_processors(const std::shared_ptr< Buffer > &buffer)
Validates the processing token against the chain's preferred token.
void merge_chain(const std::shared_ptr< BufferProcessingChain > &other)
Combines another processing pipeline into this one with optimization.
void process_pending_processor_operations()
Process pending processor operations.
void enforce_chain_token_on_processors()
Enforces the chain's preferred processing token on all processors.
TokenEnforcementStrategy m_enforcement_strategy
Token enforcement strategy for this chain.
void remove_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
bool queue_pending_processor_op(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, bool is_addition, std::string *rejection_reason=nullptr)
void optimize_for_tokens(const std::shared_ptr< Buffer > &buffer)
Optimizes the processing pipeline for improved performance.
PendingProcessorOp m_pending_ops[MAX_PENDING_PROCESSORS]
bool has_processors(const std::shared_ptr< Buffer > &buffer) const
Checks if a buffer has any processors in its pipeline.
void process_final(const std::shared_ptr< Buffer > &buffer)
Applies the final processor to a buffer with guaranteed execution.
bool add_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_conditional_processors
Map of buffers to processors that are conditionally applied.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
@ OVERRIDE_SKIP
Allows token overrides but skips processing for incompatible operations.
@ STRICT
Strictly enforces token assignment with no cross-token sharing.
@ OVERRIDE_REJECT
Allows token overrides but rejects incompatible processors from chains.
@ FILTERED
Filters processors through token enumeration, allowing compatible combinations.
@ IGNORE
Ignores token assignments completely, allowing any processing combination.
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, Managers, processors and processing chains.
std::shared_ptr< BufferProcessor > processor
Holds information about a processor's compatibility with a buffer's processing token.
std::vector< ProcessorTokenInfo > processor_infos
Holds the results of token compatibility analysis for a buffer processing chain.