// NOTE(review): this chunk is a garbled extraction -- many original source
// lines (braces, the member-init list, several branch bodies, the private
// section header) are missing. Only comments are added; visible tokens are
// preserved verbatim.
//
// ClientSession: one accepted TCP connection. Inherits enable_shared_from_this
// so the async read/write handlers below can extend the session's lifetime.
11class ClientSession :
public std::enable_shared_from_this<ClientSession> {
// Constructor: takes ownership of the accepted socket; receives the client id
// and a back-reference to the owning Server (rest of init list missing here).
13 ClientSession(asio::ip::tcp::socket socket,
int client_id, Server& server)
14 : m_socket(
std::move(socket))
// Records the connection timestamp in the client-info record.
20 .connected_at = std::chrono::system_clock::now()
// Disable Nagle's algorithm on the socket; a failure is logged, not fatal.
23 asio::ip::tcp::no_delay nodelay(
true);
25 if (m_socket.set_option(nodelay, ec)) {
27 "Failed to set TCP_NODELAY on client " + std::to_string(client_id)
28 +
": " + ec.message());
// Kick off the async read loop.
32 void start() { read_chunk(); }
// Queue an async write. The copies of `self` and `msg` captured by the
// completion handler keep both the session and the buffer alive until the
// write finishes; write failures are logged with the client fd.
34 void send(std::string_view message)
36 auto msg = std::make_shared<std::string>(message);
39 asio::async_write(m_socket, asio::buffer(*msg),
40 [self = shared_from_this(), msg](
const asio::error_code& ec,
size_t) {
43 "Write failed for client " + std::to_string(self->m_info.fd)
44 +
": " + ec.message());
// Teardown: shut down both directions, then close. Both use the error_code
// overloads, so failures are logged rather than thrown.
52 if (m_socket.shutdown(asio::ip::tcp::socket::shutdown_both, ec)) {
54 "Failed to shutdown socket for client " + std::to_string(m_info.fd)
55 +
": " + ec.message());
58 if (m_socket.close(ec)) {
60 "Failed to close socket for client " + std::to_string(m_info.fd)
61 +
": " + ec.message());
// Mutable/const accessors for the per-client metadata record.
65 ClientInfo& info() {
return m_info; }
66 const ClientInfo& info()
const {
return m_info; }
// read_chunk(): issue one async_read_some into m_read_buf and forward the
// result to on_chunk().
71 m_socket.async_read_some(asio::buffer(m_read_buf),
72 [self = shared_from_this()](
const asio::error_code& ec,
size_t bytes) {
73 self->on_chunk(ec, bytes);
// Read completion handler.
77 void on_chunk(
const asio::error_code& ec,
size_t bytes)
// On error the session is deregistered from the server -- presumably inside
// an `if (ec)` guard whose line is missing; TODO confirm against the repo.
80 m_server.remove_session(m_info.fd);
// Accumulate incoming bytes until a newline-terminated block is complete.
84 m_block_buf.append(m_read_buf.data(), bytes);
// Guard against unbounded buffering: 1 MiB cap (its handling lines missing).
86 if (m_block_buf.size() >
static_cast<size_t>(1024 * 1024)) {
// A trailing '\n' marks a complete block; strip all trailing CR/LF first.
92 if (m_block_buf.back() ==
'\n') {
93 while (!m_block_buf.empty() && (m_block_buf.back() ==
'\n' || m_block_buf.back() ==
'\r'))
94 m_block_buf.pop_back();
96 if (!m_block_buf.empty()) {
// '@'-prefixed blocks are control messages; the prefix is stripped before
// dispatch to the server.
97 if (m_block_buf.starts_with(
'@')) {
98 m_server.process_control_message(m_info.fd, std::string_view(m_block_buf).substr(1));
99 }
else if (m_server.m_message_handler) {
// Otherwise hand the block to the user message handler; on a handler error
// an error JSON is sent back (success path missing from this extraction).
100 auto response = m_server.m_message_handler(m_block_buf);
104 send(R
"({"status":"error","message":")" + response.error() + "\"}");
// Members: owned socket, fixed 4 KiB read buffer, and the block accumulator.
114 asio::ip::tcp::socket m_socket;
115 std::array<char, 4096> m_read_buf {};
116 std::string m_block_buf;
// Tail of Server's constructor (its opening lines are missing from this
// extraction): logs instance creation with the configured port.
130 LILA_DEBUG(Emitter::SYSTEM,
"Server instance created on port " + std::to_string(port));
// Starts the server: sets up the acceptor on m_port, spawns the IO thread,
// and publishes ServerStart. Garbled extraction -- the try-block opening, the
// return statements, the start-handler invocation body, and the
// io_context.run() call are missing; only comments are added here.
138bool Server::start() noexcept
// Idempotence guard: a second start() only warns (presumably returns early).
140 if (m_running.load(std::memory_order_acquire)) {
141 LILA_WARN(Emitter::SERVER,
"Server already running");
// Acceptor setup: IPv4 endpoint on the configured port, SO_REUSEADDR enabled,
// listen backlog of 10.
146 auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(),
static_cast<asio::ip::port_type
>(m_port));
147 m_acceptor = std::make_unique<asio::ip::tcp::acceptor>(m_io_context);
148 m_acceptor->open(endpoint.protocol());
149 m_acceptor->set_option(asio::ip::tcp::acceptor::reuse_address(
true));
150 m_acceptor->bind(endpoint);
151 m_acceptor->listen(10);
152 }
// Any asio failure during setup is logged (and presumably aborts the start --
// TODO confirm the missing return).
catch (
const asio::system_error& e) {
154 "Failed to start server: " + std::string(e.what()));
// Mark the server running before the IO thread starts, then notify listeners.
159 m_running.store(
true, std::memory_order_release);
160 m_event_bus.publish(StreamEvent { EventType::ServerStart });
162 if (m_start_handler) {
// IO thread: runs the io_context event loop (run() call missing from this
// extraction) and logs entry/exit.
168 m_io_thread = std::thread([
this]() {
169 LILA_DEBUG(Emitter::SERVER,
"IO thread started");
171 LILA_DEBUG(Emitter::SERVER,
"IO thread exiting");
174 LILA_INFO(Emitter::SERVER,
"Server started on port " + std::to_string(m_port));
// Stops the server: flips m_running, closes the acceptor, tears down client
// sessions under the write lock, joins the IO thread, and publishes
// ServerStop. Garbled extraction -- the session-teardown loop body and the
// join call are missing; only comments are added here.
178void Server::stop() noexcept
// exchange() makes stop() idempotent: only the first caller proceeds.
180 if (!m_running.exchange(
false, std::memory_order_acq_rel)) {
// Close the acceptor via the error_code overload; a failure is only logged.
186 if (m_acceptor->close(ec)) {
188 "Failed to close acceptor: " + ec.message());
// Tear down every live session while holding the write lock (loop body
// missing from this extraction -- cannot tell which session method is used).
193 std::unique_lock lock(m_clients_mutex);
194 for (
auto& [
id, session] : m_sessions) {
// Join the IO thread before declaring the server stopped.
202 if (m_io_thread.joinable()) {
207 m_event_bus.publish(StreamEvent { EventType::ServerStop });
208 LILA_INFO(Emitter::SERVER,
"Server stopped");
// Arms one async accept; the handler builds a ClientSession per incoming
// connection and registers it. Garbled extraction -- the error-path control
// flow and the re-arming of the accept loop are missing.
211void Server::start_accept()
213 m_acceptor->async_accept(
214 [
this](
const asio::error_code& ec, asio::ip::tcp::socket socket) {
// NOTE(review): an enclosing `if (ec)` guard appears to be missing here --
// as visible, accept failures are logged only while the server is still
// running; verify the full condition against the repository.
216 if (m_running.load(std::memory_order_acquire)) {
217 LILA_ERROR(Emitter::SERVER,
"Accept failed: " + ec.message());
// Assign a fresh client id and hand the socket to a new session.
222 int client_id = m_next_client_id++;
223 auto session = std::make_shared<ClientSession>(std::move(socket), client_id, *
this);
224 register_session(session);
231void Server::register_session(
const std::shared_ptr<ClientSession>& session)
233 int id = session->info().fd;
236 std::unique_lock lock(m_clients_mutex);
237 m_sessions[
id] = session;
240 if (m_connect_handler) {
241 m_connect_handler(session->info());
244 m_event_bus.publish(StreamEvent { EventType::ClientConnected, session->info() });
245 LILA_INFO(Emitter::SERVER,
"Client connected id: " + std::to_string(
id));
248void Server::remove_session(
int client_id)
250 std::optional<ClientInfo> info;
253 std::unique_lock lock(m_clients_mutex);
254 auto it = m_sessions.find(client_id);
255 if (it == m_sessions.end()) {
258 info = it->second->info();
260 m_sessions.erase(it);
264 if (m_disconnect_handler) {
265 m_disconnect_handler(*info);
267 m_event_bus.publish(StreamEvent { EventType::ClientDisconnected, *info });
268 LILA_INFO(Emitter::SERVER,
"Client disconnected (id: " + std::to_string(client_id) +
")");
272void Server::process_control_message(
int client_id, std::string_view message)
274 if (message.starts_with(
"session ")) {
275 std::string session_id(message.substr(8));
276 set_client_session(client_id, session_id);
278 std::shared_lock lock(m_clients_mutex);
279 if (
auto it = m_sessions.find(client_id); it != m_sessions.end()) {
281 R
"({"status":"success","message":"Session ID set to ')" + session_id + R"('"})");
284 } else if (message.starts_with(
"ping")) {
285 std::shared_lock lock(m_clients_mutex);
286 if (
auto it = m_sessions.find(client_id); it != m_sessions.end()) {
287 it->second->send(R
"({"status":"success","message":"pong"})");
291 std::shared_lock lock(m_clients_mutex);
292 if (
auto it = m_sessions.find(client_id); it != m_sessions.end()) {
294 R
"({"status":"error","message":"Unknown command: )" + std::string(message) + "\"}");
299void Server::set_client_session(
int client_id, std::string session_id)
301 std::unique_lock lock(m_clients_mutex);
302 if (
auto it = m_sessions.find(client_id); it != m_sessions.end()) {
303 it->second->info().session_id = std::move(session_id);
307std::optional<std::string> Server::get_client_session(
int client_id)
const
309 std::shared_lock lock(m_clients_mutex);
310 auto it = m_sessions.find(client_id);
311 if (it == m_sessions.end()) {
314 const auto& sid = it->second->info().session_id;
315 return sid.empty() ? std::nullopt : std::optional<std::string>(sid);
318void Server::broadcast_event(
const StreamEvent& ,
319 std::optional<std::string_view> target_session)
321 std::shared_lock lock(m_clients_mutex);
323 for (
const auto& [
id, session] : m_sessions) {
324 if (target_session && session->info().session_id != *target_session) {
327 session->send(R
"({"status":"info","message":"Event broadcast not implemented yet"})");
331void Server::broadcast_to_all(std::string_view message)
333 std::shared_lock lock(m_clients_mutex);
334 for (
const auto& [
id, session] : m_sessions) {
335 session->send(message);
339std::vector<ClientInfo> Server::get_connected_clients()
const
341 std::shared_lock lock(m_clients_mutex);
342 std::vector<ClientInfo> result;
343 result.reserve(m_sessions.size());
344 for (
const auto& [
id, session] : m_sessions) {
345 result.push_back(session->info());
void stop()
Stop all Portal::Graphics operations.