1
0
mirror of https://github.com/Bayselonarrend/OpenIntegrations.git synced 2025-11-23 22:05:15 +02:00

Доработка компоненты MongoDB

This commit is contained in:
Anton Titovets
2025-11-02 10:59:35 +03:00
parent 9bbc3b6ec2
commit f5c5f55edd
4 changed files with 103 additions and 22 deletions

View File

@@ -1,13 +1,25 @@
use serde_json::Value;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};
use mongodb::bson::{Bson, Document};
use mongodb::Client;
use serde::{Deserialize, Serialize};
use crate::component::bson::{bson_to_json_value, json_value_to_bson};
pub struct MongoBackend {
pub(crate) tx: Sender<BackendCommand>,
thread_handle: Option<JoinHandle<()>>,
}
#[derive(Serialize, Deserialize)]
pub struct ExecuteParams {
pub operation: String,
pub database: Option<String>,
pub collection: Option<String>,
pub data: Option<Value>,
pub fields: Option<Value>,
}
pub enum BackendCommand {
Connect {
connection_string: String,
@@ -15,11 +27,7 @@ pub enum BackendCommand {
},
Shutdown,
Execute {
operation: String,
database: Option<String>,
collection: Option<String>,
data: Option<Value>,
fields: Option<Value>,
params: ExecuteParams,
response: Sender<String>,
},
Disconnect {
@@ -51,20 +59,18 @@ impl MongoBackend {
}
}
}
BackendCommand::Execute {
operation,
database,
collection,
data,
fields,
response,
} => {
BackendCommand::Execute { params, response } => {
if client.is_none() {
let _ = response.send("Not connected to MongoDB".to_string());
continue;
}
let _ = response.send(format!("Executing operation: {}", operation));
let result = rt.block_on(execute_operation(
client.as_ref().unwrap(),
&params
));
let _ = response.send(result.unwrap_or_else(|e| e));
},
BackendCommand::Disconnect { response } => {
client = None;
@@ -81,10 +87,6 @@ impl MongoBackend {
thread_handle: Some(thread_handle),
}
}
pub fn send_command(&self, cmd: BackendCommand) {
let _ = self.tx.send(cmd);
}
}
async fn handle_connect(connection_string: &str) -> Result<Client, String> {
@@ -100,6 +102,46 @@ async fn handle_connect(connection_string: &str) -> Result<Client, String> {
Ok(client)
}
async fn execute_operation(
client: &Client,
params: &ExecuteParams,
) -> Result<String, String> {
let db_name = params.database.as_deref().unwrap_or("admin");
let db = client.database(db_name);
let mut command = Document::new();
if let Some(coll) = &params.collection {
command.insert(&params.operation, coll);
} else {
command.insert(&params.operation, 1);
}
if let Some(Value::Object(data_map)) = &params.data {
for (key, value) in data_map {
if key != &params.operation {
command.insert(key, json_value_to_bson(value));
}
}
}
if let Some(Value::Object(fields_map)) = &params.fields {
for (key, value) in fields_map {
command.insert(key, json_value_to_bson(value));
}
}
let result_doc = db
.run_command(command)
.await
.map_err(|e| format!("MongoDB command '{}' failed: {}", &params.operation, e))?;
let json_result = bson_to_json_value(&Bson::Document(result_doc));
serde_json::to_string(&json_result)
.map_err(|e| format!("Failed to serialize command result: {}", e))
}
impl Drop for MongoBackend {
fn drop(&mut self) {
let _ = self.tx.send(BackendCommand::Shutdown);

View File

@@ -1,5 +1,5 @@
use std::sync::mpsc;
use crate::component::backend_core::{BackendCommand, MongoBackend};
use crate::component::backend_core::{BackendCommand, ExecuteParams, MongoBackend};
impl MongoBackend {
pub fn connect(&self, connection_string: &str) -> Result<(), String> {
@@ -30,4 +30,22 @@ impl MongoBackend {
.map_err(|e| format!("Failed to receive disconnect response: {}", e))?;
Ok(())
}
pub fn execute(&self, json_input: &str) -> Result<String, String> {
let params: ExecuteParams = serde_json::from_str(json_input)
.map_err(|e| format!("Invalid JSON: {}", e))?;
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Execute {
params,
response: response_tx,
})
.map_err(|e| format!("Failed to send command: {}", e))?;
response_rx
.recv()
.map_err(|e| format!("No response from backend: {}", e))
}
}

View File

@@ -1,9 +1,9 @@
use serde_json::{Number, Value};
use mongodb::bson::{Bson, Document};
use mongodb::bson;
use base64::{Engine as _, engine::{self, general_purpose}, alphabet};
use base64::{Engine as _, engine::{general_purpose}};
fn json_value_to_bson(value: &Value) -> Bson {
pub fn json_value_to_bson(value: &Value) -> Bson {
match value {
Value::String(s) => Bson::String(s.clone()),
@@ -52,7 +52,7 @@ fn json_value_to_bson(value: &Value) -> Bson {
}
}
fn bson_to_json_value(bson: &Bson) -> Value {
pub fn bson_to_json_value(bson: &Bson) -> Value {
match bson {
Bson::String(s) => Value::String(s.clone()),
Bson::Int32(n) => Value::Number((*n).into()),

View File

@@ -26,6 +26,10 @@ pub fn cal_func(obj: &mut AddIn, num: usize, params: &mut [Variant]) -> Box<dyn
match num {
0 => Box::new(obj.connect()),
1 => Box::new(obj.disconnect()),
2 => {
let json_string = params[0].get_string().unwrap_or("".to_string());
Box::new(obj.execute(&json_string))
}
_ => Box::new(false),
}
}
@@ -94,6 +98,23 @@ impl AddIn {
}
}
pub fn execute(&mut self, json_string: &str) -> String {
if !self.initialized {
return format_json_error("Connection already closed!");
}
let guard = match self.backend.lock(){
Ok(lock) => lock,
Err(e) => return format_json_error(&e.to_string())
};
match guard.execute(json_string){
Ok(result) => json!({"result": true, "data": result}).to_string(),
Err(e) => format_json_error(&e.to_string())
}
}
pub fn get_field_ptr(&self, index: usize) -> *const dyn getset::ValueType {
match index {
0 => &self.connection_string as &dyn getset::ValueType as *const _,