1
0
mirror of https://github.com/ManyakRus/starter.git synced 2025-02-15 15:10:29 +02:00
starter/nats_liveness/nats_liveness.go
2024-09-30 12:05:31 +03:00

278 lines
7.4 KiB
Go

package nats_liveness
import (
bytes "bytes"
"context"
"encoding/base64"
"encoding/json"
"github.com/ManyakRus/starter/contextmain"
"github.com/ManyakRus/starter/log"
"github.com/ManyakRus/starter/nats_connect"
"github.com/ManyakRus/starter/stopapp"
"github.com/klauspost/compress/zstd"
"gitlab.aescorp.ru/dsp_dev/claim/common/sync_exchange/sync_types"
"os"
"strings"
"sync"
"time"
)
//var bufOut1 *bytes.Buffer
var Settings SettingsINI
var ServiceStartAt time.Time
var Ticker *time.Ticker
type SettingsINI struct {
NATS_LIVENESS_TOPIC string
SERVICE_NAME string
SERVICE_NUMBER string
STAGE string
SERVICE_NAME_FULL string
}
type Message struct {
ServiceName string `json:"service_name"` // Имя сервиса
ServiceTime string `json:"service_time"` // Фактическое время сервиса
ServiceUptime string `json:"service_uptime"` // Аптайм сервиса
ServiceNum string `json:"service_num"` // Уникальный номер сервиса
KernelVers string `json:"kernel_version"` // Фактическая версия ядра
KernelType string `json:"kernel_type"` // Фактическая версия ядра
}
// Connect - подключается к серверу Nats-sync_exchange
func Connect() {
err := Connect_err()
LogInfo_Connected(err)
}
// LogInfo_Connected - выводит сообщение в Лог, или паника при ошибке
func LogInfo_Connected(err error) {
if err != nil {
log.Panicln("Can not connect NATS: ", nats_connect.Settings.NATS_HOST, ", error: ", err)
} else {
log.Info("NATS liveness connected. host: ", nats_connect.Settings.NATS_HOST, ":", nats_connect.Settings.NATS_PORT, ", topic: ", Settings.NATS_LIVENESS_TOPIC)
}
}
// Connect_err - подключается к серверу Nats-sync_exchange и возвращает ошибку
func Connect_err() error {
var err error
err = nats_connect.Connect_err()
nats_connect.FillSettings()
//sNATS_PORT := (nats_connect.Settings.NATS_PORT)
//url := "nats://" + nats_connect.Settings.NATS_HOST + ":" + sNATS_PORT
//err = sync_exchange.InitSyncExchange(url, Settings.SERVICE_NAME)
ServiceStartAt = time.Now()
return err
}
// CloseConnection - закрывает соединение с сервером Nats-sync_exchange
func CloseConnection() {
err := CloseConnection_err()
if err != nil {
log.Warn("Can not CloseConnection() NATS Liveness: ", nats_connect.Settings.NATS_HOST, " warning: ", err)
}
}
// CloseConnection - закрывает соединение с сервером Nats-sync_exchange, и возвращает ошибку
func CloseConnection_err() error {
if Ticker != nil {
Ticker.Stop()
}
//err := sync_exchange.DeInitSyncExchange()
nats_connect.CloseConnection()
return nil
}
// SendMessage - отправляет 1 сообщение в Nats-sync_exchange
func SendMessage() {
var err error
//obj := sync_types.SyncObject(b)
//pack := sync_types.MakeSyncObject(obj)
//sync_exchange.SendMessage(Settings.NATS_LIVENESS_TOPIC, pack)
now := time.Now().String()
duration := time.Since(ServiceStartAt)
sDuration := duration.String()
//SERVICE_NAME := Settings.SERVICE_NAME
//SERVICE_NAME = SERVICE_NAME + "_" + Settings.STAGE
//if micro.IsTestApp() == true {
//}
Message1 := Message{}
Message1.ServiceName = Settings.SERVICE_NAME_FULL
Message1.ServiceNum = Settings.SERVICE_NUMBER
Message1.KernelType = "nikitin"
Message1.KernelVers = ""
Message1.ServiceTime = now //time.Now().UTC().Format("2006-01-02 15:04:05.000")
Message1.ServiceUptime = sDuration
var sMessage1 string
bytes1, err := json.Marshal(Message1)
//sMessage0 := string(bytes1)
sMessage1 = base64.StdEncoding.EncodeToString(bytes1)
params := make(sync_types.SyncParams)
params["binData"] = sMessage1 //time.Now().UTC().Format("2006-01-02 15:04:05.000")
msg := sync_types.MakeSyncCommand("live", params)
raw_data, err := sync_types.SyncPackageToJSON(&msg)
if err != nil {
log.Error("SyncPackageToJSON() error: ", err)
return
}
// Пакуем сообщение
var enc *zstd.Encoder
var bufOut *bytes.Buffer
enc, _ = zstd.NewWriter(bufOut, zstd.WithEncoderLevel(3))
var msg_bin = make([]byte, 0)
msg_bin = enc.EncodeAll([]byte(raw_data), msg_bin)
err = nats_connect.SendMessage(Settings.NATS_LIVENESS_TOPIC, msg_bin)
if err != nil {
log.Error("SendMessage() error: ", err)
return
}
//err = sync_exchange.SendMessage(Settings.NATS_LIVENESS_TOPIC, msg)
//if err != nil {
// log.Error("SendMessage() error: ", err)
//}
}
func FillSettings(SERVICE_NAME string) {
Settings.SERVICE_NAME = strings.ToUpper(SERVICE_NAME)
Settings.STAGE = os.Getenv("STAGE")
SERVICE_NAME_FULL := Settings.SERVICE_NAME
if Settings.STAGE != "" {
SERVICE_NAME_FULL = SERVICE_NAME_FULL + "_" + Settings.STAGE
}
Settings.SERVICE_NAME_FULL = SERVICE_NAME_FULL
NATS_LIVENESS_TOPIC := "/claim/" + Settings.SERVICE_NAME_FULL + "/live/"
Settings.NATS_LIVENESS_TOPIC = NATS_LIVENESS_TOPIC
Settings.SERVICE_NUMBER = os.Getenv("SERVICE_NUMBER")
}
// CheckSettingsNATS - проверяет наличие переменных окружения
func CheckSettingsNATS() error {
var err error
NATS_HOST := os.Getenv("NATS_HOST")
NATS_PORT := os.Getenv("NATS_PORT")
if NATS_HOST == "" {
NATS_HOST = os.Getenv("BUS_LOCAL_HOST")
}
if NATS_PORT == "" {
NATS_PORT = os.Getenv("BUS_LOCAL_PORT")
}
if NATS_HOST == "" {
log.Error("Need fill BUS_LOCAL_HOST ! in os.ENV ")
return err
}
if NATS_PORT == "" {
log.Error("Need fill BUS_LOCAL_PORT ! in os.ENV ")
return err
}
return err
}
// Start - Старт работы NATS Liveness
func Start(ServiceName string) {
var err error
ctx := contextmain.GetContext()
WaitGroup := stopapp.GetWaitGroup_Main()
err = Start_ctx(&ctx, WaitGroup, ServiceName)
LogInfo_Connected(err)
}
// Start_ctx - необходимые процедуры для подключения к серверу NATS
// Свой контекст и WaitGroup нужны для остановки работы сервиса Graceful shutdown
// Для тех кто пользуется этим репозиторием для старта и останова сервиса можно просто Start()
func Start_ctx(ctx *context.Context, WaitGroup *sync.WaitGroup, ServiceName string) error {
var err error
//запомним к себе контекст
contextmain.Ctx = ctx
if ctx == nil {
contextmain.GetContext()
}
//запомним к себе WaitGroup
stopapp.SetWaitGroup_Main(WaitGroup)
if WaitGroup == nil {
stopapp.StartWaitStop()
}
//
err = CheckSettingsNATS()
if err != nil {
return err
}
//
FillSettings(ServiceName)
err = Connect_err()
if err != nil {
return err
}
stopapp.GetWaitGroup_Main().Add(1)
go WaitStop()
Ticker = time.NewTicker(5 * time.Second)
stopapp.GetWaitGroup_Main().Add(1)
go SendMessages_go()
return err
}
// WaitStop - ожидает отмену глобального контекста
func WaitStop() {
defer stopapp.GetWaitGroup_Main().Done()
select {
case <-contextmain.GetContext().Done():
log.Warn("Context app is canceled. NATS_Liveness.")
}
CloseConnection()
}
// SendMessages_go - Отправляет сообщения каждые 5 секунд
func SendMessages_go() {
defer stopapp.GetWaitGroup_Main().Done()
for {
select {
case <-contextmain.GetContext().Done():
return
case <-Ticker.C:
SendMessage()
}
}
}