mirror of
https://github.com/ManyakRus/starter.git
synced 2025-02-15 15:10:29 +02:00
138 lines
2.9 KiB
Go
138 lines
2.9 KiB
Go
package kafka_connect
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/ManyakRus/starter/config_main"
|
|
"github.com/ManyakRus/starter/contextmain"
|
|
"github.com/ManyakRus/starter/micro"
|
|
"github.com/segmentio/kafka-go"
|
|
"testing"
|
|
"time"
|
|
|
|
//"github.com/ManyakRus/starter/common/v0/logger"
|
|
"github.com/ManyakRus/starter/stopapp"
|
|
)
|
|
|
|
var TEXT_CONTEXT_DEADLINE = "context deadline exceeded"
|
|
|
|
func TestConnect_err(t *testing.T) {
|
|
//ProgramDir := micro.ProgramDir_Common()
|
|
config_main.LoadEnv()
|
|
err := Connect_err()
|
|
if err != nil {
|
|
t.Error("nats_connect.TestConnect_err() error: ", err)
|
|
}
|
|
CloseConnection()
|
|
}
|
|
|
|
func TestCloseConnection(t *testing.T) {
|
|
//ProgramDir := micro.ProgramDir_Common()
|
|
config_main.LoadEnv()
|
|
Connect()
|
|
CloseConnection()
|
|
}
|
|
|
|
func TestStart(t *testing.T) {
|
|
//ProgramDir := micro.ProgramDir_Common()
|
|
config_main.LoadEnv()
|
|
StartKafka()
|
|
micro.Pause(20)
|
|
|
|
_ = contextmain.GetContext()
|
|
contextmain.CancelContext()
|
|
|
|
stopapp.GetWaitGroup_Main().Wait()
|
|
|
|
contextmain.GetNewContext()
|
|
}
|
|
|
|
func TestWaitStop(t *testing.T) {
|
|
|
|
stopapp.GetWaitGroup_Main().Add(1)
|
|
go WaitStop()
|
|
|
|
_ = contextmain.GetContext()
|
|
contextmain.CancelContext()
|
|
|
|
stopapp.GetWaitGroup_Main().Wait()
|
|
|
|
contextmain.GetNewContext()
|
|
}
|
|
|
|
func TestConnect(t *testing.T) {
|
|
//ProgramDir := micro.ProgramDir_Common()
|
|
config_main.LoadEnv()
|
|
Connect()
|
|
defer CloseConnection()
|
|
}
|
|
|
|
func TestReadTopic(t *testing.T) {
|
|
config_main.LoadEnv()
|
|
FillSettings()
|
|
//Connect()
|
|
//defer CloseConnection()
|
|
|
|
KafkaReader := ConnectTopic("KAFKA_SERVICE", "")
|
|
|
|
ctxMain := context.Background()
|
|
ctx, ctxCancelFunc := context.WithTimeout(ctxMain, time.Duration(1)*time.Second)
|
|
defer ctxCancelFunc()
|
|
|
|
mess, err := KafkaReader.FetchMessage(ctx)
|
|
if err != nil {
|
|
if err.Error() == TEXT_CONTEXT_DEADLINE {
|
|
t.Log(" KafkaReader.FetchMessage() ", TEXT_CONTEXT_DEADLINE)
|
|
} else {
|
|
t.Error("FetchMessage() error: ", err)
|
|
}
|
|
} else {
|
|
t.Logf("new message: %#v", mess)
|
|
}
|
|
}
|
|
|
|
func TestOffsetFetch(t *testing.T) {
|
|
t.SkipNow() //ненужный, только для АЭС
|
|
|
|
config_main.LoadEnv()
|
|
FillSettings()
|
|
CreateClient()
|
|
//Connect()
|
|
//defer CloseConnection()
|
|
|
|
//
|
|
Addr := GetAddr()
|
|
|
|
//
|
|
ctx := context.Background()
|
|
MapTopics := make(map[string][]int)
|
|
MapTopics["kol_atom_ul_uni.stack.documents"] = []int{0}
|
|
|
|
OFR := kafka.OffsetFetchRequest{}
|
|
OFR.Addr = Addr
|
|
OFR.GroupID = "debezium_adapter_dev_documents"
|
|
OFR.Topics = MapTopics
|
|
Response, err := Client.OffsetFetch(ctx, &OFR)
|
|
if err != nil {
|
|
t.Errorf("TestOffsetFetch() error: %v", err)
|
|
return
|
|
}
|
|
fmt.Printf("%v", *Response)
|
|
}
|
|
|
|
func TestGetOffsetFromGroupID(t *testing.T) {
|
|
t.SkipNow() //ненужный, только для АЭС
|
|
|
|
config_main.LoadEnv()
|
|
FillSettings()
|
|
CreateClient()
|
|
|
|
TopicName := "kol_atom_ul_uni.stack.documents"
|
|
GroupID := "debezium_adapter_dev_documents"
|
|
Otvet, err := GetOffsetFromGroupID(TopicName, GroupID)
|
|
if err != nil {
|
|
t.Error("TestGetOffsetFromGroupID() error: ", err)
|
|
}
|
|
fmt.Printf("Otvet: %v", Otvet)
|
|
}
|