diff --git a/src/addins/ws_server/src/backend.rs b/src/addins/ws_server/src/backend.rs index 9d5376f44e..e5910bc13f 100644 --- a/src/addins/ws_server/src/backend.rs +++ b/src/addins/ws_server/src/backend.rs @@ -32,6 +32,21 @@ pub enum WebSocketCommand { message: Vec, response: Sender, }, + SendText { + connection_id: String, + text: String, + response: Sender, + }, + SendPing { + connection_id: String, + payload: Vec, + response: Sender, + }, + SendPong { + connection_id: String, + payload: Vec, + response: Sender, + }, CloseConnection { connection_id: String, response: Sender, @@ -90,6 +105,24 @@ impl WebSocketServerBackend { ); } + WebSocketCommand::SendText { connection_id, text, response } => { + handle_async_command!(server_state, rt, response, |state| + state.send_text(&connection_id, text).await + ); + } + + WebSocketCommand::SendPing { connection_id, payload, response } => { + handle_async_command!(server_state, rt, response, |state| + state.send_ping(&connection_id, payload).await + ); + } + + WebSocketCommand::SendPong { connection_id, payload, response } => { + handle_async_command!(server_state, rt, response, |state| + state.send_pong(&connection_id, payload).await + ); + } + WebSocketCommand::CloseConnection { connection_id, response } => { handle_sync_command!(server_state, response, |state| state.close_connection(&connection_id) @@ -164,6 +197,36 @@ impl WebSocketServerBackend { }) } + pub fn send_text(&self, connection_id: String, text: String) -> String { + send_command!(self.backend, |response| { + WebSocketCommand::SendText { + connection_id, + text, + response, + } + }) + } + + pub fn send_ping(&self, connection_id: String, payload: Vec) -> String { + send_command!(self.backend, |response| { + WebSocketCommand::SendPing { + connection_id, + payload, + response, + } + }) + } + + pub fn send_pong(&self, connection_id: String, payload: Vec) -> String { + send_command!(self.backend, |response| { + WebSocketCommand::SendPong { + connection_id, + payload, + response, + } + }) + } + pub fn close_connection(&self, connection_id: String) -> String { send_command!(self.backend, |response| { WebSocketCommand::CloseConnection { diff --git a/src/addins/ws_server/src/connections.rs b/src/addins/ws_server/src/connections.rs new file mode 100644 index 0000000000..35448fb16e --- /dev/null +++ b/src/addins/ws_server/src/connections.rs @@ -0,0 +1,43 @@ +use crate::server::{OutgoingMessage, WebSocketServerState}; +use common_server::MessageHandler; +use serde_json::json; + +impl WebSocketServerState { + pub fn close_connection(&mut self, connection_id: &str) -> String { + let mut manager = self.manager.lock().unwrap(); + if let Some(close_sent) = manager.get_mut(connection_id, |conn| { + conn.is_closed = true; + conn.outgoing_tx.send(OutgoingMessage::Close).is_ok() + }) { + self.log(&format!("WebSocket closed: {}", connection_id)); + if close_sent { + json!({ + "result": true, + "message": "WebSocket closed" + }).to_string() + } else { + MessageHandler::error_response("Failed to send close frame") + } + } else { + MessageHandler::error_response("WebSocket connection not found") + } + } + + pub fn get_connections_list(&mut self) -> String { + let mut manager = self.manager.lock().unwrap(); + let mut connections_list = Vec::new(); + + manager.iter_mut(|conn_id, conn_info| { + connections_list.push(json!({ + "connectionId": conn_id, + "address": conn_info.addr, + "isActive": !conn_info.is_closed + })); + }); + + json!({ + "result": true, + "connections": connections_list + }).to_string() + } +} diff --git a/src/addins/ws_server/src/lib.rs b/src/addins/ws_server/src/lib.rs index 1a95ad85d2..df443f0353 100644 --- a/src/addins/ws_server/src/lib.rs +++ b/src/addins/ws_server/src/lib.rs @@ -1,6 +1,9 @@ mod backend; mod server; mod wrapper; +mod read; +mod write; +mod connections; use std::sync::Arc; use common_core::*; @@ -22,6 +25,9 @@ pub const METHODS: &[&[u16]] = &[ name!("ListConnections"), // 6 name!("RetrieveBinaryFromVault"), // 7 name!("GetLogs"), // 8 + name!("SendText"), // 9 + name!("SendPing"), // 10 + name!("SendPong"), // 11 ]; pub fn get_params_amount(num: usize) -> usize { @@ -35,6 +41,9 @@ pub fn get_params_amount(num: usize) -> usize { 6 => 0, // ListConnections() 7 => 1, // RetrieveBinaryFromVault(vault_key) 8 => 1, // GetLogs(count) + 9 => 2, // SendText(connection_id, text) + 10 => 2, // SendPing(connection_id, payload) + 11 => 2, // SendPong(connection_id, payload) _ => 0, } } @@ -85,6 +94,21 @@ pub fn cal_func(obj: &mut AddIn, num: usize, params: &mut [Variant]) -> Box { + let connection_id = params[0].get_string().unwrap_or_default(); + let text = params[1].get_string().unwrap_or_default(); + Box::new(obj.server.send_text(&connection_id, &text)) + }, + 10 => { + let connection_id = params[0].get_string().unwrap_or_default(); + let payload = params[1].get_blob().unwrap_or(&empty_array); + Box::new(obj.server.send_ping(&connection_id, payload.to_vec())) + }, + 11 => { + let connection_id = params[0].get_string().unwrap_or_default(); + let payload = params[1].get_blob().unwrap_or(&empty_array); + Box::new(obj.server.send_pong(&connection_id, payload.to_vec())) + }, _ => Box::new(false), } } diff --git a/src/addins/ws_server/src/read.rs b/src/addins/ws_server/src/read.rs new file mode 100644 index 0000000000..717ca1b665 --- /dev/null +++ b/src/addins/ws_server/src/read.rs @@ -0,0 +1,125 @@ +use crate::server::{WebSocketConnection, WebSocketServerState}; +use common_server::{AsyncWaiter, MessageHandler}; +use serde_json::json; + +impl WebSocketServerState { + + pub async fn get_next_message(&mut self, timeout_ms: u64) -> String { + let waiter = AsyncWaiter::new(timeout_ms); + let message_handler = MessageHandler::new(self.vault.clone()); + + let result = waiter.wait_for(|| { + let mut manager = self.manager.lock().unwrap(); + let all_ids = manager.get_ids_round_robin(); + let mut to_remove = Vec::new(); + let mut found_message = None; + for conn_id in all_ids { + if let Some((message, address, is_active)) = + manager.get_mut(&conn_id, Self::poll_connection_message) + { + if let Some(msg) = message { + manager.set_last_processed(Some(conn_id.clone())); + found_message = Some((conn_id, Some(msg), address, is_active)); + break; + } + if !is_active { + to_remove.push(conn_id.clone()); + } + } + } + + for conn_id in to_remove { + manager.remove(&conn_id); + } + + if found_message.is_some() { + return found_message; + } + + None + }).await; + + match result { + Ok((connection_id, Some(message), address, is_active)) => { + self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id)); + + match message_handler.store_message(message) { + Ok(vault_key) => { + json!({ + "result": true, + "connectionId": connection_id, + "address": address, + "message": vault_key, + "isActive": is_active, + }).to_string() + } + Err(e) => MessageHandler::error_response(&e), + } + } + Ok((_, None, _, _)) => MessageHandler::timeout_response(), + Err(()) => MessageHandler::timeout_response(), + } + } + + pub async fn get_message(&mut self, connection_id: &str, timeout_ms: u64) -> String { + let waiter = AsyncWaiter::new(timeout_ms); + let conn_id = connection_id.to_string(); + + let result = waiter.wait_for(|| { + let mut manager = self.manager.lock().unwrap(); + if let Some((message, address, is_active)) = + manager.get_mut(&conn_id, Self::poll_connection_message) + { + if !is_active && message.is_none() { + manager.remove(&conn_id); + } + if let Some(msg) = message { + return Some((Some(msg), address, is_active)); + } + if !is_active { + return Some((None, address, false)); + } + } + None + }).await; + + match result { + Ok((Some(message), address, is_active)) => { + self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id)); + + let message_handler = MessageHandler::new(self.vault.clone()); + match message_handler.store_message(message) { + Ok(vault_key) => { + json!({ + "result": true, + "connectionId": connection_id, + "address": address, + "isActive": is_active, + "message": vault_key + }).to_string() + } + Err(e) => MessageHandler::error_response(&e), + } + } + Ok((None, address, _)) => { + json!({ + "result": true, + "connectionId": connection_id, + "address": address, + "isActive": false + }).to_string() + } + Err(()) => MessageHandler::timeout_response(), + } + } + + fn poll_connection_message(conn: &mut WebSocketConnection) -> (Option>, String, bool) { + let is_active = !conn.is_closed; + let address = conn.addr.clone(); + match conn.incoming_rx.try_recv() { + Ok(msg) => (Some(msg), address, is_active), + Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (None, address, is_active), + Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => (None, address, false), + } + } +} diff --git a/src/addins/ws_server/src/server.rs b/src/addins/ws_server/src/server.rs index 433cb7c531..df4bc3f8a8 100644 --- a/src/addins/ws_server/src/server.rs +++ b/src/addins/ws_server/src/server.rs @@ -11,9 +11,8 @@ use tokio::sync::mpsc; use futures_util::{StreamExt, SinkExt}; use common_binary::vault::BinaryVault; use common_logs::{Logger, log}; -use common_server::{ConnectionManager, MessageHandler, AsyncWaiter}; +use common_server::ConnectionManager; use serde::{Deserialize, Serialize}; -use serde_json::json; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct WebSocketServerConfig { @@ -42,20 +41,28 @@ impl Default for WebSocketServerConfig { } pub struct WebSocketServerState { - vault: BinaryVault, - logger: Option>, - manager: Arc>>, + pub(crate) vault: BinaryVault, + pub(crate) logger: Option>, + pub(crate) manager: Arc>>, } pub struct WebSocketConnection { - addr: String, - outgoing_tx: mpsc::UnboundedSender>, - incoming_rx: mpsc::UnboundedReceiver>, - is_closed: bool, + pub(crate) addr: String, + pub(crate) outgoing_tx: mpsc::UnboundedSender, + pub(crate) incoming_rx: mpsc::UnboundedReceiver>, + pub(crate) is_closed: bool, +} + +pub(crate) enum OutgoingMessage { + Binary(Vec), + Text(String), + Ping(Vec), + Pong(Vec), + Close, } impl WebSocketServerState { - fn log(&self, message: &str) { + pub(crate) fn log(&self, message: &str) { if let Some(ref logger) = self.logger { log!(logger, "{}", message); } @@ -67,7 +74,7 @@ impl WebSocketServerState { vault: BinaryVault, logger: Option>, ) -> Result { - // Parse configuration + let config = if config_json.is_empty() { WebSocketServerConfig::default() } else { @@ -86,7 +93,6 @@ impl WebSocketServerState { manager: manager.clone(), })); - // Create router with configured routes let mut app = Router::new(); for route in &config.routes { if let Some(ref log) = logger { @@ -118,188 +124,6 @@ impl WebSocketServerState { }) } - pub async fn get_next_message(&mut self, timeout_ms: u64) -> String { - let waiter = AsyncWaiter::new(timeout_ms); - let message_handler = MessageHandler::new(self.vault.clone()); - - let result = waiter.wait_for(|| { - let mut manager = self.manager.lock().unwrap(); - let all_ids = manager.get_ids_round_robin(); - let mut to_remove = Vec::new(); - let mut found_message = None; - for conn_id in all_ids { - if let Some(next_state) = manager.get_mut(&conn_id, |conn| { - let is_active = !conn.is_closed; - let address = conn.addr.clone(); - match conn.incoming_rx.try_recv() { - Ok(msg) => Some((Some(msg), address, is_active)), - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Some((None, address, is_active)), - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => Some((None, address, false)), - } - }) { - if let Some((message, address, is_active)) = next_state { - if let Some(msg) = message { - manager.set_last_processed(Some(conn_id.clone())); - found_message = Some((conn_id, Some(msg), address, is_active)); - break; - } - if !is_active { - to_remove.push(conn_id.clone()); - } - } - } - } - - for conn_id in to_remove { - manager.remove(&conn_id); - } - - if found_message.is_some() { - return found_message; - } - - None - }).await; - - match result { - Ok((connection_id, Some(message), address, is_active)) => { - self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id)); - - match message_handler.store_message(message) { - Ok(vault_key) => { - json!({ - "result": true, - "connectionId": connection_id, - "address": address, - "message": vault_key, - "isActive": is_active, - "isNewConnection": false - }).to_string() - } - Err(e) => MessageHandler::error_response(&e), - } - } - Ok((_, None, _, _)) => MessageHandler::timeout_response(), - Err(()) => MessageHandler::timeout_response(), - } - } - - pub async fn get_message(&mut self, connection_id: &str, timeout_ms: u64) -> String { - let waiter = AsyncWaiter::new(timeout_ms); - let conn_id = connection_id.to_string(); - - let result = waiter.wait_for(|| { - let mut manager = self.manager.lock().unwrap(); - if let Some(next_state) = manager.get_mut(&conn_id, |conn| { - let is_active = !conn.is_closed; - let address = conn.addr.clone(); - match conn.incoming_rx.try_recv() { - Ok(msg) => Some((Some(msg), address, is_active)), - Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Some((None, address, is_active)), - Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => Some((None, address, false)), - } - }) { - if let Some((message, address, is_active)) = next_state { - if !is_active && message.is_none() { - manager.remove(&conn_id); - } - if let Some(msg) = message { - return Some((Some(msg), address, is_active)); - } - if !is_active { - return Some((None, address, false)); - } - } - } - None - }).await; - - match result { - Ok((Some(message), address, is_active)) => { - self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id)); - - let message_handler = MessageHandler::new(self.vault.clone()); - match message_handler.store_message(message) { - Ok(vault_key) => { - json!({ - "result": true, - "connectionId": connection_id, - "address": address, - "isActive": is_active, - "message": vault_key - }).to_string() - } - Err(e) => MessageHandler::error_response(&e), - } - } - Ok((None, address, _)) => { - json!({ - "result": true, - "connectionId": connection_id, - "address": address, - "isActive": false - }).to_string() - } - Err(()) => MessageHandler::timeout_response(), - } - } - - pub async fn send_message(&mut self, connection_id: &str, message: Vec) -> String { - self.log(&format!("Sending {} bytes to WebSocket {}", message.len(), connection_id)); - - let mut manager = self.manager.lock().unwrap(); - if let Some(send_result) = manager.get_mut(connection_id, |conn| { - if conn.is_closed { - return false; - } - conn.outgoing_tx.send(message).is_ok() - }) { - if send_result { - json!({ - "result": true, - "message": "Message sent" - }).to_string() - } else { - MessageHandler::error_response("WebSocket connection is closed") - } - } else { - MessageHandler::error_response("WebSocket connection not found") - } - } - - pub fn close_connection(&mut self, connection_id: &str) -> String { - let mut manager = self.manager.lock().unwrap(); - if let Some(()) = manager.get_mut(connection_id, |conn| { - conn.is_closed = true; - }) { - self.log(&format!("WebSocket closed: {}", connection_id)); - json!({ - "result": true, - "message": "WebSocket closed" - }).to_string() - } else { - MessageHandler::error_response("WebSocket connection not found") - } - } - - pub fn get_connections_list(&mut self) -> String { - let mut manager = self.manager.lock().unwrap(); - let mut connections_list = Vec::new(); - - manager.iter_mut(|conn_id, conn_info| { - connections_list.push(json!({ - "connectionId": conn_id, - "address": conn_info.addr, - "isActive": !conn_info.is_closed - })); - }); - - json!({ - "result": true, - "connections": connections_list - }).to_string() - } - } async fn ws_handler( @@ -339,8 +163,31 @@ async fn handle_websocket( let send_task = tokio::spawn(async move { while let Some(msg) = outgoing_rx.recv().await { - if ws_sender.send(Message::Binary(msg.into())).await.is_err() { - break; + match msg { + OutgoingMessage::Binary(data) => { + if ws_sender.send(Message::Binary(data.into())).await.is_err() { + break; + } + } + OutgoingMessage::Text(text) => { + if ws_sender.send(Message::Text(text.into())).await.is_err() { + break; + } + } + OutgoingMessage::Ping(data) => { + if ws_sender.send(Message::Ping(data.into())).await.is_err() { + break; + } + } + OutgoingMessage::Pong(data) => { + if ws_sender.send(Message::Pong(data.into())).await.is_err() { + break; + } + } + OutgoingMessage::Close => { + let _ = ws_sender.send(Message::Close(None)).await; + break; + } } } }); diff --git a/src/addins/ws_server/src/wrapper.rs b/src/addins/ws_server/src/wrapper.rs index 9e74d2171c..c0866a13d1 100644 --- a/src/addins/ws_server/src/wrapper.rs +++ b/src/addins/ws_server/src/wrapper.rs @@ -88,6 +88,39 @@ impl WebSocketServer { } } + pub fn send_text(&self, connection_id: &str, text: &str) -> String { + if !self.started { + return json_error("WebSocket server not started"); + } + + match self.backend.lock() { + Ok(backend) => backend.send_text(connection_id.to_string(), text.to_string()), + Err(e) => json_error(&format!("Failed to lock backend: {}", e)), + } + } + + pub fn send_ping(&self, connection_id: &str, payload: Vec) -> String { + if !self.started { + return json_error("WebSocket server not started"); + } + + match self.backend.lock() { + Ok(backend) => backend.send_ping(connection_id.to_string(), payload), + Err(e) => json_error(&format!("Failed to lock backend: {}", e)), + } + } + + pub fn send_pong(&self, connection_id: &str, payload: Vec) -> String { + if !self.started { + return json_error("WebSocket server not started"); + } + + match self.backend.lock() { + Ok(backend) => backend.send_pong(connection_id.to_string(), payload), + Err(e) => json_error(&format!("Failed to lock backend: {}", e)), + } + } + pub fn close_connection(&self, connection_id: &str) -> String { if !self.started { return json_error("WebSocket server not started"); diff --git a/src/addins/ws_server/src/write.rs b/src/addins/ws_server/src/write.rs new file mode 100644 index 0000000000..30f8a644df --- /dev/null +++ b/src/addins/ws_server/src/write.rs @@ -0,0 +1,62 @@ +use crate::server::{OutgoingMessage, WebSocketServerState}; +use common_server::MessageHandler; +use serde_json::json; + +impl WebSocketServerState { + + pub async fn send_message(&mut self, connection_id: &str, message: Vec) -> String { + let length = message.len(); + self.send_frame( + connection_id, + OutgoingMessage::Binary(message), + &format!("Sending {} bytes to WebSocket {}", length, connection_id), + ) + } + + pub async fn send_text(&mut self, connection_id: &str, text: String) -> String { + self.send_frame( + connection_id, + OutgoingMessage::Text(text), + &format!("Sending text to WebSocket {}", connection_id), + ) + } + + pub async fn send_ping(&mut self, connection_id: &str, payload: Vec) -> String { + self.send_frame( + connection_id, + OutgoingMessage::Ping(payload), + &format!("Sending ping to WebSocket {}", connection_id), + ) + } + + pub async fn send_pong(&mut self, connection_id: &str, payload: Vec) -> String { + self.send_frame( + connection_id, + OutgoingMessage::Pong(payload), + &format!("Sending pong to WebSocket {}", connection_id), + ) + } + + fn send_frame(&self, connection_id: &str, frame: OutgoingMessage, log_message: &str) -> String { + self.log(log_message); + + let mut manager = self.manager.lock().unwrap(); + if let Some(send_result) = manager.get_mut(connection_id, |conn| { + if conn.is_closed { + return false; + } + conn.outgoing_tx.send(frame).is_ok() + }) { + if send_result { + json!({ + "result": true, + "message": "Message sent" + }).to_string() + } else { + MessageHandler::error_response("WebSocket connection is closed") + } + } else { + MessageHandler::error_response("WebSocket connection not found") + } + } +}