1
0
mirror of https://github.com/Bayselonarrend/OpenIntegrations.git synced 2026-05-16 09:38:28 +02:00

Доработка компоненты ZMQ

This commit is contained in:
Anton Titovets
2026-05-04 21:26:24 +03:00
parent 65002a12ec
commit eca91bf31b
12 changed files with 1396 additions and 97 deletions
+966
View File
File diff suppressed because it is too large Load Diff
+4 -1
View File
@@ -3,7 +3,7 @@ name = "opi_zeromq"
version = "0.1.0"
license = "MIT"
edition = "2021"
description = "Native add-in for ZeroMQ (scaffold)"
description = "1C native add-in: ZeroMQ over the pure-Rust `zeromq` crate (REQ/REP first)"
[lib]
crate-type = ["cdylib"]
@@ -17,7 +17,10 @@ opt-level = "z"
[dependencies]
common-core = { path = "../commons/common-core" }
common-binary = { path = "../commons/common-binary" }
common-utils = { path = "../commons/common-utils" }
serde_json = "1.0.140"
tokio = { version = "1.52.2", features = ["rt-multi-thread", "time"] }
zeromq = "0.6.0-pre.2"
+17
View File
@@ -0,0 +1,17 @@
"MAIN ---"
linux-vdso.so.1 (0x00007ffefc3a1000)
libm.so.6 => /lib64/libm.so.6 (0x00007c4627600000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x00007c4627200000)
libc.so.6 => /lib64/libc.so.6 (0x00007c4626e00000)
libdl.so.2 => /lib64/libdl.so.2 (0x00007c4626a00000)
/lib64/ld-linux-x86-64.so.2 (0x00007c4627a00000)
GLIBC_2.2.5
GLIBC_2.12
GLIBC_2.3
GLIBC_2.3.2
GLIBC_2.3.4
GLIBC_2.7
GLIBC_2.9
GLIBC_2.10
GLIBC_2.14
GLIBC_2.17
+286
View File
@@ -0,0 +1,286 @@
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use tokio::time::timeout;
use zeromq::prelude::*;
use zeromq::{ReqSocket, RepSocket, Socket, ZmqMessage};
#[derive(Debug)]
pub enum BackendCommand {
Connect {
endpoint: String,
response: Sender<Result<(), String>>,
},
Bind {
endpoint: String,
response: Sender<Result<String, String>>,
},
Subscribe {
response: Sender<Result<(), String>>,
},
Send {
payload: Vec<u8>,
response: Sender<Result<(), String>>,
},
Recv {
timeout_ms: i32,
response: Sender<Result<Vec<u8>, String>>,
},
Close {
response: Sender<Result<(), String>>,
},
Shutdown,
}
pub struct ZeroMqBackend {
pub(crate) tx: Sender<BackendCommand>,
thread_handle: Option<JoinHandle<()>>,
}
enum OpenSocket {
None,
Req(ReqSocket),
Rep(RepSocket),
}
struct BackendState {
socket: OpenSocket,
}
impl BackendState {
fn new() -> Self {
Self {
socket: OpenSocket::None,
}
}
fn is_busy(&self) -> bool {
matches!(self.socket, OpenSocket::Req(_) | OpenSocket::Rep(..))
}
fn close_async(&mut self, rt: &tokio::runtime::Runtime) {
match std::mem::replace(&mut self.socket, OpenSocket::None) {
OpenSocket::None => {}
OpenSocket::Req(s) => {
let _ = rt.block_on(s.close());
}
OpenSocket::Rep(s) => {
let _ = rt.block_on(s.close());
}
}
}
}
fn zmq_msg_to_concat_bytes(msg: ZmqMessage) -> Vec<u8> {
msg.into_vec()
.into_iter()
.flat_map(|frame| frame.to_vec())
.collect()
}
async fn recv_with_timeout<S: SocketRecv + Unpin>(
sock: &mut S,
timeout_ms: i32,
) -> Result<Vec<u8>, String> {
if timeout_ms <= 0 {
let msg = sock.recv().await.map_err(|e| e.to_string())?;
Ok(zmq_msg_to_concat_bytes(msg))
} else {
match timeout(Duration::from_millis(timeout_ms as u64), sock.recv()).await {
Ok(Ok(msg)) => Ok(zmq_msg_to_concat_bytes(msg)),
Ok(Err(e)) => Err(e.to_string()),
Err(_) => Err("Receive timed out.".to_owned()),
}
}
}
impl ZeroMqBackend {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel();
let thread_handle = thread::Builder::new()
.name("opi_zeromq_backend".to_string())
.spawn(move || {
let rt = tokio::runtime::Runtime::new().expect("opi_zeromq tokio Runtime");
let mut state = BackendState::new();
while let Ok(cmd) = rx.recv() {
match cmd {
BackendCommand::Connect { endpoint, response } => {
let res = rt.block_on(async {
if state.is_busy() {
return Err("Socket is already connected or bound.".to_owned());
}
let mut sock = ReqSocket::new();
sock.connect(&endpoint)
.await
.map_err(|e| e.to_string())?;
state.socket = OpenSocket::Req(sock);
Ok(())
});
let _ = response.send(res);
}
BackendCommand::Bind { endpoint, response } => {
let res = rt.block_on(async {
if state.is_busy() {
return Err("Socket is already connected or bound.".to_owned());
}
let mut sock = RepSocket::new();
let bound = sock
.bind(&endpoint)
.await
.map_err(|e| e.to_string())?;
let ep_str = bound.to_string();
state.socket = OpenSocket::Rep(sock);
Ok(ep_str)
});
let _ = response.send(res);
}
BackendCommand::Subscribe { response } => {
let _ = response.send(Err(
"Subscribe is only for SUB sockets (not REQ/REP).".to_owned(),
));
}
BackendCommand::Send { payload, response } => {
let res = rt.block_on(async {
let msg = ZmqMessage::from(payload);
match &mut state.socket {
OpenSocket::Req(sock) => {
sock.send(msg).await.map_err(|e| e.to_string())
}
OpenSocket::Rep(sock) => {
sock.send(msg).await.map_err(|e| e.to_string())
}
OpenSocket::None => Err("No open socket.".to_owned()),
}
});
let _ = response.send(res);
}
BackendCommand::Recv { timeout_ms, response } => {
let res = rt.block_on(async {
match &mut state.socket {
OpenSocket::Req(sock) => {
recv_with_timeout(sock, timeout_ms).await
}
OpenSocket::Rep(sock) => {
recv_with_timeout(sock, timeout_ms).await
}
OpenSocket::None => Err("No open socket.".to_owned()),
}
});
let _ = response.send(res);
}
BackendCommand::Close { response } => {
state.close_async(&rt);
let _ = response.send(Ok(()));
}
BackendCommand::Shutdown => {
state.close_async(&rt);
break;
}
}
}
})
.expect("opi_zeromq backend thread spawn");
Self {
tx,
thread_handle: Some(thread_handle),
}
}
pub fn connect(&self, endpoint: &str) -> Result<(), String> {
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Connect {
endpoint: endpoint.to_string(),
response: response_tx,
})
.map_err(|e| format!("Failed to enqueue Connect: {}", e))?;
response_rx
.recv()
.map_err(|_| "Backend thread stopped unexpectedly.".to_string())?
}
pub fn bind(&self, endpoint: &str) -> Result<String, String> {
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Bind {
endpoint: endpoint.to_string(),
response: response_tx,
})
.map_err(|e| format!("Failed to enqueue Bind: {}", e))?;
response_rx
.recv()
.map_err(|_| "Backend thread stopped unexpectedly.".to_string())?
}
pub fn subscribe_stub(&self) -> Result<(), String> {
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Subscribe {
response: response_tx,
})
.map_err(|e| format!("Failed to enqueue Subscribe: {}", e))?;
response_rx
.recv()
.map_err(|_| "Backend thread stopped unexpectedly.".to_string())?
}
pub fn send_payload(&self, payload: Vec<u8>) -> Result<(), String> {
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Send {
payload,
response: response_tx,
})
.map_err(|e| format!("Failed to enqueue Send: {}", e))?;
response_rx
.recv()
.map_err(|_| "Backend thread stopped unexpectedly.".to_string())?
}
pub fn recv_payload(&self, timeout_ms: i32) -> Result<Vec<u8>, String> {
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Recv {
timeout_ms,
response: response_tx,
})
.map_err(|e| format!("Failed to enqueue Recv: {}", e))?;
response_rx
.recv()
.map_err(|_| "Backend thread stopped unexpectedly.".to_string())?
}
pub fn close_socket(&self) -> Result<(), String> {
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Close {
response: response_tx,
})
.map_err(|e| format!("Failed to enqueue Close: {}", e))?;
response_rx
.recv()
.map_err(|_| "Backend thread stopped unexpectedly.".to_string())?
}
}
impl Drop for ZeroMqBackend {
fn drop(&mut self) {
let _ = self.tx.send(BackendCommand::Shutdown);
if let Some(handle) = self.thread_handle.take() {
if let Err(e) = handle.join() {
eprintln!("opi_zeromq backend thread panicked: {:?}", e);
}
}
}
}
+51 -68
View File
@@ -1,32 +1,31 @@
mod backend;
mod methods;
use common_binary::vault::BinaryVault;
use common_core::*;
use common_utils::utils::json_success;
use std::sync::{Arc, Mutex};
impl_addin_exports!(AddIn);
impl_raw_addin!(AddIn, METHODS, PROPS, get_params_amount, cal_func);
pub const METHODS: &[&[u16]] = &[
name!("SetSocketType"), // 0
name!("Connect"), // 1
name!("Bind"), // 2
name!("Subscribe"), // 3
name!("Send"), // 4
name!("Recv"), // 5
name!("Close"), // 6
name!("GetLastError"), // 7
name!("Connect"), // 0
name!("Bind"), // 1
name!("Subscribe"), // 2
name!("Send"), // 3
name!("Recv"), // 4
name!("Close"), // 5
name!("RetrieveBinaryFromVault"), // 6
];
pub fn get_params_amount(num: usize) -> usize {
match num {
0 => 1,
1 => 1,
2 => 1,
3 => 1,
4 => 2,
5 => 1,
6 => 0,
7 => 0,
0 | 1 => 1, // Connect(endpoint) / Bind(endpoint)
2 => 1, // Subscribe
3 => 2, // Send(data, flags)
4 => 1, // Recv
5 => 0, // Close
6 => 1, // RetrieveBinaryFromVault
_ => 0,
}
}
@@ -40,91 +39,75 @@ pub fn cal_func(
match num {
0 => {
let t = params[0].get_string().unwrap_or_default();
Box::new(obj.set_socket_type(&t))
let ep = params[0].get_string().unwrap_or_default();
Box::new(obj.connect(&ep))
}
1 => {
let ep = params[0].get_string().unwrap_or_default();
Box::new(obj.stub_connect(&ep))
Box::new(obj.bind(&ep))
}
2 => {
let ep = params[0].get_string().unwrap_or_default();
Box::new(obj.stub_bind(&ep))
let prefix = params[0].get_string().unwrap_or_default();
Box::new(obj.subscribe(&prefix))
}
3 => {
let prefix = params[0].get_string().unwrap_or_default();
Box::new(obj.stub_subscribe(&prefix))
}
4 => {
let data = params[0].get_blob().unwrap_or(&empty).to_vec();
let flags = params[1].get_i32().unwrap_or(0);
Box::new(obj.stub_send(data, flags))
Box::new(obj.send(data, flags))
}
5 => {
4 => {
let timeout_ms = params[0].get_i32().unwrap_or(0);
Box::new(obj.stub_recv(timeout_ms))
Box::new(obj.recv(timeout_ms))
}
5 => Box::new(obj.close()),
6 => {
let key = params[0].get_string().unwrap_or_default();
Box::new(obj.retrieve_binary_from_vault(&key))
}
6 => Box::new(obj.close()),
7 => Box::new(obj.last_error.clone()),
_ => Box::new(false),
}
}
pub const PROPS: &[&[u16]] = &[
name!("SocketType"),
name!("Endpoint"),
];
pub const PROPS: &[&[u16]] = &[];
pub struct AddIn {
pub socket_type: String,
pub endpoint: String,
pub last_error: String,
vault: BinaryVault,
backend: Arc<Mutex<backend::ZeroMqBackend>>,
}
impl AddIn {
pub fn new() -> Self {
Self {
socket_type: String::new(),
endpoint: String::new(),
last_error: String::new(),
vault: BinaryVault::new(),
backend: Arc::new(Mutex::new(backend::ZeroMqBackend::new())),
}
}
pub fn clear_error_on_success(&mut self) {
self.last_error.clear();
}
pub fn save_error_str(&mut self, msg: &str) {
self.last_error = common_utils::utils::json_error(msg);
}
pub fn set_socket_type(&mut self, socket_type: &str) -> String {
self.socket_type = socket_type.to_owned();
self.clear_error_on_success();
json_success()
}
pub fn close(&mut self) -> bool {
self.endpoint.clear();
self.clear_error_on_success();
true
}
pub fn get_field_ptr(&self, index: usize) -> *const dyn getset::ValueType {
match index {
0 => &self.socket_type as &dyn getset::ValueType as *const _,
1 => &self.endpoint as &dyn getset::ValueType as *const _,
_ => panic!("Index out of bounds"),
}
pub fn get_field_ptr(&self, _index: usize) -> *const dyn getset::ValueType {
panic!("This add-in exposes no exported properties.")
}
pub fn get_field_ptr_mut(&mut self, index: usize) -> *mut dyn getset::ValueType {
self.get_field_ptr(index) as *mut _
}
pub fn retrieve_binary_from_vault(&self, vault_key: &str) -> Vec<u8> {
self.vault
.retrieve(&vault_key.to_string())
.unwrap_or_else(|_| Vec::new())
}
pub(crate) fn lock_backend(&self) -> Result<std::sync::MutexGuard<'_, backend::ZeroMqBackend>, String> {
self.backend
.lock()
.map_err(|e| format!("{}", e))
}
}
impl Drop for AddIn {
fn drop(&mut self) {
let _ = self.close();
if let Ok(guard) = self.backend.lock() {
let _ = guard.close_socket();
}
}
}
+52 -27
View File
@@ -1,40 +1,65 @@
use common_binary::vault::BinaryInput;
use common_utils::utils::{json_error, json_success};
use serde_json::json;
use crate::AddIn;
impl AddIn {
pub fn stub_connect(&mut self, endpoint: &str) -> bool {
self.endpoint = endpoint.to_owned();
self.save_error_str(
"ZeroMQ scaffold: hook up `zmq` + libzmq; Connect is not implemented.",
);
false
pub fn connect(&mut self, endpoint: &str) -> String {
let ep = endpoint.trim().to_owned();
match self.lock_backend().and_then(|g| g.connect(&ep)) {
Ok(()) => json_success(),
Err(e) => json_error(&e),
}
}
pub fn stub_bind(&mut self, endpoint: &str) -> bool {
self.endpoint = endpoint.to_owned();
self.save_error_str(
"ZeroMQ scaffold: hook up `zmq` + libzmq; Bind is not implemented.",
);
false
pub fn bind(&mut self, endpoint: &str) -> String {
let ep = endpoint.trim().to_owned();
match self.lock_backend().and_then(|g| g.bind(&ep)) {
Ok(bound_display) => json!({"result": true, "endpoint": bound_display}).to_string(),
Err(e) => json_error(&e),
}
}
pub fn stub_subscribe(&mut self, _prefix: &str) -> String {
self.save_error_str(
"ZeroMQ scaffold: Subscribe is for SUB sockets after ZMQ wiring.",
);
self.last_error.clone()
pub fn subscribe(&mut self, _prefix: &str) -> String {
match self.lock_backend().and_then(|g| g.subscribe_stub()) {
Ok(()) => json_success(),
Err(e) => json_error(&e),
}
}
pub fn stub_send(&mut self, _data: Vec<u8>, _flags: i32) -> bool {
self.save_error_str(
"ZeroMQ scaffold: Send (multipart) after queue integration.",
);
false
pub fn send(&mut self, data: Vec<u8>, _flags: i32) -> String {
match self.lock_backend().and_then(|g| g.send_payload(data)) {
Ok(()) => json_success(),
Err(e) => json_error(&e),
}
}
pub fn stub_recv(&mut self, _timeout_ms: i32) -> Vec<u8> {
self.save_error_str(
"ZeroMQ scaffold: Recv will return message frames after integration.",
);
Vec::new()
pub fn recv(&mut self, timeout_ms: i32) -> String {
match self.lock_backend().and_then(|g| g.recv_payload(timeout_ms)) {
Ok(buf) => {
let len = buf.len();
match self.vault.store(BinaryInput::Bytes(buf)) {
Ok(key) => json!({
"result": true,
"type": "binary",
"data": key,
"size": len
})
.to_string(),
Err(e) => json_error(format!("Failed to store message in vault: {}", e)),
}
}
Err(e) => json_error(&e),
}
}
pub fn close(&mut self) -> String {
match self.lock_backend().and_then(|g| g.close_socket()) {
Ok(()) => json_success(),
Err(e) => json_error(&e),
}
}
}
+3
View File
@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:40530a15d6560408a0ceaafd755d4615d429f55ffcf98388f2f8c8d36fdc60a9
size 2202205
@@ -3,7 +3,7 @@
<name>OPI_WSServer</name>
<synonym>
<key>ru</key>
<value>Компонента Websocket-сервера (ОПИ)</value>
<value>Компонента ZeroMQ (ОПИ)</value>
</synonym>
<templateType>AddIn</templateType>
</mdclass:CommonTemplate>
@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<mdclass:CommonTemplate xmlns:mdclass="http://g5.1c.ru/v8/dt/metadata/mdclass" uuid="42832dcb-0fe8-4770-b54f-5a310b9220fc">
<name>OPI_ZeroMQ</name>
<synonym>
<key>ru</key>
<value>ZeroMQ</value>
</synonym>
<templateType>AddIn</templateType>
</mdclass:CommonTemplate>
@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:40530a15d6560408a0ceaafd755d4615d429f55ffcf98388f2f8c8d36fdc60a9
size 2202205
@@ -62,6 +62,7 @@
<commonTemplates>CommonTemplate.OPI_TCPServer</commonTemplates>
<commonTemplates>CommonTemplate.OPI_WSClient</commonTemplates>
<commonTemplates>CommonTemplate.OPI_WSServer</commonTemplates>
<commonTemplates>CommonTemplate.OPI_ZeroMQ</commonTemplates>
<commonTemplates>CommonTemplate.OPI_Text_ClickHouseProto</commonTemplates>
<commonTemplates>CommonTemplate.OPI_Text_MethodSettings</commonTemplates>
<commonModules>CommonModule.OPI_Airtable</commonModules>
+3
View File
@@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:40530a15d6560408a0ceaafd755d4615d429f55ffcf98388f2f8c8d36fdc60a9
size 2202205