9#ifdef MAYAFLUX_PLATFORM_WINDOWS
14#include <netinet/tcp.h>
15#include <sys/select.h>
16#include <sys/socket.h>
26#ifdef MAYAFLUX_PLATFORM_WINDOWS
27 return WSAGetLastError();
35#ifdef MAYAFLUX_PLATFORM_WINDOWS
37 return "WSA error " + std::to_string(code);
39 return std::string(strerror(code));
45#ifdef MAYAFLUX_PLATFORM_WINDOWS
46 closesocket(
static_cast<SOCKET
>(fd));
54#ifdef MAYAFLUX_PLATFORM_WINDOWS
56 return recv(
static_cast<SOCKET
>(fd),
static_cast<char*
>(buf),
static_cast<int>(len), 0);
58 return ::read(fd, buf, len);
64#ifdef MAYAFLUX_PLATFORM_WINDOWS
66 return ioctlsocket(
static_cast<SOCKET
>(fd), FIONBIO, &mode);
68 int flags = fcntl(fd, F_GETFL, 0);
71 return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
80 LILA_DEBUG(Emitter::SYSTEM,
"Server instance created on port " + std::to_string(port));
91 LILA_WARN(Emitter::SERVER,
"Server already running");
96#ifdef MAYAFLUX_PLATFORM_WINDOWS
98 if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
99 throw std::system_error(WSAGetLastError(), std::system_category(),
"WSAStartup failed");
105 throw std::system_error(
socket_errno(), std::system_category(),
"socket creation failed");
109 if (setsockopt(
m_server_fd, SOL_SOCKET, SO_REUSEADDR,
reinterpret_cast<char*
>(&opt),
sizeof(opt)) < 0) {
110 throw std::system_error(
socket_errno(), std::system_category(),
"setsockopt failed");
114 throw std::system_error(
socket_errno(), std::system_category(),
"set non-blocking failed");
117 sockaddr_in address {};
118 address.sin_family = AF_INET;
119 address.sin_addr.s_addr = INADDR_ANY;
120 address.sin_port = htons(
m_port);
122 if (bind(
m_server_fd,
reinterpret_cast<sockaddr*
>(&address),
sizeof(address)) < 0) {
123 throw std::system_error(
socket_errno(), std::system_category(),
"bind failed");
127 throw std::system_error(
socket_errno(), std::system_category(),
"listen failed");
138 LILA_INFO(Emitter::SERVER,
"Server started on port " + std::to_string(
m_port));
144 }
catch (
const std::system_error& e) {
145 LILA_ERROR(Emitter::SERVER,
"Failed to start server: " + std::string(e.what()) +
" (code: " + std::to_string(e.code().value()) +
")");
150#ifdef MAYAFLUX_PLATFORM_WINDOWS
184#ifdef MAYAFLUX_PLATFORM_WINDOWS
188 LILA_INFO(Emitter::SERVER,
"Server stopped");
191#ifndef MAYAFLUX_JTHREAD_BROKEN
200 FD_SET(m_server_fd, &readfds);
202 timeval timeout { .tv_sec = 1, .tv_usec = 0 };
204#ifdef MAYAFLUX_PLATFORM_WINDOWS
205 int activity = select(0, &readfds,
nullptr,
nullptr, &timeout);
207 int activity = select(m_server_fd + 1, &readfds,
nullptr,
nullptr, &timeout);
212#ifndef MAYAFLUX_PLATFORM_WINDOWS
225 sockaddr_in client_addr {};
226 socklen_t client_len =
sizeof(client_addr);
228#ifdef MAYAFLUX_PLATFORM_WINDOWS
229 SOCKET client_socket = accept(
static_cast<SOCKET
>(m_server_fd),
reinterpret_cast<sockaddr*
>(&client_addr), &client_len);
230 int client_fd =
static_cast<int>(client_socket);
231 if (client_socket == INVALID_SOCKET) {
233 if (code == WSAEWOULDBLOCK)
241 int client_fd = accept(m_server_fd,
reinterpret_cast<sockaddr*
>(&client_addr), &client_len);
243 if (errno == EAGAIN || errno == EWOULDBLOCK)
246 LILA_ERROR(Emitter::SERVER,
"Accept failed: " + std::string(strerror(errno)));
253 LILA_WARN(Emitter::SERVER,
"Failed to set non-blocking on client fd " + std::to_string(client_fd));
256 handle_client(client_fd);
263 if (setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char*
>(&enable),
sizeof(enable)) < 0) {
270 .connected_at = std::chrono::system_clock::now()
284 LILA_INFO(Emitter::SERVER,
"Client connected fd: " + std::to_string(client_fd));
290 if (message.error() !=
"timeout" && message.error() !=
"would_block") {
293 std::this_thread::sleep_for(std::chrono::milliseconds(8));
297 if (message->empty())
300 if (message->starts_with(
'@')) {
310 send_message(client_fd,
"{\"status\":\"error\",\"message\":\"" + response.error() +
"\"}");
320 if (message.starts_with(
"session ")) {
321 std::string session_id(message.substr(8));
323 send_message(client_fd, R
"({"status":"success","message":"Session ID set"})");
324 } else if (message.starts_with(
"ping")) {
325 send_message(client_fd, R
"({"status":"success","message":"pong"})");
327 send_message(client_fd,
"{\"status\":\"error\",\"message\":\"Unknown command: " + std::string(message) +
"\"}");
335 it->second.session_id = std::move(session_id);
341 constexpr size_t BUFFER_SIZE = 4096;
344 result.reserve(BUFFER_SIZE);
345 std::array<char, BUFFER_SIZE> buffer {};
348 ssize_t bytes_read =
socket_read(client_fd, buffer.data(), buffer.size() - 1);
350 if (bytes_read < 0) {
351#ifdef MAYAFLUX_PLATFORM_WINDOWS
353 if (code == WSAEWOULDBLOCK) {
354 return std::unexpected(
"would_block");
358 if (errno == EAGAIN || errno == EWOULDBLOCK) {
359 return std::unexpected(
"would_block");
361 return std::unexpected(
"read error: " + std::string(strerror(errno)));
365 if (bytes_read == 0) {
366 return std::unexpected(
"client_disconnected");
369 buffer[bytes_read] =
'\0';
370 result.append(buffer.data());
372 if (!result.empty() && result.back() ==
'\n') {
374 if (!result.empty() && result.back() ==
'\r') {
380 if (result.size() >
static_cast<size_t>(1024 * 1024)) {
381 return std::unexpected(
"message_too_large");
390 std::string msg_with_newline = std::string(message) +
"\n";
391#ifdef MAYAFLUX_PLATFORM_WINDOWS
392 int sent = send(
static_cast<SOCKET
>(client_fd), msg_with_newline.data(),
static_cast<int>(msg_with_newline.size()), 0);
393 return sent ==
static_cast<int>(msg_with_newline.size());
395 ssize_t sent = send(client_fd, msg_with_newline.data(), msg_with_newline.size(), 0);
396 return sent ==
static_cast<ssize_t
>(msg_with_newline.size());
402 std::optional<ClientInfo> client_info;
407 client_info = it->second;
419 LILA_INFO(Emitter::SERVER,
"Client disconnected (fd: " + std::to_string(client_fd) +
")");
430 if (target_session && client.session_id != *target_session) {
static MayaFlux::Nodes::ProcessingToken token
void publish(const StreamEvent &event)
Publish an event to all subscribers of its type.
bool stop_requested() const noexcept
API-compatible stop token for platforms without std::stop_token.
void request_stop() noexcept
bool joinable() const noexcept
Fallback std::thread-based thread wrapper with manual stop signaling.
ServerThread m_server_thread
Server thread.
std::atomic< bool > m_running
Server running state.
std::expected< std::string, std::string > read_message(int client_fd)
Reads a message from a client.
ConnectionHandler m_disconnect_handler
Handler for client disconnections.
void set_client_session(int client_fd, std::string session_id)
Sets the session ID for a client.
void handle_client(int client_fd)
Handles communication with a single client.
std::shared_mutex m_clients_mutex
Mutex for client map.
bool start() noexcept
Starts the server and begins accepting clients.
Server(int port=9090)
Constructs a Server instance.
std::unordered_map< int, ClientInfo > m_connected_clients
Map of connected clients.
void stop() noexcept
Stops the server and disconnects all clients.
int m_server_fd
Server socket file descriptor.
int m_port
TCP port to listen on.
MessageHandler m_message_handler
Handler for client messages.
StartHandler m_start_handler
Handler for server start.
bool send_message(int client_fd, std::string_view message)
Sends a message to a client.
void broadcast_event(const StreamEvent &event, std::optional< std::string_view > target_session=std::nullopt)
Broadcasts an event to connected clients.
EventBus m_event_bus
Event bus for publishing server events.
void process_control_message(int client_fd, std::string_view message)
Processes control messages (e.g., session, ping)
ConnectionHandler m_connect_handler
Handler for client connections.
~Server()
Destructor; stops the server if running.
void cleanup_client(int client_fd)
Cleans up resources for a disconnected client.
std::vector< ClientInfo > get_connected_clients() const
Gets a list of all currently connected clients.
void server_loop(const ServerThread::StopToken &stop_token)
Main server loop; accepts and manages clients.
static int set_nonblocking(int fd)
static void socket_close(int fd)
static std::string socket_error_string(int code)
static int socket_errno()
static ssize_t socket_read(int fd, void *buf, size_t len)
int fd
Client socket file descriptor.
Information about a connected client.
Represents a published event in the system.