1
0
mirror of https://github.com/Bayselonarrend/OpenIntegrations.git synced 2026-04-26 20:43:22 +02:00

Новые типы сообщений, рефакторинг ws_client

This commit is contained in:
Anton Titovets
2026-04-16 10:45:02 +03:00
parent 79b2cab5ce
commit d03d7cb791
7 changed files with 393 additions and 196 deletions
+63
View File
@@ -32,6 +32,21 @@ pub enum WebSocketCommand {
message: Vec<u8>,
response: Sender<String>,
},
SendText {
connection_id: String,
text: String,
response: Sender<String>,
},
SendPing {
connection_id: String,
payload: Vec<u8>,
response: Sender<String>,
},
SendPong {
connection_id: String,
payload: Vec<u8>,
response: Sender<String>,
},
CloseConnection {
connection_id: String,
response: Sender<String>,
@@ -90,6 +105,24 @@ impl WebSocketServerBackend {
);
}
WebSocketCommand::SendText { connection_id, text, response } => {
handle_async_command!(server_state, rt, response, |state|
state.send_text(&connection_id, text).await
);
}
WebSocketCommand::SendPing { connection_id, payload, response } => {
handle_async_command!(server_state, rt, response, |state|
state.send_ping(&connection_id, payload).await
);
}
WebSocketCommand::SendPong { connection_id, payload, response } => {
handle_async_command!(server_state, rt, response, |state|
state.send_pong(&connection_id, payload).await
);
}
WebSocketCommand::CloseConnection { connection_id, response } => {
handle_sync_command!(server_state, response, |state|
state.close_connection(&connection_id)
@@ -164,6 +197,36 @@ impl WebSocketServerBackend {
})
}
pub fn send_text(&self, connection_id: String, text: String) -> String {
send_command!(self.backend, |response| {
WebSocketCommand::SendText {
connection_id,
text,
response,
}
})
}
pub fn send_ping(&self, connection_id: String, payload: Vec<u8>) -> String {
send_command!(self.backend, |response| {
WebSocketCommand::SendPing {
connection_id,
payload,
response,
}
})
}
pub fn send_pong(&self, connection_id: String, payload: Vec<u8>) -> String {
send_command!(self.backend, |response| {
WebSocketCommand::SendPong {
connection_id,
payload,
response,
}
})
}
pub fn close_connection(&self, connection_id: String) -> String {
send_command!(self.backend, |response| {
WebSocketCommand::CloseConnection {
+43
View File
@@ -0,0 +1,43 @@
use crate::server::{OutgoingMessage, WebSocketServerState};
use common_server::MessageHandler;
use serde_json::json;
impl WebSocketServerState {
pub fn close_connection(&mut self, connection_id: &str) -> String {
let mut manager = self.manager.lock().unwrap();
if let Some(close_sent) = manager.get_mut(connection_id, |conn| {
conn.is_closed = true;
conn.outgoing_tx.send(OutgoingMessage::Close).is_ok()
}) {
self.log(&format!("WebSocket closed: {}", connection_id));
if close_sent {
json!({
"result": true,
"message": "WebSocket closed"
}).to_string()
} else {
MessageHandler::error_response("Failed to send close frame")
}
} else {
MessageHandler::error_response("WebSocket connection not found")
}
}
pub fn get_connections_list(&mut self) -> String {
let mut manager = self.manager.lock().unwrap();
let mut connections_list = Vec::new();
manager.iter_mut(|conn_id, conn_info| {
connections_list.push(json!({
"connectionId": conn_id,
"address": conn_info.addr,
"isActive": !conn_info.is_closed
}));
});
json!({
"result": true,
"connections": connections_list
}).to_string()
}
}
+24
View File
@@ -1,6 +1,9 @@
mod backend;
mod server;
mod wrapper;
mod read;
mod write;
mod connections;
use std::sync::Arc;
use common_core::*;
@@ -22,6 +25,9 @@ pub const METHODS: &[&[u16]] = &[
name!("ListConnections"), // 6
name!("RetrieveBinaryFromVault"), // 7
name!("GetLogs"), // 8
name!("SendText"), // 9
name!("SendPing"), // 10
name!("SendPong"), // 11
];
pub fn get_params_amount(num: usize) -> usize {
@@ -35,6 +41,9 @@ pub fn get_params_amount(num: usize) -> usize {
6 => 0, // ListConnections()
7 => 1, // RetrieveBinaryFromVault(vault_key)
8 => 1, // GetLogs(count)
9 => 2, // SendText(connection_id, text)
10 => 2, // SendPing(connection_id, payload)
11 => 2, // SendPong(connection_id, payload)
_ => 0,
}
}
@@ -85,6 +94,21 @@ pub fn cal_func(obj: &mut AddIn, num: usize, params: &mut [Variant]) -> Box<dyn
let count = params[0].get_i32().unwrap_or(0) as usize;
Box::new(obj.get_logs(count))
},
9 => {
let connection_id = params[0].get_string().unwrap_or_default();
let text = params[1].get_string().unwrap_or_default();
Box::new(obj.server.send_text(&connection_id, &text))
},
10 => {
let connection_id = params[0].get_string().unwrap_or_default();
let payload = params[1].get_blob().unwrap_or(&empty_array);
Box::new(obj.server.send_ping(&connection_id, payload.to_vec()))
},
11 => {
let connection_id = params[0].get_string().unwrap_or_default();
let payload = params[1].get_blob().unwrap_or(&empty_array);
Box::new(obj.server.send_pong(&connection_id, payload.to_vec()))
},
_ => Box::new(false),
}
}
+125
View File
@@ -0,0 +1,125 @@
use crate::server::{WebSocketConnection, WebSocketServerState};
use common_server::{AsyncWaiter, MessageHandler};
use serde_json::json;
impl WebSocketServerState {
pub async fn get_next_message(&mut self, timeout_ms: u64) -> String {
let waiter = AsyncWaiter::new(timeout_ms);
let message_handler = MessageHandler::new(self.vault.clone());
let result = waiter.wait_for(|| {
let mut manager = self.manager.lock().unwrap();
let all_ids = manager.get_ids_round_robin();
let mut to_remove = Vec::new();
let mut found_message = None;
for conn_id in all_ids {
if let Some((message, address, is_active)) =
manager.get_mut(&conn_id, Self::poll_connection_message)
{
if let Some(msg) = message {
manager.set_last_processed(Some(conn_id.clone()));
found_message = Some((conn_id, Some(msg), address, is_active));
break;
}
if !is_active {
to_remove.push(conn_id.clone());
}
}
}
for conn_id in to_remove {
manager.remove(&conn_id);
}
if found_message.is_some() {
return found_message;
}
None
}).await;
match result {
Ok((connection_id, Some(message), address, is_active)) => {
self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id));
match message_handler.store_message(message) {
Ok(vault_key) => {
json!({
"result": true,
"connectionId": connection_id,
"address": address,
"message": vault_key,
"isActive": is_active,
}).to_string()
}
Err(e) => MessageHandler::error_response(&e),
}
}
Ok((_, None, _, _)) => MessageHandler::timeout_response(),
Err(()) => MessageHandler::timeout_response(),
}
}
pub async fn get_message(&mut self, connection_id: &str, timeout_ms: u64) -> String {
let waiter = AsyncWaiter::new(timeout_ms);
let conn_id = connection_id.to_string();
let result = waiter.wait_for(|| {
let mut manager = self.manager.lock().unwrap();
if let Some((message, address, is_active)) =
manager.get_mut(&conn_id, Self::poll_connection_message)
{
if !is_active && message.is_none() {
manager.remove(&conn_id);
}
if let Some(msg) = message {
return Some((Some(msg), address, is_active));
}
if !is_active {
return Some((None, address, false));
}
}
None
}).await;
match result {
Ok((Some(message), address, is_active)) => {
self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id));
let message_handler = MessageHandler::new(self.vault.clone());
match message_handler.store_message(message) {
Ok(vault_key) => {
json!({
"result": true,
"connectionId": connection_id,
"address": address,
"isActive": is_active,
"message": vault_key
}).to_string()
}
Err(e) => MessageHandler::error_response(&e),
}
}
Ok((None, address, _)) => {
json!({
"result": true,
"connectionId": connection_id,
"address": address,
"isActive": false
}).to_string()
}
Err(()) => MessageHandler::timeout_response(),
}
}
fn poll_connection_message(conn: &mut WebSocketConnection) -> (Option<Vec<u8>>, String, bool) {
let is_active = !conn.is_closed;
let address = conn.addr.clone();
match conn.incoming_rx.try_recv() {
Ok(msg) => (Some(msg), address, is_active),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (None, address, is_active),
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => (None, address, false),
}
}
}
+43 -196
View File
@@ -11,9 +11,8 @@ use tokio::sync::mpsc;
use futures_util::{StreamExt, SinkExt};
use common_binary::vault::BinaryVault;
use common_logs::{Logger, log};
use common_server::{ConnectionManager, MessageHandler, AsyncWaiter};
use common_server::ConnectionManager;
use serde::{Deserialize, Serialize};
use serde_json::json;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct WebSocketServerConfig {
@@ -42,20 +41,28 @@ impl Default for WebSocketServerConfig {
}
pub struct WebSocketServerState {
vault: BinaryVault,
logger: Option<Arc<Logger>>,
manager: Arc<Mutex<ConnectionManager<WebSocketConnection>>>,
pub(crate) vault: BinaryVault,
pub(crate) logger: Option<Arc<Logger>>,
pub(crate) manager: Arc<Mutex<ConnectionManager<WebSocketConnection>>>,
}
pub struct WebSocketConnection {
addr: String,
outgoing_tx: mpsc::UnboundedSender<Vec<u8>>,
incoming_rx: mpsc::UnboundedReceiver<Vec<u8>>,
is_closed: bool,
pub(crate) addr: String,
pub(crate) outgoing_tx: mpsc::UnboundedSender<OutgoingMessage>,
pub(crate) incoming_rx: mpsc::UnboundedReceiver<Vec<u8>>,
pub(crate) is_closed: bool,
}
pub(crate) enum OutgoingMessage {
Binary(Vec<u8>),
Text(String),
Ping(Vec<u8>),
Pong(Vec<u8>),
Close,
}
impl WebSocketServerState {
fn log(&self, message: &str) {
pub(crate) fn log(&self, message: &str) {
if let Some(ref logger) = self.logger {
log!(logger, "{}", message);
}
@@ -67,7 +74,7 @@ impl WebSocketServerState {
vault: BinaryVault,
logger: Option<Arc<Logger>>,
) -> Result<Self, String> {
// Parse configuration
let config = if config_json.is_empty() {
WebSocketServerConfig::default()
} else {
@@ -86,7 +93,6 @@ impl WebSocketServerState {
manager: manager.clone(),
}));
// Create router with configured routes
let mut app = Router::new();
for route in &config.routes {
if let Some(ref log) = logger {
@@ -118,188 +124,6 @@ impl WebSocketServerState {
})
}
pub async fn get_next_message(&mut self, timeout_ms: u64) -> String {
let waiter = AsyncWaiter::new(timeout_ms);
let message_handler = MessageHandler::new(self.vault.clone());
let result = waiter.wait_for(|| {
let mut manager = self.manager.lock().unwrap();
let all_ids = manager.get_ids_round_robin();
let mut to_remove = Vec::new();
let mut found_message = None;
for conn_id in all_ids {
if let Some(next_state) = manager.get_mut(&conn_id, |conn| {
let is_active = !conn.is_closed;
let address = conn.addr.clone();
match conn.incoming_rx.try_recv() {
Ok(msg) => Some((Some(msg), address, is_active)),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Some((None, address, is_active)),
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => Some((None, address, false)),
}
}) {
if let Some((message, address, is_active)) = next_state {
if let Some(msg) = message {
manager.set_last_processed(Some(conn_id.clone()));
found_message = Some((conn_id, Some(msg), address, is_active));
break;
}
if !is_active {
to_remove.push(conn_id.clone());
}
}
}
}
for conn_id in to_remove {
manager.remove(&conn_id);
}
if found_message.is_some() {
return found_message;
}
None
}).await;
match result {
Ok((connection_id, Some(message), address, is_active)) => {
self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id));
match message_handler.store_message(message) {
Ok(vault_key) => {
json!({
"result": true,
"connectionId": connection_id,
"address": address,
"message": vault_key,
"isActive": is_active,
"isNewConnection": false
}).to_string()
}
Err(e) => MessageHandler::error_response(&e),
}
}
Ok((_, None, _, _)) => MessageHandler::timeout_response(),
Err(()) => MessageHandler::timeout_response(),
}
}
pub async fn get_message(&mut self, connection_id: &str, timeout_ms: u64) -> String {
let waiter = AsyncWaiter::new(timeout_ms);
let conn_id = connection_id.to_string();
let result = waiter.wait_for(|| {
let mut manager = self.manager.lock().unwrap();
if let Some(next_state) = manager.get_mut(&conn_id, |conn| {
let is_active = !conn.is_closed;
let address = conn.addr.clone();
match conn.incoming_rx.try_recv() {
Ok(msg) => Some((Some(msg), address, is_active)),
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => Some((None, address, is_active)),
Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => Some((None, address, false)),
}
}) {
if let Some((message, address, is_active)) = next_state {
if !is_active && message.is_none() {
manager.remove(&conn_id);
}
if let Some(msg) = message {
return Some((Some(msg), address, is_active));
}
if !is_active {
return Some((None, address, false));
}
}
}
None
}).await;
match result {
Ok((Some(message), address, is_active)) => {
self.log(&format!("Received {} bytes from WebSocket {}", message.len(), connection_id));
let message_handler = MessageHandler::new(self.vault.clone());
match message_handler.store_message(message) {
Ok(vault_key) => {
json!({
"result": true,
"connectionId": connection_id,
"address": address,
"isActive": is_active,
"message": vault_key
}).to_string()
}
Err(e) => MessageHandler::error_response(&e),
}
}
Ok((None, address, _)) => {
json!({
"result": true,
"connectionId": connection_id,
"address": address,
"isActive": false
}).to_string()
}
Err(()) => MessageHandler::timeout_response(),
}
}
pub async fn send_message(&mut self, connection_id: &str, message: Vec<u8>) -> String {
self.log(&format!("Sending {} bytes to WebSocket {}", message.len(), connection_id));
let mut manager = self.manager.lock().unwrap();
if let Some(send_result) = manager.get_mut(connection_id, |conn| {
if conn.is_closed {
return false;
}
conn.outgoing_tx.send(message).is_ok()
}) {
if send_result {
json!({
"result": true,
"message": "Message sent"
}).to_string()
} else {
MessageHandler::error_response("WebSocket connection is closed")
}
} else {
MessageHandler::error_response("WebSocket connection not found")
}
}
pub fn close_connection(&mut self, connection_id: &str) -> String {
let mut manager = self.manager.lock().unwrap();
if let Some(()) = manager.get_mut(connection_id, |conn| {
conn.is_closed = true;
}) {
self.log(&format!("WebSocket closed: {}", connection_id));
json!({
"result": true,
"message": "WebSocket closed"
}).to_string()
} else {
MessageHandler::error_response("WebSocket connection not found")
}
}
pub fn get_connections_list(&mut self) -> String {
let mut manager = self.manager.lock().unwrap();
let mut connections_list = Vec::new();
manager.iter_mut(|conn_id, conn_info| {
connections_list.push(json!({
"connectionId": conn_id,
"address": conn_info.addr,
"isActive": !conn_info.is_closed
}));
});
json!({
"result": true,
"connections": connections_list
}).to_string()
}
}
async fn ws_handler(
@@ -339,8 +163,31 @@ async fn handle_websocket(
let send_task = tokio::spawn(async move {
while let Some(msg) = outgoing_rx.recv().await {
if ws_sender.send(Message::Binary(msg.into())).await.is_err() {
break;
match msg {
OutgoingMessage::Binary(data) => {
if ws_sender.send(Message::Binary(data.into())).await.is_err() {
break;
}
}
OutgoingMessage::Text(text) => {
if ws_sender.send(Message::Text(text.into())).await.is_err() {
break;
}
}
OutgoingMessage::Ping(data) => {
if ws_sender.send(Message::Ping(data.into())).await.is_err() {
break;
}
}
OutgoingMessage::Pong(data) => {
if ws_sender.send(Message::Pong(data.into())).await.is_err() {
break;
}
}
OutgoingMessage::Close => {
let _ = ws_sender.send(Message::Close(None)).await;
break;
}
}
}
});
+33
View File
@@ -88,6 +88,39 @@ impl WebSocketServer {
}
}
pub fn send_text(&self, connection_id: &str, text: &str) -> String {
if !self.started {
return json_error("WebSocket server not started");
}
match self.backend.lock() {
Ok(backend) => backend.send_text(connection_id.to_string(), text.to_string()),
Err(e) => json_error(&format!("Failed to lock backend: {}", e)),
}
}
pub fn send_ping(&self, connection_id: &str, payload: Vec<u8>) -> String {
if !self.started {
return json_error("WebSocket server not started");
}
match self.backend.lock() {
Ok(backend) => backend.send_ping(connection_id.to_string(), payload),
Err(e) => json_error(&format!("Failed to lock backend: {}", e)),
}
}
pub fn send_pong(&self, connection_id: &str, payload: Vec<u8>) -> String {
if !self.started {
return json_error("WebSocket server not started");
}
match self.backend.lock() {
Ok(backend) => backend.send_pong(connection_id.to_string(), payload),
Err(e) => json_error(&format!("Failed to lock backend: {}", e)),
}
}
pub fn close_connection(&self, connection_id: &str) -> String {
if !self.started {
return json_error("WebSocket server not started");
+62
View File
@@ -0,0 +1,62 @@
use crate::server::{OutgoingMessage, WebSocketServerState};
use common_server::MessageHandler;
use serde_json::json;
impl WebSocketServerState {
pub async fn send_message(&mut self, connection_id: &str, message: Vec<u8>) -> String {
let length = message.len();
self.send_frame(
connection_id,
OutgoingMessage::Binary(message),
&format!("Sending {} bytes to WebSocket {}", length, connection_id),
)
}
pub async fn send_text(&mut self, connection_id: &str, text: String) -> String {
self.send_frame(
connection_id,
OutgoingMessage::Text(text),
&format!("Sending text to WebSocket {}", connection_id),
)
}
pub async fn send_ping(&mut self, connection_id: &str, payload: Vec<u8>) -> String {
self.send_frame(
connection_id,
OutgoingMessage::Ping(payload),
&format!("Sending ping to WebSocket {}", connection_id),
)
}
pub async fn send_pong(&mut self, connection_id: &str, payload: Vec<u8>) -> String {
self.send_frame(
connection_id,
OutgoingMessage::Pong(payload),
&format!("Sending pong to WebSocket {}", connection_id),
)
}
fn send_frame(&self, connection_id: &str, frame: OutgoingMessage, log_message: &str) -> String {
self.log(log_message);
let mut manager = self.manager.lock().unwrap();
if let Some(send_result) = manager.get_mut(connection_id, |conn| {
if conn.is_closed {
return false;
}
conn.outgoing_tx.send(frame).is_ok()
}) {
if send_result {
json!({
"result": true,
"message": "Message sent"
}).to_string()
} else {
MessageHandler::error_response("WebSocket connection is closed")
}
} else {
MessageHandler::error_response("WebSocket connection not found")
}
}
}