diff --git a/src/addins/ws_server/dependencies.log b/src/addins/ws_server/dependencies.log index 600e05fe28..4c91a0a768 100644 --- a/src/addins/ws_server/dependencies.log +++ b/src/addins/ws_server/dependencies.log @@ -1,10 +1,10 @@ "MAIN ---" - linux-vdso.so.1 (0x00007fff843fd000) - libm.so.6 => /lib64/libm.so.6 (0x00007b93e4200000) - libpthread.so.0 => /lib64/libpthread.so.0 (0x00007b93e3e00000) - libc.so.6 => /lib64/libc.so.6 (0x00007b93e3a00000) - libdl.so.2 => /lib64/libdl.so.2 (0x00007b93e3600000) - /lib64/ld-linux-x86-64.so.2 (0x00007b93e4600000) + linux-vdso.so.1 (0x00007ffe81f4c000) + libm.so.6 => /lib64/libm.so.6 (0x0000739af4200000) + libpthread.so.0 => /lib64/libpthread.so.0 (0x0000739af3e00000) + libc.so.6 => /lib64/libc.so.6 (0x0000739af3a00000) + libdl.so.2 => /lib64/libdl.so.2 (0x0000739af3600000) + /lib64/ld-linux-x86-64.so.2 (0x0000739af4600000) GLIBC_2.2.5 GLIBC_2.12 GLIBC_2.3 diff --git a/src/addins/ws_server/src/server.rs b/src/addins/ws_server/src/server.rs index 5887893e34..c907fdf910 100644 --- a/src/addins/ws_server/src/server.rs +++ b/src/addins/ws_server/src/server.rs @@ -1,8 +1,10 @@ use std::sync::Arc; +use std::sync::Mutex; +use std::net::SocketAddr; use axum::{ Router, routing::get, - extract::{State, WebSocketUpgrade, ws::{WebSocket, Message}}, + extract::{ConnectInfo, State, WebSocketUpgrade, ws::{WebSocket, Message}}, response::Response, }; use tokio::sync::mpsc; @@ -42,7 +44,7 @@ impl Default for WebSocketServerConfig { pub struct WebSocketServerState { vault: BinaryVault, logger: Option>, - manager: ConnectionManager, + manager: Arc>>, message_rx: Option)>>, message_tx: mpsc::UnboundedSender<(String, Vec)>, connection_rx: Option>, @@ -51,6 +53,7 @@ pub struct WebSocketServerState { pub struct WebSocketConnection { tx: mpsc::UnboundedSender>, + addr: String, } impl WebSocketServerState { @@ -78,14 +81,14 @@ impl WebSocketServerState { log!(log, "Starting WebSocket server on port {} with config: {:?}", port, config); } - let manager = ConnectionManager::new(config.max_connections, logger.clone()); + let manager = Arc::new(Mutex::new(ConnectionManager::new(config.max_connections, logger.clone()))); let (message_tx, message_rx) = mpsc::unbounded_channel(); let (connection_tx, connection_rx) = mpsc::unbounded_channel(); let state = Arc::new(tokio::sync::Mutex::new(WebSocketServerState { vault: vault.clone(), logger: logger.clone(), - manager, + manager: manager.clone(), message_tx: message_tx.clone(), message_rx: Some(message_rx), connection_tx: connection_tx.clone(), @@ -112,7 +115,7 @@ impl WebSocketServerState { } tokio::spawn(async move { - if let Err(e) = axum::serve(listener, app).await { + if let Err(e) = axum::serve(listener, app.into_make_service_with_connect_info::()).await { eprintln!("WebSocket server error: {}", e); } }); @@ -125,7 +128,7 @@ impl WebSocketServerState { Ok(WebSocketServerState { vault, logger: logger.clone(), - manager: ConnectionManager::new(config.max_connections, logger), + manager, message_rx: Some(message_rx), message_tx, connection_rx: Some(connection_rx), @@ -146,10 +149,14 @@ impl WebSocketServerState { if let Some(ref mut msg_rx) = self.message_rx { if let Ok((conn_id, msg)) = msg_rx.try_recv() { - let exists = self.manager.get_mut(&conn_id, |_| {}).is_some(); - if exists { - return Some((conn_id, Some(msg))); + let exists = { + let mut manager = self.manager.lock().unwrap(); + manager.get_mut(&conn_id, |_| ()).is_some() + }; + if !exists { + return None; } + return Some((conn_id, Some(msg))); } } @@ -222,7 +229,8 @@ impl WebSocketServerState { pub async fn send_message(&mut self, connection_id: &str, message: Vec) -> String { self.log(&format!("Sending {} bytes to WebSocket {}", message.len(), connection_id)); - if let Some(()) = self.manager.get_mut(connection_id, |conn| { + let mut manager = self.manager.lock().unwrap(); + if let Some(()) = manager.get_mut(connection_id, |conn| { let _ = conn.tx.send(message); }) { json!({ @@ -235,7 +243,8 @@ impl WebSocketServerState { } pub fn close_connection(&mut self, connection_id: &str) -> String { - if self.manager.remove(connection_id) { + let mut manager = self.manager.lock().unwrap(); + if manager.remove(connection_id) { self.log(&format!("WebSocket closed: {}", connection_id)); json!({ "result": true, @@ -247,30 +256,48 @@ impl WebSocketServerState { } pub fn get_connections_list(&mut self) -> String { - let ids = self.manager.get_ids(); + let mut manager = self.manager.lock().unwrap(); + let mut connections_list = Vec::new(); + + manager.iter_mut(|conn_id, conn_info| { + connections_list.push(json!({ + "connectionId": conn_id, + "address": conn_info.addr + })); + }); json!({ "result": true, - "connections": ids, - "count": ids.len() + "connections": connections_list }).to_string() } } async fn ws_handler( ws: WebSocketUpgrade, + ConnectInfo(addr): ConnectInfo, State(state): State>>, ) -> Response { - ws.on_upgrade(|socket| handle_websocket(socket, state)) + ws.on_upgrade(move |socket| handle_websocket(socket, state, addr)) } -async fn handle_websocket(socket: WebSocket, state: Arc>) { +async fn handle_websocket( + socket: WebSocket, + state: Arc>, + addr: SocketAddr, +) { let connection_id = ConnectionManager::::generate_id(); let (tx, mut rx) = mpsc::unbounded_channel(); let (message_tx, connection_tx) = { - let mut locked_state = state.lock().await; - locked_state.manager.add(connection_id.clone(), WebSocketConnection { tx }); + let locked_state = state.lock().await; + { + let mut manager = locked_state.manager.lock().unwrap(); + manager.add(connection_id.clone(), WebSocketConnection { + tx, + addr: addr.to_string(), + }); + } locked_state.log(&format!("WebSocket connected: {}", connection_id)); (locked_state.message_tx.clone(), locked_state.connection_tx.clone()) }; @@ -305,7 +332,10 @@ async fn handle_websocket(socket: WebSocket, state: Arc {}, } - let mut locked_state = state_clone.lock().await; - locked_state.manager.remove(&connection_id); + let locked_state = state_clone.lock().await; + { + let mut manager = locked_state.manager.lock().unwrap(); + manager.remove(&connection_id); + } locked_state.log(&format!("WebSocket disconnected: {}", connection_id)); } diff --git a/src/en/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin b/src/en/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin index b5504c26cb..a6523930d4 100644 --- a/src/en/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin +++ b/src/en/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349 -size 2125316 +oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063 +size 2128229 diff --git a/src/en/OInt/addins/OPI_WSServer.zip b/src/en/OInt/addins/OPI_WSServer.zip index b5504c26cb..a6523930d4 100644 --- a/src/en/OInt/addins/OPI_WSServer.zip +++ b/src/en/OInt/addins/OPI_WSServer.zip @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349 -size 2125316 +oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063 +size 2128229 diff --git a/src/ru/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin b/src/ru/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin index b5504c26cb..a6523930d4 100644 --- a/src/ru/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin +++ b/src/ru/BSL/OpenIntegrations/src/CommonTemplates/OPI_WSServer/Template.addin @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349 -size 2125316 +oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063 +size 2128229 diff --git a/src/ru/BSL/Tests/src/CommonModules/OPIt_WebSocket/Module.bsl b/src/ru/BSL/Tests/src/CommonModules/OPIt_WebSocket/Module.bsl index a49cc6f2d0..830e0858b2 100644 --- a/src/ru/BSL/Tests/src/CommonModules/OPIt_WebSocket/Module.bsl +++ b/src/ru/BSL/Tests/src/CommonModules/OPIt_WebSocket/Module.bsl @@ -681,9 +681,10 @@ ВызватьИсключение OPI_Инструменты.JSONСтрокой(ОбъектКлиента); Иначе Сообщение = "Hello from client!"; - OPI_WebSocket.ОтправитьТекстовоеСообщение(ОбъектКлиента, Сообщение); + Отправка = OPI_WebSocket.ОтправитьТекстовоеСообщение(ОбъектКлиента, Сообщение); КонецЕсли; + Результат = OPI_WebSocket.ПолучитьДанныеОчередногоСоединения(ОбъектСервера, 5000); // SKIP Результат = OPI_WebSocket.ПолучитьДанныеОчередногоСоединения(ОбъектСервера, 5000); // END diff --git a/src/ru/OInt/addins/OPI_WSServer.zip b/src/ru/OInt/addins/OPI_WSServer.zip index b5504c26cb..a6523930d4 100644 --- a/src/ru/OInt/addins/OPI_WSServer.zip +++ b/src/ru/OInt/addins/OPI_WSServer.zip @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349 -size 2125316 +oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063 +size 2128229