mirror of
https://github.com/ManyakRus/starter.git
synced 2025-11-24 22:53:52 +02:00
сделал log
This commit is contained in:
343
postgres_stek/postgres_stek.go
Normal file
343
postgres_stek/postgres_stek.go
Normal file
@@ -0,0 +1,343 @@
|
||||
// модуль для работы с базой данных
|
||||
|
||||
package postgres_stek
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
model "gitlab.aescorp.ru/dsp_dev/claim/common/object_model"
|
||||
"github.com/manyakrus/starter/logger"
|
||||
"github.com/manyakrus/starter/ping"
|
||||
"time"
|
||||
|
||||
"sync"
|
||||
//"time"
|
||||
|
||||
//"github.com/jmoiron/sqlx"
|
||||
//_ "github.com/lib/pq"
|
||||
|
||||
"github.com/manyakrus/starter/contextmain"
|
||||
"github.com/manyakrus/starter/micro"
|
||||
"github.com/manyakrus/starter/stopapp"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/gorm"
|
||||
gormlogger "gorm.io/gorm/logger"
|
||||
"gorm.io/gorm/schema"
|
||||
)
|
||||
|
||||
//// Conn - соединение к базе данных
|
||||
//var Conn *gorm.DB
|
||||
|
||||
// Conn - все соединения к 10 базам данных
|
||||
var MapConn = make(map[int64]*gorm.DB)
|
||||
|
||||
// MapConnection - все объекты Connection
|
||||
var MapConnection = make(map[int64]model.Connection)
|
||||
|
||||
// log - глобальный логгер
|
||||
var log = logger.GetLog()
|
||||
|
||||
// mutexReconnect - защита от многопоточности Reconnect()
|
||||
var mutexReconnect = &sync.Mutex{}
|
||||
|
||||
// NeedReconnect - флаг необходимости переподключения
|
||||
var NeedReconnect bool
|
||||
|
||||
var MutexConnection sync.Mutex
|
||||
|
||||
// Connect_err - подключается к базе данных
|
||||
func Connect(Connection model.Connection) {
|
||||
|
||||
if Connection.Server == "" {
|
||||
log.Panicln("Need fill Connection.Server")
|
||||
}
|
||||
|
||||
ping.Ping(Connection.Server, Connection.Port)
|
||||
|
||||
err := Connect_err(Connection)
|
||||
if err != nil {
|
||||
log.Panicln("POSTGRES gorm stack Connect() to database host: ", Connection.Server, ", Error: ", err)
|
||||
} else {
|
||||
log.Info("POSTGRES gorm stack Connected. host: ", Connection.Server, ", base name: ", Connection.DbName, ", schema: ", Connection.DbScheme)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Connect_err - подключается к базе данных
|
||||
func Connect_err(Connection model.Connection) error {
|
||||
|
||||
var err error
|
||||
|
||||
if Connection.Server == "" {
|
||||
log.Panicln("Need fill Connection.Server")
|
||||
}
|
||||
|
||||
//ctxMain := context.Background()
|
||||
//ctxMain := contextmain.GetContext()
|
||||
//ctx, cancel := context.WithTimeout(ctxMain, 5*time.Second)
|
||||
//defer cancel()
|
||||
|
||||
// get the database connection URL.
|
||||
dsn := GetDSN(Connection)
|
||||
|
||||
//
|
||||
conf := &gorm.Config{}
|
||||
conn := postgres.Open(dsn)
|
||||
Conn, err := gorm.Open(conn, conf)
|
||||
Conn.Config.NamingStrategy = schema.NamingStrategy{TablePrefix: Connection.DbScheme + "."}
|
||||
Conn.Config.Logger = gormlogger.Default.LogMode(gormlogger.Warn)
|
||||
|
||||
if err == nil {
|
||||
DB, err := Conn.DB()
|
||||
if err != nil {
|
||||
log.Error("Conn.DB() error: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = DB.Ping()
|
||||
}
|
||||
|
||||
MutexConnection.Lock() //race
|
||||
|
||||
MapConnection[Connection.ID] = Connection
|
||||
MapConn[Connection.ID] = Conn
|
||||
|
||||
MutexConnection.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// IsClosed проверка что база данных закрыта
|
||||
func IsClosed(Connection model.Connection) bool {
|
||||
var otvet bool
|
||||
Conn := MapConn[Connection.ID]
|
||||
if Conn == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
DB, err := Conn.DB()
|
||||
if err != nil {
|
||||
log.Error("Conn.DB() error: ", err)
|
||||
return true
|
||||
}
|
||||
|
||||
err = DB.Ping()
|
||||
if err != nil {
|
||||
log.Error("DB.Ping() error: ", err)
|
||||
return true
|
||||
}
|
||||
return otvet
|
||||
}
|
||||
|
||||
// Reconnect повторное подключение к базе данных, если оно отключено
|
||||
// или полная остановка программы
|
||||
func Reconnect(Connection model.Connection, err error) {
|
||||
mutexReconnect.Lock()
|
||||
defer mutexReconnect.Unlock()
|
||||
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return
|
||||
}
|
||||
|
||||
Conn := MapConn[Connection.ID]
|
||||
if Conn == nil {
|
||||
log.Warn("Reconnect()")
|
||||
err := Connect_err(Connection)
|
||||
if err != nil {
|
||||
log.Error("error: ", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if IsClosed(Connection) {
|
||||
micro.Pause(1000)
|
||||
log.Warn("Reconnect()")
|
||||
err := Connect_err(Connection)
|
||||
if err != nil {
|
||||
log.Error("error: ", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
sError := err.Error()
|
||||
if sError == "Conn closed" {
|
||||
micro.Pause(1000)
|
||||
log.Warn("Reconnect()")
|
||||
err := Connect_err(Connection)
|
||||
if err != nil {
|
||||
log.Error("error: ", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//PgError, ok := err.(*pgconn.PgError)
|
||||
//if ok {
|
||||
// if PgError.Code == "P0001" { // Class P0 — PL/pgSQL Error, RaiseException
|
||||
// return //нужен
|
||||
// }
|
||||
//}
|
||||
|
||||
//остановим программу т.к. она не должна работать при неработающеё БД
|
||||
log.Error("STOP app. Error: ", err)
|
||||
stopapp.StopApp()
|
||||
|
||||
}
|
||||
|
||||
// CloseConnectionAll - закрытие всех соединений к базам данных
|
||||
func CloseConnectionAll() {
|
||||
|
||||
var MapConnection_copy = make(map[int64]model.Connection)
|
||||
|
||||
maps.Copy(MapConnection_copy, MapConnection) // копия для race error
|
||||
|
||||
for _, Connection := range MapConnection_copy {
|
||||
if Connection.Server == "" {
|
||||
continue
|
||||
}
|
||||
CloseConnection(Connection)
|
||||
}
|
||||
}
|
||||
|
||||
// CloseConnection - закрытие соединения с базой данных
|
||||
func CloseConnection(Connection model.Connection) {
|
||||
Conn := MapConn[Connection.ID]
|
||||
if Conn == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := CloseConnection_err(Connection)
|
||||
if err != nil {
|
||||
log.Error("Postgres gorm stack CloseConnection() error: ", err)
|
||||
} else {
|
||||
log.Info("Postgres gorm stack connection closed")
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// CloseConnection_err - закрытие соединения с базой данных
|
||||
func CloseConnection_err(Connection model.Connection) error {
|
||||
|
||||
Conn := MapConn[Connection.ID]
|
||||
if Conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
DB, err := Conn.DB()
|
||||
if err != nil {
|
||||
log.Error("Conn.DB() error: ", err)
|
||||
return err
|
||||
}
|
||||
err = DB.Close()
|
||||
if err != nil {
|
||||
log.Error("DB.Close() error: ", err)
|
||||
}
|
||||
Conn = nil
|
||||
|
||||
MutexConnection.Lock() //race
|
||||
|
||||
delete(MapConnection, Connection.ID)
|
||||
delete(MapConn, Connection.ID)
|
||||
|
||||
MutexConnection.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitStop - ожидает отмену глобального контекста или сигнала завершения приложения
|
||||
func WaitStop() {
|
||||
|
||||
select {
|
||||
//case <-stopapp.SignalInterrupt:
|
||||
// log.Warn("Interrupt clean shutdown.")
|
||||
// contextmain.CancelContext()
|
||||
case <-contextmain.GetContext().Done():
|
||||
log.Warn("Context app is canceled.")
|
||||
}
|
||||
|
||||
//
|
||||
stopapp.WaitTotalMessagesSendingNow("Postgres stek")
|
||||
|
||||
//
|
||||
CloseConnectionAll()
|
||||
|
||||
stopapp.GetWaitGroup_Main().Done()
|
||||
}
|
||||
|
||||
// StartDB - делает соединение с БД, отключение и др.
|
||||
func StartDB(Connection model.Connection) {
|
||||
Connect(Connection)
|
||||
|
||||
stopapp.GetWaitGroup_Main().Add(1)
|
||||
go WaitStop()
|
||||
|
||||
stopapp.GetWaitGroup_Main().Add(1)
|
||||
go ping_go()
|
||||
|
||||
}
|
||||
|
||||
// GetDSN - возвращает строку соединения к базе данных
|
||||
func GetDSN(Connection model.Connection) string {
|
||||
dsn := "host=" + Connection.Server + " "
|
||||
dsn += "user=" + Connection.Login + " "
|
||||
dsn += "password=" + Connection.Password + " "
|
||||
dsn += "dbname=" + Connection.DbName + " "
|
||||
dsn += "port=" + Connection.Port + " sslmode=disable TimeZone=UTC"
|
||||
|
||||
return dsn
|
||||
}
|
||||
|
||||
// GetConnection - возвращает соединение к нужной базе данных
|
||||
func GetConnection(Connection model.Connection) *gorm.DB {
|
||||
Conn := MapConn[Connection.ID]
|
||||
if Conn == nil {
|
||||
Connect(Connection)
|
||||
Conn = MapConn[Connection.ID]
|
||||
}
|
||||
|
||||
return Conn
|
||||
}
|
||||
|
||||
// ping_go - делает пинг каждые 60 секунд, и реконнект
|
||||
func ping_go() {
|
||||
|
||||
ticker := time.NewTicker(60 * time.Second)
|
||||
|
||||
//бесконечный цикл
|
||||
loop:
|
||||
for {
|
||||
|
||||
for _, Connection := range MapConnection {
|
||||
if Connection.Server == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
addr := Connection.Server + ":" + Connection.Port
|
||||
|
||||
select {
|
||||
case <-contextmain.GetContext().Done():
|
||||
log.Warn("Context app is canceled. postgres_stek.ping")
|
||||
break loop
|
||||
case <-ticker.C:
|
||||
err := ping.Ping_err(Connection.Server, Connection.Port)
|
||||
//log.Debug("ticker, ping err: ", err) //удалить
|
||||
if err != nil {
|
||||
NeedReconnect = true
|
||||
log.Warn("postgres_stek Ping(", addr, ") error: ", err)
|
||||
} else if NeedReconnect == true {
|
||||
log.Warn("postgres_stek Ping(", addr, ") OK. Start Reconnect()")
|
||||
NeedReconnect = false
|
||||
Connect(Connection)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stopapp.GetWaitGroup_Main().Done()
|
||||
}
|
||||
Reference in New Issue
Block a user