1
0
mirror of https://github.com/ManyakRus/starter.git synced 2025-02-15 15:10:29 +02:00

сделал GetOffsetFromGroupID()

This commit is contained in:
Nikitin Aleksandr 2024-11-12 14:56:24 +03:00
parent 6ad6d27f73
commit ea3361fbc3
2 changed files with 109 additions and 15 deletions

View File

@ -2,9 +2,12 @@ package kafka_connect
import ( import (
"context" "context"
"fmt"
"github.com/ManyakRus/starter/logger" "github.com/ManyakRus/starter/logger"
"net"
"os" "os"
"sync" "sync"
"time"
//"github.com/ManyakRus/starter/common/v0/micro" //"github.com/ManyakRus/starter/common/v0/micro"
"github.com/ManyakRus/starter/contextmain" "github.com/ManyakRus/starter/contextmain"
@ -30,6 +33,9 @@ type SettingsINI struct {
KAFKA_PASSWORD string KAFKA_PASSWORD string
} }
// Client - клиент для Kafka
var Client *kafka.Client
// Connect - подключается к серверу Nats // Connect - подключается к серверу Nats
func Connect() { func Connect() {
var err error var err error
@ -61,10 +67,41 @@ func Connect_err() error {
//UserInfo := nats.UserInfo(Settings.KAFKA_LOGIN, Settings.KAFKA_PASSWORD) //UserInfo := nats.UserInfo(Settings.KAFKA_LOGIN, Settings.KAFKA_PASSWORD)
Conn, err = kafka.Dial("tcp", Settings.KAFKA_HOST+":"+Settings.KAFKA_PORT) Conn, err = kafka.Dial("tcp", Settings.KAFKA_HOST+":"+Settings.KAFKA_PORT)
//
err = CreateClient()
if err != nil {
return err
}
//nats.ManualAck() //nats.ManualAck()
return err return err
} }
// CreateClient - создаёт клиент для Kafka
func CreateClient() error {
var err error
//Addr, err := GetAddr()
//if err != nil {
// err = fmt.Errorf("GetAddr() error: %w", err)
//}
Addr := kafka.TCP("localhost:9092")
Client = &kafka.Client{}
Client.Addr = Addr
return err
}
// GetAddr - создаёт Addr
func GetAddr() net.Addr {
URL := Settings.KAFKA_HOST + ":" + Settings.KAFKA_PORT
Otvet := kafka.TCP(URL)
return Otvet
}
// StartKafka - необходимые процедуры для подключения к серверу Kafka // StartKafka - необходимые процедуры для подключения к серверу Kafka
func StartKafka() { func StartKafka() {
var err error var err error
@ -182,3 +219,35 @@ func ConnectTopic(TopicName, GroupID string) *kafka.Reader {
return KafkaReader return KafkaReader
} }
// GetOffsetFromGroupID - получает оффсет группы для конкретного топика, партиция 0
func GetOffsetFromGroupID(TopicName, GroupID string) (*kafka.OffsetFetchResponse, error) {
var Otvet *kafka.OffsetFetchResponse
var err error
//
ctxMain := contextmain.GetContext()
ctx, ctxCancelFunc := context.WithTimeout(ctxMain, time.Duration(60)*time.Second)
defer ctxCancelFunc()
//
PartitionNumber := 0
MapTopics := make(map[string][]int)
MapTopics[TopicName] = []int{PartitionNumber}
//
Addr := GetAddr()
OFR := kafka.OffsetFetchRequest{}
OFR.Addr = Addr
OFR.GroupID = GroupID
OFR.Topics = MapTopics
//
Otvet, err = Client.OffsetFetch(ctx, &OFR)
if err != nil {
err = fmt.Errorf("OffsetFetch() error: %w", err)
return Otvet, err
}
return Otvet, err
}

View File

@ -6,6 +6,7 @@ import (
"github.com/ManyakRus/starter/config_main" "github.com/ManyakRus/starter/config_main"
"github.com/ManyakRus/starter/contextmain" "github.com/ManyakRus/starter/contextmain"
"github.com/ManyakRus/starter/micro" "github.com/ManyakRus/starter/micro"
"github.com/segmentio/kafka-go"
"testing" "testing"
"time" "time"
@ -90,23 +91,47 @@ func TestReadTopic(t *testing.T) {
} }
} }
func TestReadGroups(t *testing.T) { func TestOffsetFetch(t *testing.T) {
t.SkipNow() //ненужный, только для АЭС
config_main.LoadEnv() config_main.LoadEnv()
Connect() FillSettings()
defer CloseConnection() CreateClient()
//Connect()
//defer CloseConnection()
partitions, err := Conn.ReadPartitions("kol_atom_ul_uni.stack.documents") //
Addr := GetAddr()
//
ctx := context.Background()
MapTopics := make(map[string][]int)
MapTopics["kol_atom_ul_uni.stack.documents"] = []int{0}
OFR := kafka.OffsetFetchRequest{}
OFR.Addr = Addr
OFR.GroupID = "debezium_adapter_dev_documents"
OFR.Topics = MapTopics
Response, err := Client.OffsetFetch(ctx, &OFR)
if err != nil { if err != nil {
panic(err.Error()) t.Errorf("TestOffsetFetch() error: %v", err)
} return
m := map[string]struct{}{}
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
for k := range m {
fmt.Println(k)
} }
fmt.Printf("%v", *Response)
}
func TestGetOffsetFromGroupID(t *testing.T) {
t.SkipNow() //ненужный, только для АЭС
config_main.LoadEnv()
FillSettings()
CreateClient()
TopicName := "kol_atom_ul_uni.stack.documents"
GroupID := "debezium_adapter_dev_documents"
Otvet, err := GetOffsetFromGroupID(TopicName, GroupID)
if err != nil {
t.Error("TestGetOffsetFromGroupID() error: ", err)
}
fmt.Printf("Otvet: %v", Otvet)
} }