7template <
typename BufferType>
10 using BufferType::BufferType;
24 if (this->is_processing()) {
25 for (
auto& m_pending_op : m_pending_ops) {
26 bool expected =
false;
27 if (m_pending_op.active.compare_exchange_strong(expected,
true, std::memory_order_acquire, std::memory_order_relaxed)) {
28 m_pending_op.buffer = buffer;
29 m_pending_op.is_addition =
true;
30 m_pending_count.fetch_add(1, std::memory_order_relaxed);
37 add_child_buffer_direct(buffer);
49 virtual bool try_add_child_buffer(std::shared_ptr<BufferType> buffer, std::string* rejection_reason =
nullptr)
51 if (!is_buffer_acceptable(buffer, rejection_reason)) {
55 add_child_buffer(buffer);
65 if (this->is_processing()) {
66 for (
auto& m_pending_op : m_pending_ops) {
67 bool expected =
false;
68 if (m_pending_op.active.compare_exchange_strong(expected,
true, std::memory_order_acquire, std::memory_order_relaxed)) {
69 m_pending_op.buffer = buffer;
70 m_pending_op.is_addition =
false;
71 m_pending_count.fetch_add(1, std::memory_order_relaxed);
79 remove_child_buffer_direct(buffer);
94 return m_child_buffers;
106 for (
auto& child : m_child_buffers) {
169 auto default_processor = buffer->get_default_processor();
170 if (!default_processor) {
174 ProcessingToken child_token = default_processor->get_processing_token();
176 switch (m_token_enforcement_strategy) {
177 case TokenEnforcementStrategy::STRICT:
178 if (child_token != m_preferred_processing_token) {
179 if (rejection_reason) {
180 *rejection_reason =
"Child buffer's default processor token does not match preferred processing token (STRICT mode)";
186 case TokenEnforcementStrategy::FILTERED:
188 if (rejection_reason) {
189 *rejection_reason =
"Child buffer's default processor token is not compatible with preferred processing token (FILTERED mode)";
195 case TokenEnforcementStrategy::OVERRIDE_SKIP:
197 if (rejection_reason) {
198 *rejection_reason =
"Child buffer token is incompatible but will be conditionally processed (OVERRIDE_SKIP mode)";
203 case TokenEnforcementStrategy::OVERRIDE_REJECT:
205 if (rejection_reason) {
206 *rejection_reason =
"Child buffer token is incompatible and will be removed later (OVERRIDE_REJECT mode)";
211 case TokenEnforcementStrategy::IGNORE:
220 return m_pending_count.load(std::memory_order_relaxed) > 0;
226 std::string rejection_reason;
227 if (!is_buffer_acceptable(buffer, &rejection_reason)) {
228 throw std::runtime_error(
"Cannot add child buffer: " + rejection_reason);
231 m_child_buffers.push_back(buffer);
233 if (!buffer->get_processing_chain() && this->get_processing_chain()) {
234 buffer->set_processing_chain(this->get_processing_chain());
240 auto it = std::find(m_child_buffers.begin(), m_child_buffers.end(), buffer);
241 if (it != m_child_buffers.end()) {
242 m_child_buffers.erase(it);
251 for (
auto& m_pending_op : m_pending_ops) {
252 if (m_pending_op.active.load(std::memory_order_acquire)) {
253 auto& op = m_pending_op;
255 if (op.is_addition) {
256 add_child_buffer_direct(op.buffer);
258 remove_child_buffer_direct(op.buffer);
263 op.active.store(
false, std::memory_order_release);
264 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
310 std::atomic<uint32_t> m_pending_count { 0 };
312 static constexpr size_t MAX_PENDING = 64;
321 std::atomic<bool> active {
false };
323 bool is_addition {
true };
324 } m_pending_ops[MAX_PENDING];
virtual void set_processing_rate_hint(uint32_t tick_rate)
Sets processing rate hint for the buffer.
bool m_cross_modal_sharing
Whether this buffer allows cross-modal data sharing.
const std::vector< std::shared_ptr< BufferType > > & get_child_buffers() const
Gets all tributary buffers in the aggregation hierarchy.
virtual void enable_cross_modal_sharing(bool enabled)
Enables cross-modal data sharing.
void process_pending_buffer_operations()
Process pending operations - call this at start of processing cycles.
virtual uint32_t get_processing_rate_hint() const
Gets the processing rate hint.
bool has_pending_operations() const
virtual bool is_token_active() const =0
Checks if the buffer is active for its assigned token.
void clear() override
Resets all data values in this buffer and its tributaries.
virtual void add_child_buffer(std::shared_ptr< BufferType > buffer)
Adds a tributary buffer to this root buffer.
void remove_child_buffer_direct(std::shared_ptr< BufferType > buffer)
virtual void remove_child_buffer(std::shared_ptr< BufferType > buffer)
Removes a tributary buffer from this root buffer.
uint32_t m_processing_rate_hint
Processing rate hint for this buffer.
bool is_buffer_acceptable(std::shared_ptr< BufferType > buffer, std::string *rejection_reason=nullptr) const
Validates if a buffer is acceptable based on current token enforcement strategy.
ProcessingToken m_preferred_processing_token
Preferred processing token for this root buffer.
void add_child_buffer_direct(std::shared_ptr< BufferType > buffer)
std::vector< std::shared_ptr< BufferType > > m_child_buffers
Vector of tributary buffers that contribute to this root buffer.
size_t get_num_children() const
Gets the number of tributary buffers in the aggregation hierarchy.
virtual void set_token_active(bool active)=0
Activates/deactivates processing for the current token.
virtual bool is_cross_modal_sharing_enabled() const
Checks if cross-modal sharing is enabled.
virtual bool try_add_child_buffer(std::shared_ptr< BufferType > buffer, std::string *rejection_reason=nullptr)
Attempts to add a child buffer without throwing exceptions.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
TokenEnforcementStrategy
Defines how strictly processing token requirements are enforced in buffer processing chains.
std::shared_ptr< BufferType > buffer
Structure for storing pending buffer add/remove operations.