MayaFlux 0.4.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
#include "Server.hpp"

#include "Commentator.hpp"

#include <deque>
4
5namespace Lila {
6
7// ---------------------------------------------------------------------------
8// ClientSession: one per connected client, drives the async read chain
9// ---------------------------------------------------------------------------
10
11class ClientSession : public std::enable_shared_from_this<ClientSession> {
12public:
13 ClientSession(asio::ip::tcp::socket socket, int client_id, Server& server)
14 : m_socket(std::move(socket))
15 , m_server(server)
16 {
17 m_info = ClientInfo {
18 .fd = client_id,
19 .session_id = "",
20 .connected_at = std::chrono::system_clock::now()
21 };
22
23 asio::ip::tcp::no_delay nodelay(true);
24 asio::error_code ec;
25 if (m_socket.set_option(nodelay, ec)) {
26 LILA_WARN(Emitter::SERVER,
27 "Failed to set TCP_NODELAY on client " + std::to_string(client_id)
28 + ": " + ec.message());
29 }
30 }
31
32 void start() { read_chunk(); }
33
34 void send(std::string_view message)
35 {
36 auto msg = std::make_shared<std::string>(message);
37 msg->push_back('\n');
38
39 asio::async_write(m_socket, asio::buffer(*msg),
40 [self = shared_from_this(), msg](const asio::error_code& ec, size_t) {
41 if (ec) {
42 LILA_WARN(Emitter::SERVER,
43 "Write failed for client " + std::to_string(self->m_info.fd)
44 + ": " + ec.message());
45 }
46 });
47 }
48
49 void close()
50 {
51 asio::error_code ec;
52 if (m_socket.shutdown(asio::ip::tcp::socket::shutdown_both, ec)) {
53 LILA_WARN(Emitter::SERVER,
54 "Failed to shutdown socket for client " + std::to_string(m_info.fd)
55 + ": " + ec.message());
56 }
57
58 if (m_socket.close(ec)) {
59 LILA_WARN(Emitter::SERVER,
60 "Failed to close socket for client " + std::to_string(m_info.fd)
61 + ": " + ec.message());
62 }
63 }
64
65 ClientInfo& info() { return m_info; }
66 const ClientInfo& info() const { return m_info; }
67
68private:
69 void read_chunk()
70 {
71 m_socket.async_read_some(asio::buffer(m_read_buf),
72 [self = shared_from_this()](const asio::error_code& ec, size_t bytes) {
73 self->on_chunk(ec, bytes);
74 });
75 }
76
77 void on_chunk(const asio::error_code& ec, size_t bytes)
78 {
79 if (ec) {
80 m_server.remove_session(m_info.fd);
81 return;
82 }
83
84 m_block_buf.append(m_read_buf.data(), bytes);
85
86 if (m_block_buf.size() > static_cast<size_t>(1024 * 1024)) {
87 m_block_buf.clear();
88 read_chunk();
89 return;
90 }
91
92 if (m_block_buf.back() == '\n') {
93 while (!m_block_buf.empty() && (m_block_buf.back() == '\n' || m_block_buf.back() == '\r'))
94 m_block_buf.pop_back();
95
96 if (!m_block_buf.empty()) {
97 if (m_block_buf.starts_with('@')) {
98 m_server.process_control_message(m_info.fd, std::string_view(m_block_buf).substr(1));
99 } else if (m_server.m_message_handler) {
100 auto response = m_server.m_message_handler(m_block_buf);
101 if (response) {
102 send(*response);
103 } else {
104 send(R"({"status":"error","message":")" + response.error() + "\"}");
105 }
106 }
107 }
108 m_block_buf.clear();
109 }
110
111 read_chunk();
112 }
113
114 asio::ip::tcp::socket m_socket;
115 std::array<char, 4096> m_read_buf {};
116 std::string m_block_buf;
117 ClientInfo m_info;
118 Server& m_server;
119
120 friend class Server;
121};
122
123// ---------------------------------------------------------------------------
124// Server
125// ---------------------------------------------------------------------------
126
// Constructs a server bound (later) to the given TCP port. No sockets are
// opened here; all network setup happens in start().
Server::Server(int port)
    : m_port(port)
{
    LILA_DEBUG(Emitter::SYSTEM, "Server instance created on port " + std::to_string(port));
}
132
// Ensures a clean shutdown if the owner never called stop() explicitly;
// stop() is idempotent, so a double stop is harmless.
Server::~Server()
{
    stop();
}
137
138bool Server::start() noexcept
139{
140 if (m_running.load(std::memory_order_acquire)) {
141 LILA_WARN(Emitter::SERVER, "Server already running");
142 return false;
143 }
144
145 try {
146 auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), static_cast<asio::ip::port_type>(m_port));
147 m_acceptor = std::make_unique<asio::ip::tcp::acceptor>(m_io_context);
148 m_acceptor->open(endpoint.protocol());
149 m_acceptor->set_option(asio::ip::tcp::acceptor::reuse_address(true));
150 m_acceptor->bind(endpoint);
151 m_acceptor->listen(10);
152 } catch (const asio::system_error& e) {
153 LILA_ERROR(Emitter::SERVER,
154 "Failed to start server: " + std::string(e.what()));
155 m_acceptor.reset();
156 return false;
157 }
158
159 m_running.store(true, std::memory_order_release);
160 m_event_bus.publish(StreamEvent { EventType::ServerStart });
161
162 if (m_start_handler) {
163 m_start_handler();
164 }
165
166 start_accept();
167
168 m_io_thread = std::thread([this]() {
169 LILA_DEBUG(Emitter::SERVER, "IO thread started");
170 m_io_context.run();
171 LILA_DEBUG(Emitter::SERVER, "IO thread exiting");
172 });
173
174 LILA_INFO(Emitter::SERVER, "Server started on port " + std::to_string(m_port));
175 return true;
176}
177
// Stops the server. Idempotent: the exchange() below lets the first caller
// win, so destructor + explicit stop() do not double-tear-down.
void Server::stop() noexcept
{
    if (!m_running.exchange(false, std::memory_order_acq_rel)) {
        return;
    }

    // Close the acceptor first so no new connections arrive while the
    // existing sessions are being torn down.
    asio::error_code ec;
    if (m_acceptor) {
        if (m_acceptor->close(ec)) {
            LILA_WARN(Emitter::SERVER,
                "Failed to close acceptor: " + ec.message());
        }
    }

    // Close every live session and drop our references under the lock.
    // NOTE(review): sockets are closed from this thread while the IO thread
    // may still be executing handlers on them — confirm this is acceptable
    // for the asio version in use.
    {
        std::unique_lock lock(m_clients_mutex);
        for (auto& [id, session] : m_sessions) {
            session->close();
        }
        m_sessions.clear();
    }

    // Stop the event loop, then wait for the IO thread to exit before
    // releasing the acceptor it may have been using.
    m_io_context.stop();

    if (m_io_thread.joinable()) {
        m_io_thread.join();
    }

    m_acceptor.reset();
    m_event_bus.publish(StreamEvent { EventType::ServerStop });
    LILA_INFO(Emitter::SERVER, "Server stopped");
}
210
211void Server::start_accept()
212{
213 m_acceptor->async_accept(
214 [this](const asio::error_code& ec, asio::ip::tcp::socket socket) {
215 if (ec) {
216 if (m_running.load(std::memory_order_acquire)) {
217 LILA_ERROR(Emitter::SERVER, "Accept failed: " + ec.message());
218 }
219 return;
220 }
221
222 int client_id = m_next_client_id++;
223 auto session = std::make_shared<ClientSession>(std::move(socket), client_id, *this);
224 register_session(session);
225 session->start();
226
227 start_accept();
228 });
229}
230
231void Server::register_session(const std::shared_ptr<ClientSession>& session)
232{
233 int id = session->info().fd;
234
235 {
236 std::unique_lock lock(m_clients_mutex);
237 m_sessions[id] = session;
238 }
239
240 if (m_connect_handler) {
241 m_connect_handler(session->info());
242 }
243
244 m_event_bus.publish(StreamEvent { EventType::ClientConnected, session->info() });
245 LILA_INFO(Emitter::SERVER, "Client connected id: " + std::to_string(id));
246}
247
248void Server::remove_session(int client_id)
249{
250 std::optional<ClientInfo> info;
251
252 {
253 std::unique_lock lock(m_clients_mutex);
254 auto it = m_sessions.find(client_id);
255 if (it == m_sessions.end()) {
256 return;
257 }
258 info = it->second->info();
259 it->second->close();
260 m_sessions.erase(it);
261 }
262
263 if (info) {
264 if (m_disconnect_handler) {
265 m_disconnect_handler(*info);
266 }
267 m_event_bus.publish(StreamEvent { EventType::ClientDisconnected, *info });
268 LILA_INFO(Emitter::SERVER, "Client disconnected (id: " + std::to_string(client_id) + ")");
269 }
270}
271
272void Server::process_control_message(int client_id, std::string_view message)
273{
274 if (message.starts_with("session ")) {
275 std::string session_id(message.substr(8));
276 set_client_session(client_id, session_id);
277
278 std::shared_lock lock(m_clients_mutex);
279 if (auto it = m_sessions.find(client_id); it != m_sessions.end()) {
280 it->second->send(
281 R"({"status":"success","message":"Session ID set to ')" + session_id + R"('"})");
282 }
283
284 } else if (message.starts_with("ping")) {
285 std::shared_lock lock(m_clients_mutex);
286 if (auto it = m_sessions.find(client_id); it != m_sessions.end()) {
287 it->second->send(R"({"status":"success","message":"pong"})");
288 }
289
290 } else {
291 std::shared_lock lock(m_clients_mutex);
292 if (auto it = m_sessions.find(client_id); it != m_sessions.end()) {
293 it->second->send(
294 R"({"status":"error","message":"Unknown command: )" + std::string(message) + "\"}");
295 }
296 }
297}
298
299void Server::set_client_session(int client_id, std::string session_id)
300{
301 std::unique_lock lock(m_clients_mutex);
302 if (auto it = m_sessions.find(client_id); it != m_sessions.end()) {
303 it->second->info().session_id = std::move(session_id);
304 }
305}
306
307std::optional<std::string> Server::get_client_session(int client_id) const
308{
309 std::shared_lock lock(m_clients_mutex);
310 auto it = m_sessions.find(client_id);
311 if (it == m_sessions.end()) {
312 return std::nullopt;
313 }
314 const auto& sid = it->second->info().session_id;
315 return sid.empty() ? std::nullopt : std::optional<std::string>(sid);
316}
317
318void Server::broadcast_event(const StreamEvent& /*event*/,
319 std::optional<std::string_view> target_session)
320{
321 std::shared_lock lock(m_clients_mutex);
322
323 for (const auto& [id, session] : m_sessions) {
324 if (target_session && session->info().session_id != *target_session) {
325 continue;
326 }
327 session->send(R"({"status":"info","message":"Event broadcast not implemented yet"})");
328 }
329}
330
331void Server::broadcast_to_all(std::string_view message)
332{
333 std::shared_lock lock(m_clients_mutex);
334 for (const auto& [id, session] : m_sessions) {
335 session->send(message);
336 }
337}
338
339std::vector<ClientInfo> Server::get_connected_clients() const
340{
341 std::shared_lock lock(m_clients_mutex);
342 std::vector<ClientInfo> result;
343 result.reserve(m_sessions.size());
344 for (const auto& [id, session] : m_sessions) {
345 result.push_back(session->info());
346 }
347 return result;
348}
349
350} // namespace Lila
#define LILA_WARN(emitter, msg)
#define LILA_ERROR(emitter, msg)
#define LILA_DEBUG(emitter, msg)
#define LILA_INFO(emitter, msg)
uint32_t id
Server(int port=9090)
void stop()
Stop all Portal::Graphics operations.
Definition Graphics.cpp:69