11 if (
m_is_processing.load(std::memory_order_acquire) || processor->m_active_processing.load(std::memory_order_acquire) > 0) {
20 auto processor_token = processor->get_processing_token();
25 if (rejection_reason) {
26 *rejection_reason =
"Processor token (" + std::to_string(
static_cast<uint32_t
>(processor_token)) +
") does not exactly match chain's preferred token (" + std::to_string(
static_cast<uint32_t
>(
m_token_filter_mask)) +
") in STRICT mode";
34 if (rejection_reason) {
35 *rejection_reason =
"Processor token (" + std::to_string(
static_cast<uint32_t
>(processor_token)) +
") is not compatible with chain's preferred token (" + std::to_string(
static_cast<uint32_t
>(
m_token_filter_mask)) +
") in FILTERED mode";
59 auto it = std::ranges::find(processors, processor);
61 if (it != processors.end()) {
62 if (rejection_reason) {
63 *rejection_reason =
"Processor already exists in chain for this buffer";
68 processors.push_back(processor);
69 processor->on_attach(buffer);
87 auto it = std::ranges::find(processors, processor);
89 if (it != processors.end()) {
90 processor->on_detach(buffer);
109 for (
auto& processor : it->second) {
110 bool should_process =
true;
113 auto processor_token = processor->get_processing_token();
117 if (should_process) {
118 processor->process_non_owning(buffer);
129 bool expected =
false;
131 std::memory_order_acquire, std::memory_order_relaxed)) {
145 for (
auto& processor : it->second) {
146 bool should_process =
true;
149 auto processor_token = processor->get_processing_token();
153 if (should_process) {
154 processor->process(buffer);
168 if (m_pending_op.active.load(std::memory_order_acquire)) {
169 auto& op = m_pending_op;
171 if (op.is_addition) {
177 op.processor.reset();
179 op.active.store(
false, std::memory_order_release);
187 processor->on_attach(buffer);
199 static const std::vector<std::shared_ptr<BufferProcessor>> empty_vector;
213 final_it->second->process(buffer);
219 for (
const auto& [buffer, processors] : other->get_chain()) {
224 target_processors.reserve(target_processors.size() + processors.size());
226 for (
const auto& processor : processors) {
227 if (std::ranges::find(target_processors, processor) == target_processors.end()) {
228 target_processors.push_back(processor);
238 if (processors.empty()) {
242 std::vector<std::shared_ptr<BufferProcessor>> compatible_processors;
243 std::vector<std::shared_ptr<BufferProcessor>> incompatible_processors;
245 for (
auto& processor : processors) {
246 auto processor_token = processor->get_processing_token();
248 compatible_processors.push_back(processor);
250 incompatible_processors.push_back(processor);
256 processors.insert(processors.end(), compatible_processors.begin(), compatible_processors.end());
259 processors.insert(processors.end(), incompatible_processors.begin(), incompatible_processors.end());
268 std::remove_if(processors.begin(), processors.end(),
269 [
this](
const std::shared_ptr<BufferProcessor>& processor) {
270 auto processor_token = processor->get_processing_token();
271 return !are_tokens_compatible(m_token_filter_mask, processor_token);
280 std::vector<TokenCompatibilityReport> reports;
288 for (
const auto& processor : processors) {
299 reports.push_back(report);
307 bool all_compatible =
true;
310 for (
const auto& processor : processors) {
311 auto processor_token = processor->get_processing_token();
313 all_compatible =
false;
314 if (incompatibility_reasons) {
315 incompatibility_reasons->push_back(
316 "Processor with token " + std::to_string(
static_cast<uint32_t
>(processor_token)) +
" incompatible with chain preferred token " + std::to_string(
static_cast<uint32_t
>(
m_token_filter_mask)));
322 return all_compatible;
328 for (
auto& processor : processors) {
329 auto processor_token = processor->get_processing_token();
335 }
catch (
const std::exception& e) {
337 std::source_location::current(),
338 "Failed to enforce chain token on processor: " + std::string(e.what()));
348 bool expected =
false;
349 if (m_pending_op.active.compare_exchange_strong(
351 std::memory_order_acquire,
352 std::memory_order_relaxed)) {
354 m_pending_op.processor = processor;
355 m_pending_op.buffer = buffer;
356 m_pending_op.is_addition = is_addition;
363 if (rejection_reason && is_addition) {
364 *rejection_reason =
"Processor operation queue full";
void add_final_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Sets a special processor to be applied after the main pipeline.
void process(const std::shared_ptr< Buffer > &buffer)
Applies the transformation pipeline to a buffer with intelligent execution.
std::atomic< bool > m_is_processing
std::unordered_map< std::shared_ptr< Buffer >, std::shared_ptr< BufferProcessor > > m_final_processors
Map of buffers to their final processors.
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_pending_removal
Map of buffers to processors pending removal.
std::vector< TokenCompatibilityReport > analyze_token_compatibility() const
Analyzes token compatibility across all processors in the chain.
std::atomic< uint32_t > m_pending_count
ProcessingToken m_token_filter_mask
Preferred processing token for this chain.
std::unordered_map< std::shared_ptr< Buffer >, std::vector< std::shared_ptr< BufferProcessor > > > m_buffer_processors
Map of buffers to their processor sequences.
void process_non_owning(const std::shared_ptr< Buffer > &buffer)
Internal processing method for non-owning buffer contexts.
bool validate_all_processors(std::vector< std::string > *incompatibility_reasons=nullptr) const
Validates all processors in the chain against the preferred processing token.
const std::vector< std::shared_ptr< BufferProcessor > > & get_processors(const std::shared_ptr< Buffer > &buffer) const
Gets all processors in a buffer's transformation pipeline.
bool add_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
Adds a processor to the transformation pipeline for a specific buffer.
void remove_processor(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
Removes a processor from the pipeline for a specific buffer.
void cleanup_rejected_processors(const std::shared_ptr< Buffer > &buffer)
Removes processors whose processing token is incompatible with the chain's preferred token from a buffer's pipeline.
void merge_chain(const std::shared_ptr< BufferProcessingChain > &other)
Combines another processing pipeline into this one with optimization.
void process_pending_processor_operations()
Process pending processor operations.
void enforce_chain_token_on_processors()
Enforces the chain's preferred processing token on all processors.
TokenEnforcementStrategy m_enforcement_strategy
Token enforcement strategy for this chain.
void remove_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer)
bool queue_pending_processor_op(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, bool is_addition, std::string *rejection_reason=nullptr)
void optimize_for_tokens(const std::shared_ptr< Buffer > &buffer)
Optimizes the processing pipeline for improved performance.
PendingProcessorOp m_pending_ops[MAX_PENDING_PROCESSORS]
bool has_processors(const std::shared_ptr< Buffer > &buffer) const
Checks if a buffer has any processors in its pipeline.
void process_final(const std::shared_ptr< Buffer > &buffer)
Applies the final processor to a buffer with guaranteed execution.
bool add_processor_direct(const std::shared_ptr< BufferProcessor > &processor, const std::shared_ptr< Buffer > &buffer, std::string *rejection_reason=nullptr)
std::unordered_map< std::shared_ptr< Buffer >, std::unordered_set< std::shared_ptr< BufferProcessor > > > m_conditional_processors
Map of buffers to processors that are conditionally applied.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
@ OVERRIDE_SKIP
Allows token overrides but skips processing for incompatible operations.
@ STRICT
Strictly enforces token assignment with no cross-token sharing.
@ OVERRIDE_REJECT
Allows token overrides but rejects incompatible processors from chains.
@ FILTERED
Filters processors through token enumeration, allowing compatible combinations.
@ IGNORE
Ignores token assignments completely, allowing any processing combination.
@ BufferProcessing
Buffer processing (Buffers::BufferManager, processing chains)
@ Buffers
Buffers, Managers, processors and processing chains.
std::shared_ptr< BufferProcessor > processor
ProcessingToken processor_token
Holds information about a processor's compatibility with a buffer's processing token.
std::shared_ptr< Buffer > buffer
TokenEnforcementStrategy enforcement_strategy
ProcessingToken chain_preferred_token
std::vector< ProcessorTokenInfo > processor_infos
Holds the results of token compatibility analysis for a buffer processing chain.