1
0
mirror of https://github.com/Bayselonarrend/OpenIntegrations.git synced 2026-05-16 09:38:28 +02:00

ZMQ: Методы коннетка в 1С

This commit is contained in:
Anton Titovets
2026-05-05 22:27:03 +03:00
parent 2935904e42
commit f2a5b78dad
10 changed files with 172 additions and 34 deletions
+6 -6
View File
@@ -1,10 +1,10 @@
"MAIN ---"
linux-vdso.so.1 (0x00007ffd629a0000)
libm.so.6 => /lib64/libm.so.6 (0x000072bb69000000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x000072bb68c00000)
libc.so.6 => /lib64/libc.so.6 (0x000072bb68800000)
libdl.so.2 => /lib64/libdl.so.2 (0x000072bb68400000)
/lib64/ld-linux-x86-64.so.2 (0x000072bb69400000)
linux-vdso.so.1 (0x00007fff775d3000)
libm.so.6 => /lib64/libm.so.6 (0x00007442c0800000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x00007442c0400000)
libc.so.6 => /lib64/libc.so.6 (0x00007442c0000000)
libdl.so.2 => /lib64/libdl.so.2 (0x00007442bfc00000)
/lib64/ld-linux-x86-64.so.2 (0x00007442c0c00000)
GLIBC_2.2.5
GLIBC_2.12
GLIBC_2.3
+47 -7
View File
@@ -32,6 +32,7 @@ pub enum BackendCommand {
},
Send {
payload: Vec<u8>,
dontwait: bool,
response: Sender<Result<(), String>>,
},
Recv {
@@ -122,6 +123,22 @@ async fn recv_with_timeout<S: SocketRecv + Unpin>(
}
}
async fn send_with_dontwait<S: SocketSend + Unpin>(
sock: &mut S,
msg: ZmqMessage,
dontwait: bool,
) -> Result<(), String> {
if dontwait {
match timeout(Duration::from_millis(0), sock.send(msg)).await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(e.to_string()),
Err(_) => Err("Send would block (DONTWAIT).".to_owned()),
}
} else {
sock.send(msg).await.map_err(|e| e.to_string())
}
}
impl ZeroMqBackend {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel();
@@ -229,14 +246,36 @@ impl ZeroMqBackend {
});
let _ = response.send(res);
}
BackendCommand::Send { payload, response } => {
BackendCommand::Send { payload, dontwait, response } => {
let res = rt.block_on(async {
let msg = ZmqMessage::from(payload);
match &mut state.socket {
OpenSocket::Req(sock) => sock.send(msg).await.map_err(|e| e.to_string()),
OpenSocket::Rep(sock) => sock.send(msg).await.map_err(|e| e.to_string()),
OpenSocket::Pub(sock) => sock.send(msg).await.map_err(|e| e.to_string()),
OpenSocket::Push(sock) => sock.send(msg).await.map_err(|e| e.to_string()),
OpenSocket::Req(sock) => {
send_with_dontwait(
sock,
ZmqMessage::from(payload.clone()),
dontwait,
)
.await
}
OpenSocket::Rep(sock) => {
send_with_dontwait(
sock,
ZmqMessage::from(payload.clone()),
dontwait,
)
.await
}
OpenSocket::Pub(sock) => {
send_with_dontwait(
sock,
ZmqMessage::from(payload.clone()),
dontwait,
)
.await
}
OpenSocket::Push(sock) => {
send_with_dontwait(sock, ZmqMessage::from(payload), dontwait).await
}
OpenSocket::Sub(_) => Err(
"Send is not supported on SUB sockets.".to_owned(),
),
@@ -338,11 +377,12 @@ impl ZeroMqBackend {
.map_err(|_| "Backend thread stopped unexpectedly.".to_string())?
}
pub fn send_payload(&self, payload: Vec<u8>) -> Result<(), String> {
pub fn send_payload(&self, payload: Vec<u8>, dontwait: bool) -> Result<(), String> {
let (response_tx, response_rx) = mpsc::channel();
self.tx
.send(BackendCommand::Send {
payload,
dontwait,
response: response_tx,
})
.map_err(|e| format!("Failed to enqueue Send: {}", e))?;
+3 -3
View File
@@ -28,7 +28,7 @@ pub fn get_params_amount(num: usize) -> usize {
match num {
0..=7 => 1, // Connect*/Bind*(endpoint)
8 => 1, // Subscribe(prefix)
9 => 2, // Send(data, flags)
9 => 2, // Send(data, dontwait)
10 => 1, // Recv(timeout_ms)
11 => 0, // Close
12 => 1, // RetrieveBinaryFromVault
@@ -82,8 +82,8 @@ pub fn cal_func(
}
9 => {
let data = params[0].get_blob().unwrap_or(&empty).to_vec();
let flags = params[1].get_i32().unwrap_or(0);
Box::new(obj.send(data, flags))
let dontwait = params[1].get_bool().unwrap_or(false);
Box::new(obj.send(data, dontwait))
}
10 => {
let timeout_ms = params[0].get_i32().unwrap_or(0);
+5 -2
View File
@@ -29,8 +29,11 @@ impl AddIn {
}
}
pub fn send(&mut self, data: Vec<u8>, _flags: i32) -> String {
match self.lock_backend().and_then(|g| g.send_payload(data)) {
pub fn send(&mut self, data: Vec<u8>, dontwait: bool) -> String {
match self
.lock_backend()
.and_then(|g| g.send_payload(data, dontwait))
{
Ok(()) => json_success(),
Err(e) => json_error(&e),
}
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:3447ab3c47871ac1ad93831b3257057f90fab964a8cbfe83517b31c5cab7b9bf
size 2561771
oid sha256:6a2220074a050783de7496f4b38b03eb023cb923c5c9565c48cac48946350b08
size 2567261
+2 -2
View File
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:3447ab3c47871ac1ad93831b3257057f90fab964a8cbfe83517b31c5cab7b9bf
size 2561771
oid sha256:6a2220074a050783de7496f4b38b03eb023cb923c5c9565c48cac48946350b08
size 2567261
@@ -105,7 +105,7 @@
// Логирование - Структура Из КлючИЗначение - Настройки логирования. См. ПолучитьНастройкиЛогирования - log
//
// Возвращаемое значение:
// Соответствие Из КлючИЗначение, Произвольный - Возвращает объект WebSocket клиента при успешном подключении или информацию об ошибке
// Соответствие Из КлючИЗначение, Произвольный - объект клиента или соответствие с информацией об ошибке
Функция ОткрытьСоединение(Знач Адрес
, Знач Tls = Неопределено
, Знач Прокси = Неопределено
@@ -53,20 +53,64 @@
#Область МетодыПодключения
// Создать соединение (REQ)
// Создает соединение для отправки запроса
//
// Параметры:
// Адрес - Строка - Адрес приемника - addr
//
// Возвращаемое значение:
// Соответствие Из КлючИЗначение, Произвольный - объект компоненты или соответствие с информацией об ошибке
Функция СоздатьСоединениеReq(Знач Адрес) Экспорт
Результат = ИнициализироватьКоннектор(Адрес, "ConnectReq");
Возврат Результат;
КонецФункции
// Создать соединение (SUB)
// Создает соединение подписчика
//
// Параметры:
// Адрес - Строка - Адрес приемника - addr
//
// Возвращаемое значение:
// Соответствие Из КлючИЗначение, Произвольный - объект компоненты или соответствие с информацией об ошибке
Функция СоздатьСоединениеSub(Знач Адрес) Экспорт
Результат = ИнициализироватьКоннектор(Адрес, "ConnectSub");
Возврат Результат;
КонецФункции
// Создать соединение (PUSH)
// Создает соединение отправки в пайплайн
//
// Параметры:
// Адрес - Строка - Адрес приемника - addr
//
// Возвращаемое значение:
// Соответствие Из КлючИЗначение, Произвольный - объект компоненты или соответствие с информацией об ошибке
Функция СоздатьСоединениеPush(Знач Адрес) Экспорт
Результат = ИнициализироватьКоннектор(Адрес, "ConnectPush");
Возврат Результат;
КонецФункции
// Создать соединение (PULL)
// Создает соединение чтения пайплайна
//
// Параметры:
// Адрес - Строка - Адрес приемника - addr
//
// Возвращаемое значение:
// Соответствие Из КлючИЗначение, Произвольный - объект компоненты или соответствие с информацией об ошибке
Функция СоздатьСоединениеPull(Знач Адрес) Экспорт
Результат = ИнициализироватьКоннектор(Адрес, "ConnectPull");
Возврат Результат;
КонецФункции
#КонецОбласти
@@ -74,19 +118,31 @@
#Область МетодыПрослушивания
Функция ОткрытьПортRep(Знач Порт) Экспорт
Результат = ИнициализироватьКоннектор(Порт, "BindRep");
Возврат Результат;
КонецФункции
Функция ОткрытьПортPub(Знач Порт) Экспорт
Результат = ИнициализироватьКоннектор(Порт, "BindPub");
Возврат Результат;
КонецФункции
Функция ОткрытьПортPush(Знач Порт) Экспорт
Результат = ИнициализироватьКоннектор(Порт, "BindPush");
Возврат Результат;
КонецФункции
Функция ОткрытьПортPull(Знач Порт) Экспорт
Результат = ИнициализироватьКоннектор(Порт, "BindPull");
Возврат Результат;
КонецФункции
#КонецОбласти
@@ -100,3 +156,42 @@
#КонецОбласти
#КонецОбласти
#Область СлужебныеПроцедурыИФункции
Функция ИнициализироватьКоннектор(Знач АдресПорт, Знач Вид)
Если СтрНачинаетсяС(АдресПорт, "Connect") Тогда
OPI_ПреобразованиеТипов.ПолучитьСтроку(АдресПорт);
OPI_Инструменты.ВернутьУправляющиеПоследовательности(АдресПорт);
Иначе
OPI_ПреобразованиеТипов.ПолучитьЧисло(АдресПорт)
КонецЕсли;
ZMQ = OPI_Компоненты.ПолучитьКомпоненту("ZeroMQ");
Если Вид = "ConnectReq" Тогда
Результат = ZMQ.ConnectReq(АдресПорт);
ИначеЕсли Вид = "ConnectSub" Тогда
Результат = ZMQ.ConnectPush(АдресПорт);
ИначеЕсли Вид = "ConnectPush" Тогда
Результат = ZMQ.ConnectPush(АдресПорт);
ИначеЕсли Вид = "ConnectPull" Тогда
Результат = ZMQ.ConnectPull(АдресПорт);
ИначеЕсли Вид = "BindRep" Тогда
Результат = ZMQ.BindRep(АдресПорт);
ИначеЕсли Вид = "BindPub" Тогда
Результат = ZMQ.BindPub(АдресПорт);
ИначеЕсли Вид = "BindPush" Тогда
Результат = ZMQ.BindPush(АдресПорт);
ИначеЕсли Вид = "BindPull" Тогда
Результат = ZMQ.BindPull(АдресПорт);
КонецЕсли;
Результат = OPI_Инструменты.JsonВСтруктуру(Результат);
Возврат ?(Результат["result"], ZMQ, Результат);
КонецФункции
#КонецОбласти
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:3447ab3c47871ac1ad93831b3257057f90fab964a8cbfe83517b31c5cab7b9bf
size 2561771
oid sha256:6a2220074a050783de7496f4b38b03eb023cb923c5c9565c48cac48946350b08
size 2567261
+2 -2
View File
@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:3447ab3c47871ac1ad93831b3257057f90fab964a8cbfe83517b31c5cab7b9bf
size 2561771
oid sha256:6a2220074a050783de7496f4b38b03eb023cb923c5c9565c48cac48946350b08
size 2567261