1
0
mirror of https://github.com/ManyakRus/starter.git synced 2025-11-24 22:53:52 +02:00
Files
starter/tinkoff_connect/tinkoff_connect.go
Nikitin Aleksandr 5eaa9fd103 сделал Abs()
2025-01-16 14:28:59 +03:00

238 lines
5.6 KiB
Go

package tinkoff_connect
import (
"context"
"errors"
"github.com/ManyakRus/starter/contextmain"
"github.com/ManyakRus/starter/log"
"github.com/ManyakRus/starter/microl"
"github.com/ManyakRus/starter/port_checker"
"github.com/ManyakRus/starter/stopapp"
"github.com/tinkoff/invest-api-go-sdk/investgo"
"os"
"sync"
"time"
)
// SettingsINI - тип для хранения настроек подключени
type SettingsINI struct {
investgo.Config
Host string
Port string
}
// Settings - структура для хранения настроек подключения
var Settings SettingsINI
// Conn - подключение к серверу GRPC
var Client *investgo.Client
// mutex_Connect - защита от многопоточности Reconnect()
var mutex_Connect = &sync.Mutex{}
// NeedReconnect - флаг необходимости переподключения
var NeedReconnect bool
// timeoutSeconds - время ожидания запроса в Тинькофф, в секундах
var timeoutSeconds = 60
// Connect - подключается к серверу GRPC, при ошибке вызывает панику
func Connect() {
var err error
err = Connect_err()
if err != nil {
log.Panicf("GRPC Connect() error: %v", err)
} else {
addr := Settings.EndPoint
log.Info("GRPC client connected. Address: ", addr)
}
}
// Connect_err - подключается к серверу GRPC, возвращает ошибку
func Connect_err() error {
var err error
//
mutex_Connect.Lock()
defer mutex_Connect.Unlock()
//
if Settings.EndPoint == "" {
err = FillSettings()
if err != nil {
return err
}
}
ctx := contextmain.GetContext()
//addr := Settings.Host + ":" + Settings.Port
Config := Settings.Config
Client, err = investgo.NewClient(ctx, Config, log.GetLog())
if err != nil {
return err
}
return err
}
func FillSettings() error {
var err error
Settings = SettingsINI{}
INVEST_HOST := os.Getenv("INVEST_HOST")
INVEST_PORT := os.Getenv("INVEST_PORT")
Settings.Host = INVEST_HOST
Settings.Port = INVEST_PORT
EndPoint := INVEST_HOST + ":" + INVEST_PORT
Settings.EndPoint = EndPoint
if INVEST_HOST == "" {
TextError := "Need fill INVEST_HOST ! in OS Environment "
err = errors.New(TextError)
return err
}
if INVEST_PORT == "" {
TextError := "Need fill INVEST_PORT ! in OS Environment "
err = errors.New(TextError)
return err
}
Name := ""
s := ""
//
Name = "INVEST_TOKEN"
s = microl.Getenv(Name, true)
Settings.Token = s
//
Name = "INVEST_ACCOUNTID"
s = microl.Getenv(Name, false)
Settings.AccountId = s
return err
}
// WaitStop - ожидает отмену глобального контекста
func WaitStop() {
select {
case <-contextmain.GetContext().Done():
log.Warn("Context app is canceled. grpc_connect")
}
// ждём пока отправляемых сейчас сообщений будет =0
stopapp.WaitTotalMessagesSendingNow("tinkoff_connect")
// закрываем соединение
CloseConnection()
stopapp.GetWaitGroup_Main().Done()
}
// Start - необходимые процедуры для запуска сервера GRPC
// если контекст хранится в contextmain.GetContext()
// и есть stopapp.GetWaitGroup_Main()
// при ошибке вызывает панику
func Start() {
Connect()
stopapp.GetWaitGroup_Main().Add(1)
go WaitStop()
stopapp.GetWaitGroup_Main().Add(1)
go ping_go()
}
// Start_ctx - необходимые процедуры для запуска сервера GRPC
// ctx - глобальный контекст приложения
// wg - глобальный WaitGroup приложения
func Start_ctx(ctx *context.Context, wg *sync.WaitGroup) error {
var err error
contextmain.Ctx = ctx
stopapp.SetWaitGroup_Main(wg)
err = Connect_err()
if err != nil {
return err
}
stopapp.GetWaitGroup_Main().Add(1)
go WaitStop()
stopapp.GetWaitGroup_Main().Add(1)
go ping_go()
return err
}
// CloseConnection - закрывает подключение к GRPC, и пишет лог
func CloseConnection() {
err := CloseConnection_err()
if err != nil {
log.Error("GRPC client CloseConnection() error: ", err)
} else {
log.Info("GRPC client connection closed")
}
}
// CloseConnection - закрывает подключение к GRPC, и возвращает ошибку
func CloseConnection_err() error {
err := Client.Stop()
return err
}
// ping_go - делает пинг каждые 60 секунд, и реконнект
func ping_go() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
addr := Settings.Host + ":" + Settings.Port
//бесконечный цикл
loop:
for {
select {
case <-contextmain.GetContext().Done():
log.Warn("Context app is canceled. grpc_client.ping")
break loop
case <-ticker.C:
err := port_checker.CheckPort_err(Settings.Host, Settings.Port)
//log.Debug("ticker, ping err: ", err) //удалить
if err != nil {
NeedReconnect = true
log.Warn("grpc_client CheckPort(", addr, ") error: ", err)
} else if NeedReconnect == true {
log.Warn("grpc_client CheckPort(", addr, ") OK. Start Reconnect()")
NeedReconnect = false
err = Connect_err()
if err != nil {
NeedReconnect = true
log.Error("grpc_client Connect() error: ", err)
}
}
}
}
stopapp.GetWaitGroup_Main().Done()
}
// GetTimeoutSeconds - возвращает время ожидания ответа
func GetTimeoutSeconds() int {
Otvet := timeoutSeconds
return Otvet
}
// SetTimeoutSeconds - устанавливает время ожидания ответа
func SetTimeoutSeconds(seconds int) {
timeoutSeconds = seconds
}