MayaFlux 0.1.0
Digital-First Multimedia Processing Framework
Loading...
Searching...
No Matches
Server.cpp
Go to the documentation of this file.
1#include "Server.hpp"
2
3#include "Commentator.hpp"
4
5#include <cerrno>
6#include <cstddef>
7#include <fcntl.h>
8
9#ifdef MAYAFLUX_PLATFORM_WINDOWS
10#include <mstcpip.h>
11using ssize_t = int;
12#else
13#include <arpa/inet.h>
14#include <netinet/tcp.h>
15#include <sys/select.h>
16#include <sys/socket.h>
17#include <unistd.h>
18#endif
19
20namespace Lila {
21
22// --- Small platform abstraction helpers ------------------------------------
23
/**
 * @brief Fetches the most recent socket-layer error code for the calling thread.
 *
 * @return WSAGetLastError() on Windows builds, errno everywhere else.
 */
static int socket_errno()
{
#ifndef MAYAFLUX_PLATFORM_WINDOWS
    const int last_error = errno;
#else
    const int last_error = WSAGetLastError();
#endif
    return last_error;
}
32
/**
 * @brief Turns a socket error code into text suitable for log messages.
 *
 * @param code Error code as returned by socket_errno().
 * @return On Windows, the literal "WSA error " followed by the decimal code;
 *         otherwise the strerror() text for @p code.
 */
static std::string socket_error_string(int code)
{
#ifndef MAYAFLUX_PLATFORM_WINDOWS
    const char* description = strerror(code);
    return std::string(description);
#else
    // Numeric-only on purpose; FormatMessage could be used to get real text.
    std::string description = "WSA error ";
    description += std::to_string(code);
    return description;
#endif
}
42
/**
 * @brief Closes a socket descriptor using the platform's native call.
 *
 * The result of the underlying close call is intentionally not reported.
 *
 * @param fd Descriptor to close.
 */
static void socket_close(int fd)
{
#ifndef MAYAFLUX_PLATFORM_WINDOWS
    ::close(fd);
#else
    closesocket(static_cast<SOCKET>(fd));
#endif
}
51
/**
 * @brief Reads up to @p len bytes from a socket into @p buf.
 *
 * @param fd  Descriptor to read from.
 * @param buf Destination buffer; must hold at least @p len bytes.
 * @param len Maximum number of bytes to read.
 * @return Whatever the platform call returns: recv() on Windows (its int
 *         result widened to ssize_t), ::read() elsewhere.
 */
static ssize_t socket_read(int fd, void* buf, size_t len)
{
#ifndef MAYAFLUX_PLATFORM_WINDOWS
    const ssize_t received = ::read(fd, buf, len);
    return received;
#else
    // recv() works on char buffers and int lengths.
    char* const bytes = static_cast<char*>(buf);
    const int capacity = static_cast<int>(len);
    return recv(static_cast<SOCKET>(fd), bytes, capacity, 0);
#endif
}
61
/**
 * @brief Puts a socket descriptor into non-blocking mode.
 *
 * @param fd Descriptor to reconfigure.
 * @return On Windows, the result of ioctlsocket(FIONBIO). Elsewhere, -1 if
 *         the current flags cannot be read, otherwise the result of the
 *         fcntl(F_SETFL) call that adds O_NONBLOCK.
 */
static int set_nonblocking(int fd)
{
#ifndef MAYAFLUX_PLATFORM_WINDOWS
    const int current_flags = fcntl(fd, F_GETFL, 0);
    if (current_flags >= 0) {
        // Preserve the existing status flags and add O_NONBLOCK on top.
        return fcntl(fd, F_SETFL, current_flags | O_NONBLOCK);
    }
    return -1;
#else
    u_long enable_nonblocking = 1;
    return ioctlsocket(static_cast<SOCKET>(fd), FIONBIO, &enable_nonblocking);
#endif
}
74
75// --------------------------------------------------------------------------
76
78 : m_port(port)
79{
80 LILA_DEBUG(Emitter::SYSTEM, "Server instance created on port " + std::to_string(port));
81}
82
84{
85 stop();
86}
87
88bool Server::start() noexcept
89{
90 if (m_running) {
91 LILA_WARN(Emitter::SERVER, "Server already running");
92 return false;
93 }
94
95 try {
96#ifdef MAYAFLUX_PLATFORM_WINDOWS
97 WSADATA wsaData;
98 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
99 throw std::system_error(WSAGetLastError(), std::system_category(), "WSAStartup failed");
100 }
101#endif
102
103 m_server_fd = socket(AF_INET, SOCK_STREAM, 0);
104 if (m_server_fd < 0) {
105 throw std::system_error(socket_errno(), std::system_category(), "socket creation failed");
106 }
107
108 int opt = 1;
109 if (setsockopt(m_server_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
110 throw std::system_error(socket_errno(), std::system_category(), "setsockopt failed");
111 }
112
113 if (set_nonblocking(m_server_fd) < 0) {
114 throw std::system_error(socket_errno(), std::system_category(), "set non-blocking failed");
115 }
116
117 sockaddr_in address {};
118 address.sin_family = AF_INET;
119 address.sin_addr.s_addr = INADDR_ANY;
120 address.sin_port = htons(m_port);
121
122 if (bind(m_server_fd, reinterpret_cast<sockaddr*>(&address), sizeof(address)) < 0) {
123 throw std::system_error(socket_errno(), std::system_category(), "bind failed");
124 }
125
126 if (listen(m_server_fd, 10) < 0) {
127 throw std::system_error(socket_errno(), std::system_category(), "listen failed");
128 }
129
130 m_running = true;
131 m_event_bus.publish(StreamEvent { EventType::ServerStart });
132 if (m_start_handler) {
134 }
135
136 m_server_thread = ServerThread([this](const auto& token) { server_loop(token); });
137
138 LILA_INFO(Emitter::SERVER, "Server started on port " + std::to_string(m_port));
139
140 m_event_bus.publish(StreamEvent { EventType::ServerStart });
141
142 return true;
143
144 } catch (const std::system_error& e) {
145 LILA_ERROR(Emitter::SERVER, "Failed to start server: " + std::string(e.what()) + " (code: " + std::to_string(e.code().value()) + ")");
146 if (m_server_fd >= 0) {
148 m_server_fd = -1;
149 }
150#ifdef MAYAFLUX_PLATFORM_WINDOWS
151 WSACleanup();
152#endif
153 return false;
154 }
155}
156
157void Server::stop() noexcept
158{
159 if (!m_running)
160 return;
161
162 m_running = false;
163
167 }
168
169 if (m_server_fd >= 0) {
171 m_server_fd = -1;
172 }
173
174 {
175 std::unique_lock lock(m_clients_mutex);
176 for (const auto& [fd, client] : m_connected_clients) {
177 socket_close(fd);
178 }
179 m_connected_clients.clear();
180 }
181
182 m_event_bus.publish(StreamEvent { EventType::ServerStop });
183
184#ifdef MAYAFLUX_PLATFORM_WINDOWS
185 WSACleanup();
186#endif
187
188 LILA_INFO(Emitter::SERVER, "Server stopped");
189}
190
191#ifndef MAYAFLUX_JTHREAD_BROKEN
192void Server::server_loop(const std::stop_token& stop_token)
193#else
195#endif
196{
197 while (!stop_token.stop_requested() && m_running) {
198 fd_set readfds;
199 FD_ZERO(&readfds);
200 FD_SET(m_server_fd, &readfds);
201
202 timeval timeout { .tv_sec = 1, .tv_usec = 0 };
203
204#ifdef MAYAFLUX_PLATFORM_WINDOWS
205 int activity = select(0, &readfds, nullptr, nullptr, &timeout); // nfds ignored on Windows
206#else
207 int activity = select(m_server_fd + 1, &readfds, nullptr, nullptr, &timeout);
208#endif
209
210 if (activity < 0) {
211 int code = socket_errno();
212#ifndef MAYAFLUX_PLATFORM_WINDOWS
213 if (code == EINTR)
214 continue;
215#endif
216 if (m_running) {
217 LILA_ERROR(Emitter::SERVER, "Select error: " + socket_error_string(code));
218 }
219 break;
220 }
221
222 if (activity == 0)
223 continue;
224
225 sockaddr_in client_addr {};
226 socklen_t client_len = sizeof(client_addr);
227
228#ifdef MAYAFLUX_PLATFORM_WINDOWS
229 SOCKET client_socket = accept(static_cast<SOCKET>(m_server_fd), reinterpret_cast<sockaddr*>(&client_addr), &client_len);
230 int client_fd = static_cast<int>(client_socket);
231 if (client_socket == INVALID_SOCKET) {
232 int code = socket_errno();
233 if (code == WSAEWOULDBLOCK)
234 continue;
235 if (m_running) {
236 LILA_ERROR(Emitter::SERVER, "Accept failed: " + socket_error_string(code));
237 }
238 continue;
239 }
240#else
241 int client_fd = accept(m_server_fd, reinterpret_cast<sockaddr*>(&client_addr), &client_len);
242 if (client_fd < 0) {
243 if (errno == EAGAIN || errno == EWOULDBLOCK)
244 continue;
245 if (m_running) {
246 LILA_ERROR(Emitter::SERVER, "Accept failed: " + std::string(strerror(errno)));
247 }
248 continue;
249 }
250#endif
251
252 if (set_nonblocking(client_fd) < 0) {
253 LILA_WARN(Emitter::SERVER, "Failed to set non-blocking on client fd " + std::to_string(client_fd));
254 }
255
256 handle_client(client_fd);
257 }
258}
259
260void Server::handle_client(int client_fd)
261{
262 int enable = 1;
263 if (setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<char*>(&enable), sizeof(enable)) < 0) {
264 LILA_WARN(Emitter::SERVER, "Failed to set TCP_NODELAY on client fd " + std::to_string(client_fd) + ": " + socket_error_string(socket_errno()));
265 }
266
267 ClientInfo client_info {
268 .fd = client_fd,
269 .session_id = "",
270 .connected_at = std::chrono::system_clock::now()
271 };
272
273 {
274 std::unique_lock lock(m_clients_mutex);
275 m_connected_clients[client_fd] = client_info;
276 }
277
278 if (m_connect_handler) {
279 m_connect_handler(client_info);
280 }
281
282 m_event_bus.publish(StreamEvent { EventType::ClientConnected, client_info });
283
284 LILA_INFO(Emitter::SERVER, "Client connected fd: " + std::to_string(client_fd));
285
286 while (m_running) {
287 auto message = read_message(client_fd);
288
289 if (!message) {
290 if (message.error() != "timeout" && message.error() != "would_block") {
291 break;
292 }
293 std::this_thread::sleep_for(std::chrono::milliseconds(8));
294 continue;
295 }
296
297 if (message->empty())
298 continue;
299
300 if (message->starts_with('@')) {
301 process_control_message(client_fd, message->substr(1));
302 continue;
303 }
304
305 if (m_message_handler) {
306 auto response = m_message_handler(*message);
307 if (response) {
308 send_message(client_fd, *response);
309 } else {
310 send_message(client_fd, "{\"status\":\"error\",\"message\":\"" + response.error() + "\"}");
311 }
312 }
313 }
314
315 cleanup_client(client_fd);
316}
317
318void Server::process_control_message(int client_fd, std::string_view message)
319{
320 if (message.starts_with("session ")) {
321 std::string session_id(message.substr(8));
322 set_client_session(client_fd, session_id);
323 send_message(client_fd, R"({"status":"success","message":"Session ID set"})");
324 } else if (message.starts_with("ping")) {
325 send_message(client_fd, R"({"status":"success","message":"pong"})");
326 } else {
327 send_message(client_fd, "{\"status\":\"error\",\"message\":\"Unknown command: " + std::string(message) + "\"}");
328 }
329}
330
331void Server::set_client_session(int client_fd, std::string session_id)
332{
333 std::unique_lock lock(m_clients_mutex);
334 if (auto it = m_connected_clients.find(client_fd); it != m_connected_clients.end()) {
335 it->second.session_id = std::move(session_id);
336 }
337}
338
339std::expected<std::string, std::string> Server::read_message(int client_fd)
340{
341 constexpr size_t BUFFER_SIZE = 4096;
342 std::string result;
343
344 result.reserve(BUFFER_SIZE);
345 std::array<char, BUFFER_SIZE> buffer {};
346
347 while (true) {
348 ssize_t bytes_read = socket_read(client_fd, buffer.data(), buffer.size() - 1);
349
350 if (bytes_read < 0) {
351#ifdef MAYAFLUX_PLATFORM_WINDOWS
352 int code = socket_errno();
353 if (code == WSAEWOULDBLOCK) {
354 return std::unexpected("would_block");
355 }
356 return std::unexpected("read error: " + socket_error_string(code));
357#else
358 if (errno == EAGAIN || errno == EWOULDBLOCK) {
359 return std::unexpected("would_block");
360 }
361 return std::unexpected("read error: " + std::string(strerror(errno)));
362#endif
363 }
364
365 if (bytes_read == 0) {
366 return std::unexpected("client_disconnected");
367 }
368
369 buffer[bytes_read] = '\0';
370 result.append(buffer.data());
371
372 if (!result.empty() && result.back() == '\n') {
373 result.pop_back();
374 if (!result.empty() && result.back() == '\r') {
375 result.pop_back();
376 }
377 break;
378 }
379
380 if (result.size() > static_cast<size_t>(1024 * 1024)) {
381 return std::unexpected("message_too_large");
382 }
383 }
384
385 return result;
386}
387
388bool Server::send_message(int client_fd, std::string_view message)
389{
390 std::string msg_with_newline = std::string(message) + "\n";
391#ifdef MAYAFLUX_PLATFORM_WINDOWS
392 int sent = send(static_cast<SOCKET>(client_fd), msg_with_newline.data(), static_cast<int>(msg_with_newline.size()), 0);
393 return sent == static_cast<int>(msg_with_newline.size());
394#else
395 ssize_t sent = send(client_fd, msg_with_newline.data(), msg_with_newline.size(), 0);
396 return sent == static_cast<ssize_t>(msg_with_newline.size());
397#endif
398}
399
400void Server::cleanup_client(int client_fd)
401{
402 std::optional<ClientInfo> client_info;
403
404 {
405 std::unique_lock lock(m_clients_mutex);
406 if (auto it = m_connected_clients.find(client_fd); it != m_connected_clients.end()) {
407 client_info = it->second;
408 m_connected_clients.erase(it);
409 }
410 }
411
412 if (client_info) {
414 m_disconnect_handler(*client_info);
415 }
416
417 m_event_bus.publish(StreamEvent { EventType::ClientDisconnected, *client_info });
418
419 LILA_INFO(Emitter::SERVER, "Client disconnected (fd: " + std::to_string(client_fd) + ")");
420 }
421
422 socket_close(client_fd);
423}
424
425void Server::broadcast_event(const StreamEvent& /*event*/, std::optional<std::string_view> target_session)
426{
427 std::shared_lock lock(m_clients_mutex);
428
429 for (const auto& [fd, client] : m_connected_clients) {
430 if (target_session && client.session_id != *target_session) {
431 continue;
432 }
433
434 send_message(fd, "TODO: Serialize event to JSON");
435 }
436}
437
438std::vector<ClientInfo> Server::get_connected_clients() const
439{
440 std::shared_lock lock(m_clients_mutex);
441 return m_connected_clients | std::views::values | std::ranges::to<std::vector>();
442}
443
444} // namespace Lila
#define LILA_WARN(emitter, msg)
#define LILA_ERROR(emitter, msg)
#define LILA_DEBUG(emitter, msg)
#define LILA_INFO(emitter, msg)
static MayaFlux::Nodes::ProcessingToken token
Definition Timers.cpp:8
void publish(const StreamEvent &event)
Publish an event to all subscribers of its type.
Definition EventBus.cpp:18
bool stop_requested() const noexcept
API-compatible stop token for platforms without std::stop_token.
void request_stop() noexcept
bool joinable() const noexcept
Fallback std::thread-based thread wrapper with manual stop signaling.
ServerThread m_server_thread
Server thread.
Definition Server.hpp:165
std::atomic< bool > m_running
Server running state.
Definition Server.hpp:164
std::expected< std::string, std::string > read_message(int client_fd)
Reads a message from a client.
Definition Server.cpp:339
ConnectionHandler m_disconnect_handler
Handler for client disconnections.
Definition Server.hpp:169
void set_client_session(int client_fd, std::string session_id)
Sets the session ID for a client.
Definition Server.cpp:331
void handle_client(int client_fd)
Handles communication with a single client.
Definition Server.cpp:260
std::shared_mutex m_clients_mutex
Mutex for client map.
Definition Server.hpp:173
bool start() noexcept
Starts the server and begins accepting clients.
Definition Server.cpp:88
Server(int port=9090)
Constructs a Server instance.
Definition Server.cpp:77
std::unordered_map< int, ClientInfo > m_connected_clients
Map of connected clients.
Definition Server.hpp:174
void stop() noexcept
Stops the server and disconnects all clients.
Definition Server.cpp:157
int m_server_fd
Server socket file descriptor.
Definition Server.hpp:163
int m_port
TCP port to listen on.
Definition Server.hpp:162
MessageHandler m_message_handler
Handler for client messages.
Definition Server.hpp:167
StartHandler m_start_handler
Handler for server start.
Definition Server.hpp:170
bool send_message(int client_fd, std::string_view message)
Sends a message to a client.
Definition Server.cpp:388
void broadcast_event(const StreamEvent &event, std::optional< std::string_view > target_session=std::nullopt)
Broadcasts an event to connected clients.
Definition Server.cpp:425
EventBus m_event_bus
Event bus for publishing server events.
Definition Server.hpp:171
void process_control_message(int client_fd, std::string_view message)
Processes control messages (e.g., session, ping)
Definition Server.cpp:318
ConnectionHandler m_connect_handler
Handler for client connections.
Definition Server.hpp:168
~Server()
Destructor; stops the server if running.
Definition Server.cpp:83
void cleanup_client(int client_fd)
Cleans up resources for a disconnected client.
Definition Server.cpp:400
std::vector< ClientInfo > get_connected_clients() const
Gets a list of all currently connected clients.
Definition Server.cpp:438
void server_loop(const ServerThread::StopToken &stop_token)
Main server loop; accepts and manages clients.
Definition Server.cpp:194
static int set_nonblocking(int fd)
Definition Server.cpp:62
static void socket_close(int fd)
Definition Server.cpp:43
static std::string socket_error_string(int code)
Definition Server.cpp:33
static int socket_errno()
Definition Server.cpp:24
static ssize_t socket_read(int fd, void *buf, size_t len)
Definition Server.cpp:52
int fd
Client socket file descriptor.
Definition EventBus.hpp:59
Information about a connected client.
Definition EventBus.hpp:58
Represents a published event in the system.
Definition EventBus.hpp:121