mirror of
https://github.com/ManyakRus/starter.git
synced 2024-11-21 18:16:31 +02:00
сделал GetOffsetFromGroupID()
This commit is contained in:
parent
a7709b057a
commit
463b5bde0b
@ -224,8 +224,8 @@ func ConnectTopic(TopicName, GroupID string) *kafka.Reader {
|
||||
}
|
||||
|
||||
// GetOffsetFromGroupID - получает оффсет группы для конкретного топика, партиция 0
|
||||
func GetOffsetFromGroupID(TopicName, GroupID string) (*kafka.OffsetFetchResponse, error) {
|
||||
var Otvet *kafka.OffsetFetchResponse
|
||||
func GetOffsetFromGroupID(TopicName, GroupID string) (int64, error) {
|
||||
var Otvet int64 = 0
|
||||
var err error
|
||||
|
||||
//
|
||||
@ -246,11 +246,21 @@ func GetOffsetFromGroupID(TopicName, GroupID string) (*kafka.OffsetFetchResponse
|
||||
OFR.Topics = MapTopics
|
||||
|
||||
//
|
||||
Otvet, err = Client.OffsetFetch(ctx, &OFR)
|
||||
Response, err := Client.OffsetFetch(ctx, &OFR)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("OffsetFetch() error: %w", err)
|
||||
return Otvet, err
|
||||
}
|
||||
|
||||
//
|
||||
MassOffset := Response.Topics[TopicName]
|
||||
if len(MassOffset) != 1 {
|
||||
err = fmt.Errorf("GetOffsetFromGroupID() error: len(MassOffset) != 1")
|
||||
return Otvet, err
|
||||
}
|
||||
|
||||
//
|
||||
Otvet = MassOffset[0].CommittedOffset
|
||||
|
||||
return Otvet, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user