7template <
typename BufferType>
10 using BufferType::BufferType;
24 if (this->is_processing()) {
25 for (
auto& m_pending_op : m_pending_ops) {
26 bool expected =
false;
27 if (m_pending_op.active.compare_exchange_strong(expected,
true, std::memory_order_acquire, std::memory_order_relaxed)) {
28 m_pending_op.buffer = buffer;
29 m_pending_op.is_addition =
true;
30 m_pending_count.fetch_add(1, std::memory_order_relaxed);
37 add_child_buffer_direct(buffer);
49 virtual bool try_add_child_buffer(
const std::shared_ptr<BufferType>& buffer, std::string* rejection_reason =
nullptr)
51 if (!is_buffer_acceptable(buffer, rejection_reason)) {
55 add_child_buffer(buffer);
65 if (this->is_processing()) {
66 for (
auto& m_pending_op : m_pending_ops) {
67 bool expected =
false;
68 if (m_pending_op.active.compare_exchange_strong(expected,
true, std::memory_order_acquire, std::memory_order_relaxed)) {
69 m_pending_op.buffer = buffer;
70 m_pending_op.is_addition =
false;
71 m_pending_count.fetch_add(1, std::memory_order_relaxed);
79 remove_child_buffer_direct(buffer);
94 return m_child_buffers;
106 for (
auto& child : m_child_buffers) {
167 bool is_buffer_acceptable(
const std::shared_ptr<BufferType>& buffer, std::string* rejection_reason =
nullptr)
const
169 auto default_processor = buffer->get_default_processor();
170 if (!default_processor) {
174 ProcessingToken child_token = default_processor->get_processing_token();
176 switch (m_token_enforcement_strategy) {
177 case TokenEnforcementStrategy::STRICT:
178 if (child_token != m_preferred_processing_token) {
179 if (rejection_reason) {
180 *rejection_reason =
"Child buffer's default processor token does not match preferred processing token (STRICT mode)";
186 case TokenEnforcementStrategy::FILTERED:
188 if (rejection_reason) {
189 *rejection_reason =
"Child buffer's default processor token is not compatible with preferred processing token (FILTERED mode)";
195 case TokenEnforcementStrategy::OVERRIDE_SKIP:
197 if (rejection_reason) {
198 *rejection_reason =
"Child buffer token is incompatible but will be conditionally processed (OVERRIDE_SKIP mode)";
203 case TokenEnforcementStrategy::OVERRIDE_REJECT:
205 if (rejection_reason) {
206 *rejection_reason =
"Child buffer token is incompatible and will be removed later (OVERRIDE_REJECT mode)";
211 case TokenEnforcementStrategy::IGNORE:
220 return m_pending_count.load(std::memory_order_relaxed) > 0;
226 std::string rejection_reason;
227 if (!is_buffer_acceptable(buffer, &rejection_reason)) {
228 throw std::runtime_error(
"Cannot add child buffer: " + rejection_reason);
231 m_child_buffers.push_back(buffer);
233 if (!buffer->get_processing_chain() && this->get_processing_chain()) {
234 buffer->set_processing_chain(this->get_processing_chain());
240 auto it = std::find(m_child_buffers.begin(), m_child_buffers.end(), buffer);
241 if (it != m_child_buffers.end()) {
242 m_child_buffers.erase(it);
251 for (
auto& m_pending_op : m_pending_ops) {
252 if (m_pending_op.active.load(std::memory_order_acquire)) {
253 auto& op = m_pending_op;
255 if (op.is_addition) {
256 add_child_buffer_direct(op.buffer);
258 remove_child_buffer_direct(op.buffer);
263 op.active.store(
false, std::memory_order_release);
264 m_pending_count.fetch_sub(1, std::memory_order_relaxed);
310 std::atomic<uint32_t> m_pending_count { 0 };
312 static constexpr size_t MAX_PENDING = 64;
321 std::atomic<bool> active {
false };
323 bool is_addition {
true };
324 } m_pending_ops[MAX_PENDING];
virtual void remove_child_buffer(const std::shared_ptr< BufferType > &buffer)
Removes a tributary buffer from this root buffer.
virtual void add_child_buffer(const std::shared_ptr< BufferType > &buffer)
Adds a tributary buffer to this root buffer.
virtual void set_processing_rate_hint(uint32_t tick_rate)
Sets processing rate hint for the buffer.
bool m_cross_modal_sharing
Whether this buffer allows cross-modal data sharing.
const std::vector< std::shared_ptr< BufferType > > & get_child_buffers() const
Gets all tributary buffers in the aggregation hierarchy.
virtual void enable_cross_modal_sharing(bool enabled)
Enables cross-modal data sharing.
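void add_child_buffer_direct(const std::shared_ptr< BufferType > &buffer)
Adds a tributary buffer immediately, without going through the pending-operation queue; throws std::runtime_error if the buffer is not acceptable.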
bool is_buffer_acceptable(const std::shared_ptr< BufferType > &buffer, std::string *rejection_reason=nullptr) const
Validates if a buffer is acceptable based on current token enforcement strategy.
void process_pending_buffer_operations()
Process pending operations - call this at start of processing cycles.
virtual uint32_t get_processing_rate_hint() const
Gets the processing rate hint.
bool has_pending_operations() const
Checks whether any deferred add/remove operations are still waiting to be processed.
virtual bool is_token_active() const =0
Checks if the buffer is active for its assigned token.
void clear() override
Resets all data values in this buffer and its tributaries.
uint32_t m_processing_rate_hint
Processing rate hint for this buffer.
virtual bool try_add_child_buffer(const std::shared_ptr< BufferType > &buffer, std::string *rejection_reason=nullptr)
Attempts to add a child buffer without throwing exceptions.
ProcessingToken m_preferred_processing_token
Preferred processing token for this root buffer.
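void remove_child_buffer_direct(const std::shared_ptr< BufferType > &buffer)
Removes a tributary buffer immediately, without going through the pending-operation queue; does nothing if the buffer is not a child.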
std::vector< std::shared_ptr< BufferType > > m_child_buffers
Vector of tributary buffers that contribute to this root buffer.
size_t get_num_children() const
Gets the number of tributary buffers in the aggregation hierarchy.
virtual void set_token_active(bool active)=0
Activates/deactivates processing for the current token.
virtual bool is_cross_modal_sharing_enabled() const
Checks if cross-modal sharing is enabled.
bool are_tokens_compatible(ProcessingToken preferred, ProcessingToken current)
Determines if two processing tokens are compatible for joint execution.
ProcessingToken
Bitfield enum defining processing characteristics and backend requirements for buffer operations.
TokenEnforcementStrategy
Defines how strictly processing token requirements are enforced in buffer processing chains.
std::shared_ptr< BufferType > buffer
Structure for storing pending buffer add/remove operations.