MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
NetworkSource.cpp
Go to the documentation of this file.
1#include "NetworkSource.hpp"
2
7
8namespace MayaFlux::Vruta {
9
// Function body whose signature line is absent from this listing. The
// cross-reference index further down lists
// `NetworkSource(const Core::EndpointInfo &info)` ("Open a network endpoint and
// begin receiving"); presumably this is that constructor - confirm against the
// original NetworkSource.cpp.
11{
14
// `svc` is declared on a line missing from this listing. When it is null the
// function returns without opening anything (the logging call around the
// message string below is also truncated here).
15 if (!svc) {
17 "Cannot open NetworkSource: NetworkService not available");
18 return;
19 }
20
21 m_endpoint_id = svc->open_endpoint(info);
22
// An endpoint id of 0 is treated as "open failed": nothing else is set up.
23 if (m_endpoint_id == 0) {
25 "NetworkSource: endpoint open failed");
26 return;
27 }
28
// Install the per-endpoint receive callback. Each invocation fills `msg`
// (declared on a line missing from this listing) with the endpoint id, a copy
// of the bytes [data, data + size) and a copy of the sender address, then
// hands it to signal().
// NOTE(review): the lambda captures `this`, so it must not be invoked once
// this object is gone - confirm the callback is detached on destruction.
29 svc->set_endpoint_receive_callback(m_endpoint_id,
30 [this](uint64_t id, const uint8_t* data, size_t size, std::string_view addr) {
32 msg.endpoint_id = id;
33 msg.data.assign(data, data + size);
34 msg.sender_address = std::string(addr);
35 signal(msg);
36 });
37
39 "NetworkSource opened endpoint {}", m_endpoint_id);
40}
41
// Function body whose signature line is absent from this listing. The
// cross-reference index lists `~NetworkSource()` ("Close the endpoint and
// release all resources"); presumably this is that destructor - confirm
// against the original source.
43{
// An endpoint id of 0 means there is nothing to close.
44 if (m_endpoint_id == 0) {
45 return;
46 }
47
// Lines 48-49 and 52 of the original file (including the declaration of `svc`)
// are missing from this listing; only the close call below is visible.
50
51 if (svc) {
53 svc->close_endpoint(m_endpoint_id);
54 }
55}
56
// Function body whose signature line is absent from this listing; it uses a
// `message` parameter, and the cross-reference index lists
// `void signal(const Core::NetworkMessage &message)`, so presumably this is
// NetworkSource::signal - confirm against the original source.
//
// Delivers `message` to every waiter currently registered, or, when there is
// none, pushes it onto m_queue.
58{
// Atomically detach the whole waiter list, leaving the head empty.
59 auto* w = m_waiters_head.exchange(nullptr, std::memory_order_acq_rel);
60
61 if (w) {
62 while (w) {
// The successor is read before deliver() is called on the current node.
63 auto* next = w->m_next.load(std::memory_order_relaxed);
64 w->deliver(message);
65 w = next;
66 }
67 } else {
// No waiter was registered: queue the message. The result of push() is
// explicitly discarded, so a failed push is not reported here.
68 (void)m_queue.push(message);
69 }
70}
71
76
// Function body whose signature line is absent from this listing; presumably
// `bool has_pending() const` from the cross-reference index - confirm.
// Returns true when the snapshot of m_queue is non-empty.
78{
79 return !m_queue.snapshot().empty();
80}
81
// Function body whose signature line is absent from this listing; presumably
// `size_t pending_count() const` from the cross-reference index - confirm.
// Returns the number of elements in the snapshot of m_queue.
83{
84 return m_queue.snapshot().size();
85}
86
// Function body whose signature line is absent from this listing; presumably
// `void clear()` from the cross-reference index - confirm.
// Pops from m_queue repeatedly, discarding each result, until pop() yields a
// value that converts to false.
88{
89 while (m_queue.pop()) { }
90}
91
// Removes one message from m_queue and returns it; the result of
// m_queue.pop() is forwarded unchanged as the optional return value.
92std::optional<Core::NetworkMessage> NetworkSource::pop_message()
93{
94 return m_queue.pop();
95}
96
// Function body whose signature line is absent from this listing; it uses an
// `awaiter` parameter, so presumably this is
// `void register_waiter(Kriya::NetworkAwaiter *awaiter)` from the
// cross-reference index - confirm.
//
// Pushes `awaiter` onto the front of the list rooted at m_waiters_head.
98{
99 auto* head = m_waiters_head.load(std::memory_order_relaxed);
100 do {
// Link the new node to the head we last observed, then try to publish it.
// On failure compare_exchange_weak reloads `head`, and the loop relinks and
// retries.
101 awaiter->m_next.store(head, std::memory_order_relaxed);
102 } while (!m_waiters_head.compare_exchange_weak(head, awaiter,
103 std::memory_order_release, std::memory_order_relaxed));
104}
105
// Function body whose signature line is absent from this listing; it uses an
// `awaiter` parameter, so presumably this is
// `void unregister_waiter(Kriya::NetworkAwaiter *awaiter)` from the
// cross-reference index - confirm.
//
// Unlinks `awaiter` from the list rooted at m_waiters_head, if it is found.
107{
// Case 1: `awaiter` is the current head - swing the head to its successor.
108 auto* expected = awaiter;
109 if (m_waiters_head.compare_exchange_strong(expected,
110 awaiter->m_next.load(std::memory_order_relaxed),
111 std::memory_order_acq_rel)) {
112 return;
113 }
114
// Case 2: walk the list looking for the node whose m_next is `awaiter` and
// redirect that link to `awaiter`'s successor.
115 auto* cur = m_waiters_head.load(std::memory_order_acquire);
116 while (cur) {
117 auto* next = cur->m_next.load(std::memory_order_relaxed);
118 if (next == awaiter) {
// NOTE(review): the result of this compare-exchange is not checked; the
// function returns whether or not the link was actually replaced.
119 cur->m_next.compare_exchange_strong(next,
120 awaiter->m_next.load(std::memory_order_relaxed),
121 std::memory_order_acq_rel);
122 return;
123 }
124 cur = next;
125 }
// Reaching the end of the list without a match leaves the list unchanged.
126}
127
128} // namespace MayaFlux::Vruta
#define MF_WARN(comp, ctx,...)
#define MF_DEBUG(comp, ctx,...)
Range size
uint32_t id
std::atomic< NetworkAwaiter * > m_next
Intrusive list link for lock-free waiter broadcast.
Awaiter for suspending a coroutine until a network message arrives.
Interface * get_service()
Query for a backend service.
static BackendRegistry & instance()
Get the global registry instance.
void register_waiter(Kriya::NetworkAwaiter *awaiter)
~NetworkSource()
Close the endpoint and release all resources.
void unregister_waiter(Kriya::NetworkAwaiter *awaiter)
Kriya::NetworkAwaiter next_message()
Create an awaiter for the next message.
void clear()
Clear all pending messages.
std::optional< Core::NetworkMessage > pop_message()
Memory::LockFreeQueue< Core::NetworkMessage, QUEUE_CAPACITY > m_queue
std::atomic< Kriya::NetworkAwaiter * > m_waiters_head
Head of the intrusive lock-free waiter list.
bool has_pending() const
Check if messages are pending.
size_t pending_count() const
Get number of pending messages.
void signal(const Core::NetworkMessage &message)
Signal that a message has arrived (called from the Asio IO thread).
NetworkSource(const Core::EndpointInfo &info)
Open a network endpoint and begin receiving.
@ Networking
Network operations (data transfer, protocol handling)
@ Vruta
Coroutines, schedulers, clocks, task management.
Describes one logical send/receive endpoint managed by a backend.
A received datagram or framed message with sender metadata.
std::function< void(uint64_t endpoint_id, std::function< void(uint64_t, const uint8_t *, size_t, std::string_view)> callback)> set_endpoint_receive_callback
Register a per-endpoint receive callback.
Backend network transport service interface.