1
0
mirror of https://github.com/Bayselonarrend/OpenIntegrations.git synced 2026-04-24 20:34:14 +02:00

Доработка компоненты ws_server

This commit is contained in:
Anton Titovets
2026-04-16 08:48:24 +03:00
parent d50d4ffc3c
commit 42fc9f9745
7 changed files with 66 additions and 35 deletions
+6 -6
View File
@@ -1,10 +1,10 @@
"MAIN ---"
linux-vdso.so.1 (0x00007fff843fd000)
libm.so.6 => /lib64/libm.so.6 (0x00007b93e4200000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x00007b93e3e00000)
libc.so.6 => /lib64/libc.so.6 (0x00007b93e3a00000)
libdl.so.2 => /lib64/libdl.so.2 (0x00007b93e3600000)
/lib64/ld-linux-x86-64.so.2 (0x00007b93e4600000)
linux-vdso.so.1 (0x00007ffe81f4c000)
libm.so.6 => /lib64/libm.so.6 (0x0000739af4200000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x0000739af3e00000)
libc.so.6 => /lib64/libc.so.6 (0x0000739af3a00000)
libdl.so.2 => /lib64/libdl.so.2 (0x0000739af3600000)
/lib64/ld-linux-x86-64.so.2 (0x0000739af4600000)
GLIBC_2.2.5
GLIBC_2.12
GLIBC_2.3
+50 -20
View File
@@ -1,8 +1,10 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::net::SocketAddr;
use axum::{
Router,
routing::get,
extract::{State, WebSocketUpgrade, ws::{WebSocket, Message}},
extract::{ConnectInfo, State, WebSocketUpgrade, ws::{WebSocket, Message}},
response::Response,
};
use tokio::sync::mpsc;
@@ -42,7 +44,7 @@ impl Default for WebSocketServerConfig {
pub struct WebSocketServerState {
vault: BinaryVault,
logger: Option<Arc<Logger>>,
manager: ConnectionManager<WebSocketConnection>,
manager: Arc<Mutex<ConnectionManager<WebSocketConnection>>>,
message_rx: Option<mpsc::UnboundedReceiver<(String, Vec<u8>)>>,
message_tx: mpsc::UnboundedSender<(String, Vec<u8>)>,
connection_rx: Option<mpsc::UnboundedReceiver<String>>,
@@ -51,6 +53,7 @@ pub struct WebSocketServerState {
pub struct WebSocketConnection {
tx: mpsc::UnboundedSender<Vec<u8>>,
addr: String,
}
impl WebSocketServerState {
@@ -78,14 +81,14 @@ impl WebSocketServerState {
log!(log, "Starting WebSocket server on port {} with config: {:?}", port, config);
}
let manager = ConnectionManager::new(config.max_connections, logger.clone());
let manager = Arc::new(Mutex::new(ConnectionManager::new(config.max_connections, logger.clone())));
let (message_tx, message_rx) = mpsc::unbounded_channel();
let (connection_tx, connection_rx) = mpsc::unbounded_channel();
let state = Arc::new(tokio::sync::Mutex::new(WebSocketServerState {
vault: vault.clone(),
logger: logger.clone(),
manager,
manager: manager.clone(),
message_tx: message_tx.clone(),
message_rx: Some(message_rx),
connection_tx: connection_tx.clone(),
@@ -112,7 +115,7 @@ impl WebSocketServerState {
}
tokio::spawn(async move {
if let Err(e) = axum::serve(listener, app).await {
if let Err(e) = axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await {
eprintln!("WebSocket server error: {}", e);
}
});
@@ -125,7 +128,7 @@ impl WebSocketServerState {
Ok(WebSocketServerState {
vault,
logger: logger.clone(),
manager: ConnectionManager::new(config.max_connections, logger),
manager,
message_rx: Some(message_rx),
message_tx,
connection_rx: Some(connection_rx),
@@ -146,10 +149,14 @@ impl WebSocketServerState {
if let Some(ref mut msg_rx) = self.message_rx {
if let Ok((conn_id, msg)) = msg_rx.try_recv() {
let exists = self.manager.get_mut(&conn_id, |_| {}).is_some();
if exists {
return Some((conn_id, Some(msg)));
let exists = {
let mut manager = self.manager.lock().unwrap();
manager.get_mut(&conn_id, |_| ()).is_some()
};
if !exists {
return None;
}
return Some((conn_id, Some(msg)));
}
}
@@ -222,7 +229,8 @@ impl WebSocketServerState {
pub async fn send_message(&mut self, connection_id: &str, message: Vec<u8>) -> String {
self.log(&format!("Sending {} bytes to WebSocket {}", message.len(), connection_id));
if let Some(()) = self.manager.get_mut(connection_id, |conn| {
let mut manager = self.manager.lock().unwrap();
if let Some(()) = manager.get_mut(connection_id, |conn| {
let _ = conn.tx.send(message);
}) {
json!({
@@ -235,7 +243,8 @@ impl WebSocketServerState {
}
pub fn close_connection(&mut self, connection_id: &str) -> String {
if self.manager.remove(connection_id) {
let mut manager = self.manager.lock().unwrap();
if manager.remove(connection_id) {
self.log(&format!("WebSocket closed: {}", connection_id));
json!({
"result": true,
@@ -247,30 +256,48 @@ impl WebSocketServerState {
}
pub fn get_connections_list(&mut self) -> String {
let ids = self.manager.get_ids();
let mut manager = self.manager.lock().unwrap();
let mut connections_list = Vec::new();
manager.iter_mut(|conn_id, conn_info| {
connections_list.push(json!({
"connectionId": conn_id,
"address": conn_info.addr
}));
});
json!({
"result": true,
"connections": ids,
"count": ids.len()
"connections": connections_list
}).to_string()
}
}
async fn ws_handler(
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(state): State<Arc<tokio::sync::Mutex<WebSocketServerState>>>,
) -> Response {
ws.on_upgrade(|socket| handle_websocket(socket, state))
ws.on_upgrade(move |socket| handle_websocket(socket, state, addr))
}
async fn handle_websocket(socket: WebSocket, state: Arc<tokio::sync::Mutex<WebSocketServerState>>) {
async fn handle_websocket(
socket: WebSocket,
state: Arc<tokio::sync::Mutex<WebSocketServerState>>,
addr: SocketAddr,
) {
let connection_id = ConnectionManager::<WebSocketConnection>::generate_id();
let (tx, mut rx) = mpsc::unbounded_channel();
let (message_tx, connection_tx) = {
let mut locked_state = state.lock().await;
locked_state.manager.add(connection_id.clone(), WebSocketConnection { tx });
let locked_state = state.lock().await;
{
let mut manager = locked_state.manager.lock().unwrap();
manager.add(connection_id.clone(), WebSocketConnection {
tx,
addr: addr.to_string(),
});
}
locked_state.log(&format!("WebSocket connected: {}", connection_id));
(locked_state.message_tx.clone(), locked_state.connection_tx.clone())
};
@@ -305,7 +332,10 @@ async fn handle_websocket(socket: WebSocket, state: Arc<tokio::sync::Mutex<WebSo
_ = recv_task => {},
}
let mut locked_state = state_clone.lock().await;
locked_state.manager.remove(&connection_id);
let locked_state = state_clone.lock().await;
{
let mut manager = locked_state.manager.lock().unwrap();
manager.remove(&connection_id);
}
locked_state.log(&format!("WebSocket disconnected: {}", connection_id));
}
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349
size 2125316
oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063
size 2128229
+2 -2
View File
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349
size 2125316
oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063
size 2128229
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349
size 2125316
oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063
size 2128229
@@ -681,9 +681,10 @@
ВызватьИсключение OPI_Инструменты.JSONСтрокой(ОбъектКлиента);
Иначе
Сообщение = "Hello from client!";
OPI_WebSocket.ОтправитьТекстовоеСообщение(ОбъектКлиента, Сообщение);
Отправка = OPI_WebSocket.ОтправитьТекстовоеСообщение(ОбъектКлиента, Сообщение);
КонецЕсли;
Результат = OPI_WebSocket.ПолучитьДанныеОчередногоСоединения(ОбъектСервера, 5000); // SKIP
Результат = OPI_WebSocket.ПолучитьДанныеОчередногоСоединения(ОбъектСервера, 5000);
// END
+2 -2
View File
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:803709fbd27de26e7dc14c2d56f862f715e45e8625f45b890e7a8d022dea4349
size 2125316
oid sha256:85deefa5f1528ba368188cf4c7f44ebb3093ee580e58261271ff9d6cfce6c063
size 2128229