diff --git a/src/addins/mongodb/src/component/backend_core.rs b/src/addins/mongodb/src/component/backend_core.rs index c259c0307d..52e22321dc 100644 --- a/src/addins/mongodb/src/component/backend_core.rs +++ b/src/addins/mongodb/src/component/backend_core.rs @@ -1,13 +1,25 @@ use serde_json::Value; use std::sync::mpsc::{self, Sender}; use std::thread::{self, JoinHandle}; +use mongodb::bson::{Bson, Document}; use mongodb::Client; +use serde::{Deserialize, Serialize}; +use crate::component::bson::{bson_to_json_value, json_value_to_bson}; pub struct MongoBackend { pub(crate) tx: Sender, thread_handle: Option>, } +#[derive(Serialize, Deserialize)] +pub struct ExecuteParams { + pub operation: String, + pub database: Option, + pub collection: Option, + pub data: Option, + pub fields: Option, +} + pub enum BackendCommand { Connect { connection_string: String, @@ -15,11 +27,7 @@ pub enum BackendCommand { }, Shutdown, Execute { - operation: String, - database: Option, - collection: Option, - data: Option, - fields: Option, + params: ExecuteParams, response: Sender, }, Disconnect { @@ -51,20 +59,18 @@ impl MongoBackend { } } } - BackendCommand::Execute { - operation, - database, - collection, - data, - fields, - response, - } => { + BackendCommand::Execute { params, response } => { if client.is_none() { let _ = response.send("Not connected to MongoDB".to_string()); continue; } - let _ = response.send(format!("Executing operation: {}", operation)); + let result = rt.block_on(execute_operation( + client.as_ref().unwrap(), + ¶ms + )); + + let _ = response.send(result.unwrap_or_else(|e| e)); }, BackendCommand::Disconnect { response } => { client = None; @@ -81,10 +87,6 @@ impl MongoBackend { thread_handle: Some(thread_handle), } } - - pub fn send_command(&self, cmd: BackendCommand) { - let _ = self.tx.send(cmd); - } } async fn handle_connect(connection_string: &str) -> Result { @@ -100,6 +102,46 @@ async fn handle_connect(connection_string: &str) -> Result { Ok(client) } +async fn execute_operation( + client: &Client, + params: &ExecuteParams, +) -> Result { + + let db_name = params.database.as_deref().unwrap_or("admin"); + let db = client.database(db_name); + + let mut command = Document::new(); + + if let Some(coll) = ¶ms.collection { + command.insert(¶ms.operation, coll); + } else { + command.insert(¶ms.operation, 1); + } + + if let Some(Value::Object(data_map)) = ¶ms.data { + for (key, value) in data_map { + if key != ¶ms.operation { + command.insert(key, json_value_to_bson(value)); + } + } + } + + if let Some(Value::Object(fields_map)) = ¶ms.fields { + for (key, value) in fields_map { + command.insert(key, json_value_to_bson(value)); + } + } + + let result_doc = db + .run_command(command) + .await + .map_err(|e| format!("MongoDB command '{}' failed: {}", ¶ms.operation, e))?; + + let json_result = bson_to_json_value(&Bson::Document(result_doc)); + serde_json::to_string(&json_result) + .map_err(|e| format!("Failed to serialize command result: {}", e)) +} + impl Drop for MongoBackend { fn drop(&mut self) { let _ = self.tx.send(BackendCommand::Shutdown); diff --git a/src/addins/mongodb/src/component/backend_methods.rs b/src/addins/mongodb/src/component/backend_methods.rs index 30d5c08c05..743bc16fe8 100644 --- a/src/addins/mongodb/src/component/backend_methods.rs +++ b/src/addins/mongodb/src/component/backend_methods.rs @@ -1,5 +1,5 @@ use std::sync::mpsc; -use crate::component::backend_core::{BackendCommand, MongoBackend}; +use crate::component::backend_core::{BackendCommand, ExecuteParams, MongoBackend}; impl MongoBackend { pub fn connect(&self, connection_string: &str) -> Result<(), String> { @@ -30,4 +30,22 @@ impl MongoBackend { .map_err(|e| format!("Failed to receive disconnect response: {}", e))?; Ok(()) } + + pub fn execute(&self, json_input: &str) -> Result { + + let params: ExecuteParams = serde_json::from_str(json_input) + .map_err(|e| format!("Invalid JSON: {}", e))?; + + let (response_tx, response_rx) = mpsc::channel(); + self.tx + .send(BackendCommand::Execute { + params, + response: response_tx, + }) + .map_err(|e| format!("Failed to send command: {}", e))?; + + response_rx + .recv() + .map_err(|e| format!("No response from backend: {}", e)) + } } \ No newline at end of file diff --git a/src/addins/mongodb/src/component/bson.rs b/src/addins/mongodb/src/component/bson.rs index 1626c2e9f7..5cdc545758 100644 --- a/src/addins/mongodb/src/component/bson.rs +++ b/src/addins/mongodb/src/component/bson.rs @@ -1,9 +1,9 @@ use serde_json::{Number, Value}; use mongodb::bson::{Bson, Document}; use mongodb::bson; -use base64::{Engine as _, engine::{self, general_purpose}, alphabet}; +use base64::{Engine as _, engine::{general_purpose}}; -fn json_value_to_bson(value: &Value) -> Bson { +pub fn json_value_to_bson(value: &Value) -> Bson { match value { Value::String(s) => Bson::String(s.clone()), @@ -52,7 +52,7 @@ fn json_value_to_bson(value: &Value) -> Bson { } } -fn bson_to_json_value(bson: &Bson) -> Value { +pub fn bson_to_json_value(bson: &Bson) -> Value { match bson { Bson::String(s) => Value::String(s.clone()), Bson::Int32(n) => Value::Number((*n).into()), diff --git a/src/addins/mongodb/src/component/mod.rs b/src/addins/mongodb/src/component/mod.rs index ffb1aae5a0..e5dea5e3e9 100644 --- a/src/addins/mongodb/src/component/mod.rs +++ b/src/addins/mongodb/src/component/mod.rs @@ -26,6 +26,10 @@ pub fn cal_func(obj: &mut AddIn, num: usize, params: &mut [Variant]) -> Box Box::new(obj.connect()), 1 => Box::new(obj.disconnect()), + 2 => { + let json_string = params[0].get_string().unwrap_or("".to_string()); + Box::new(obj.execute(&json_string)) + } _ => Box::new(false), } } @@ -94,6 +98,23 @@ impl AddIn { } } + pub fn execute(&mut self, json_string: &str) -> String { + + if !self.initialized { + return format_json_error("Connection already closed!"); + } + + let guard = match self.backend.lock(){ + Ok(lock) => lock, + Err(e) => return format_json_error(&e.to_string()) + }; + + match guard.execute(json_string){ + Ok(result) => json!({"result": true, "data": result}).to_string(), + Err(e) => format_json_error(&e.to_string()) + } + } + pub fn get_field_ptr(&self, index: usize) -> *const dyn getset::ValueType { match index { 0 => &self.connection_string as &dyn getset::ValueType as *const _,