mirror of
https://github.com/ManyakRus/starter.git
synced 2025-11-24 22:53:52 +02:00
267 lines
6.2 KiB
Go
267 lines
6.2 KiB
Go
package kafka_connect
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/ManyakRus/starter/log"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
//"github.com/ManyakRus/starter/common/v0/micro"
|
|
"github.com/ManyakRus/starter/contextmain"
|
|
"github.com/ManyakRus/starter/stopapp"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
// Conn - соединение к серверу nats
|
|
var Conn *kafka.Conn
|
|
|
|
// log - глобальный логгер
|
|
//var log = logger.GetLog()
|
|
|
|
// Settings хранит все нужные переменные окружения
|
|
var Settings SettingsINI
|
|
|
|
// SettingsINI - структура для хранения всех нужных переменных окружения
|
|
type SettingsINI struct {
|
|
KAFKA_HOST string
|
|
KAFKA_PORT string
|
|
KAFKA_LOGIN string
|
|
KAFKA_PASSWORD string
|
|
}
|
|
|
|
// Client - клиент для Kafka
|
|
var Client *kafka.Client
|
|
|
|
// Connect - подключается к серверу Kafka
|
|
func Connect() {
|
|
var err error
|
|
|
|
err = Connect_err()
|
|
LogInfo_Connected(err)
|
|
}
|
|
|
|
// LogInfo_Connected - выводит сообщение в Лог, или паника при ошибке
|
|
func LogInfo_Connected(err error) {
|
|
if err != nil {
|
|
log.Panicln("KAFKA Connect() host: ", Settings.KAFKA_HOST, " error: ", err)
|
|
} else {
|
|
log.Info("KAFKA Connect() OK, host: ", Settings.KAFKA_HOST)
|
|
}
|
|
|
|
}
|
|
|
|
// Connect_err - подключается к серверу Kafka и возвращает ошибку
|
|
func Connect_err() error {
|
|
var err error
|
|
|
|
if Settings.KAFKA_HOST == "" {
|
|
FillSettings()
|
|
}
|
|
|
|
//sKAFKA_PORT := (Settings.KAFKA_PORT)
|
|
//URL := "nats://" + Settings.KAFKA_HOST + ":" + sKAFKA_PORT
|
|
//UserInfo := nats.UserInfo(Settings.KAFKA_LOGIN, Settings.KAFKA_PASSWORD)
|
|
Conn, err = kafka.Dial("tcp", Settings.KAFKA_HOST+":"+Settings.KAFKA_PORT)
|
|
|
|
//
|
|
err = CreateClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
//nats.ManualAck()
|
|
return err
|
|
}
|
|
|
|
// CreateClient - создаёт клиент для Kafka
|
|
func CreateClient() error {
|
|
var err error
|
|
|
|
//Addr, err := GetAddr()
|
|
//if err != nil {
|
|
// err = fmt.Errorf("GetAddr() error: %w", err)
|
|
//}
|
|
|
|
Addr := kafka.TCP("localhost:9092")
|
|
|
|
Client = &kafka.Client{}
|
|
Client.Addr = Addr
|
|
|
|
return err
|
|
}
|
|
|
|
// GetAddr - создаёт Addr
|
|
func GetAddr() net.Addr {
|
|
URL := Settings.KAFKA_HOST + ":" + Settings.KAFKA_PORT
|
|
Otvet := kafka.TCP(URL)
|
|
|
|
return Otvet
|
|
}
|
|
|
|
// StartKafka - необходимые процедуры для подключения к серверу Kafka
|
|
func StartKafka() {
|
|
var err error
|
|
|
|
ctx := contextmain.GetContext()
|
|
WaitGroup := stopapp.GetWaitGroup_Main()
|
|
err = Start_ctx(&ctx, WaitGroup)
|
|
LogInfo_Connected(err)
|
|
|
|
}
|
|
|
|
// Start_ctx - необходимые процедуры для подключения к серверу Kafka
|
|
// Свой контекст и WaitGroup нужны для остановки работы сервиса Graceful shutdown
|
|
// Для тех кто пользуется этим репозиторием для старта и останова сервиса можно просто StartKafka()
|
|
func Start_ctx(ctx *context.Context, WaitGroup *sync.WaitGroup) error {
|
|
var err error
|
|
|
|
//запомним к себе контекст
|
|
contextmain.Ctx = ctx
|
|
if ctx == nil {
|
|
contextmain.GetContext()
|
|
}
|
|
|
|
//запомним к себе WaitGroup
|
|
stopapp.SetWaitGroup_Main(WaitGroup)
|
|
if WaitGroup == nil {
|
|
stopapp.StartWaitStop()
|
|
}
|
|
|
|
//
|
|
err = Connect_err()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
stopapp.GetWaitGroup_Main().Add(1)
|
|
go WaitStop()
|
|
|
|
return err
|
|
}
|
|
|
|
// CloseConnection - закрывает соединение с сервером Kafka
|
|
func CloseConnection() {
|
|
var err error
|
|
|
|
if Conn == nil {
|
|
return
|
|
}
|
|
|
|
err = Conn.Close()
|
|
if err != nil {
|
|
log.Error("KAFKA CloseConnection() error: ", err)
|
|
} else {
|
|
log.Info("KAFKA stopped")
|
|
}
|
|
|
|
//
|
|
Client = nil
|
|
|
|
return
|
|
}
|
|
|
|
// WaitStop - ожидает отмену глобального контекста
|
|
func WaitStop() {
|
|
|
|
select {
|
|
case <-contextmain.GetContext().Done():
|
|
log.Warn("Context app is canceled.")
|
|
}
|
|
|
|
//
|
|
stopapp.WaitTotalMessagesSendingNow("KAFKA_connect")
|
|
|
|
//
|
|
CloseConnection()
|
|
|
|
stopapp.GetWaitGroup_Main().Done()
|
|
}
|
|
|
|
// FillSettings загружает переменные окружения в структуру из файла или из переменных окружения
|
|
func FillSettings() {
|
|
Settings = SettingsINI{}
|
|
Settings.KAFKA_HOST = os.Getenv("KAFKA_HOST")
|
|
Settings.KAFKA_PORT = os.Getenv("KAFKA_PORT")
|
|
Settings.KAFKA_LOGIN = os.Getenv("KAFKA_LOGIN")
|
|
Settings.KAFKA_PASSWORD = os.Getenv("KAFKA_PASSWORD")
|
|
|
|
if Settings.KAFKA_HOST == "" {
|
|
log.Panicln("Need fill KAFKA_HOST ! in os.ENV ")
|
|
}
|
|
|
|
if Settings.KAFKA_PORT == "" {
|
|
log.Panicln("Need fill KAFKA_PORT ! in os.ENV ")
|
|
}
|
|
|
|
//if Settings.KAFKA_LOGIN == "" {
|
|
// log.Panicln("Need fill KAFKA_LOGIN ! in os.ENV ")
|
|
//}
|
|
//
|
|
//if Settings.KAFKA_PASSWORD == "" {
|
|
// log.Panicln("Need fill KAFKA_PASSWORD ! in os.ENV ")
|
|
//}
|
|
|
|
//
|
|
}
|
|
|
|
// ConnectTopic - подключает кафку к нужному топику
|
|
func ConnectTopic(TopicName, GroupID string) *kafka.Reader {
|
|
|
|
// make a new reader that consumes from topic
|
|
KafkaReader := kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: []string{Settings.KAFKA_HOST + ":" + Settings.KAFKA_PORT},
|
|
GroupID: GroupID,
|
|
Topic: TopicName,
|
|
MinBytes: 10, // 10KB 10e3
|
|
MaxBytes: 10e6, // 10MB
|
|
})
|
|
|
|
return KafkaReader
|
|
}
|
|
|
|
// GetOffsetFromGroupID - получает оффсет группы для конкретного топика, партиция 0
|
|
func GetOffsetFromGroupID(TopicName, GroupID string) (int64, error) {
|
|
var Otvet int64 = 0
|
|
var err error
|
|
|
|
//
|
|
ctxMain := contextmain.GetContext()
|
|
ctx, ctxCancelFunc := context.WithTimeout(ctxMain, time.Duration(60)*time.Second)
|
|
defer ctxCancelFunc()
|
|
|
|
//
|
|
PartitionNumber := 0
|
|
MapTopics := make(map[string][]int)
|
|
MapTopics[TopicName] = []int{PartitionNumber}
|
|
|
|
//
|
|
Addr := GetAddr()
|
|
OFR := kafka.OffsetFetchRequest{}
|
|
OFR.Addr = Addr
|
|
OFR.GroupID = GroupID
|
|
OFR.Topics = MapTopics
|
|
|
|
//
|
|
Response, err := Client.OffsetFetch(ctx, &OFR)
|
|
if err != nil {
|
|
err = fmt.Errorf("OffsetFetch() error: %w", err)
|
|
return Otvet, err
|
|
}
|
|
|
|
//
|
|
MassOffset := Response.Topics[TopicName]
|
|
if len(MassOffset) != 1 {
|
|
err = fmt.Errorf("GetOffsetFromGroupID() error: len(MassOffset) != 1")
|
|
return Otvet, err
|
|
}
|
|
|
|
//
|
|
Otvet = MassOffset[0].CommittedOffset
|
|
|
|
return Otvet, err
|
|
}
|