1
0
mirror of https://github.com/ManyakRus/starter.git synced 2025-12-06 00:18:25 +02:00
Files
starter/vendor/github.com/segmentio/kafka-go/listpartitionreassignments.go
2023-12-25 13:30:55 +03:00

136 lines
3.7 KiB
Go

package kafka
import (
"context"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
)
// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
type ListPartitionReassignmentsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// Topics we want reassignments for, mapped by their name, or nil to list everything.
Topics map[string]ListPartitionReassignmentsRequestTopic
// Timeout is the amount of time to wait for the request to complete.
Timeout time.Duration
}
// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single
// topic.
type ListPartitionReassignmentsRequestTopic struct {
// The partitions to list partition reassignments for.
PartitionIndexes []int
}
// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
type ListPartitionReassignmentsResponse struct {
// Error is set to a non-nil value including the code and message if a top-level
// error was encountered.
Error error
// Topics contains results for each topic, mapped by their name.
Topics map[string]ListPartitionReassignmentsResponseTopic
}
// ListPartitionReassignmentsResponseTopic contains the detailed result of
// ongoing reassignments for a topic.
type ListPartitionReassignmentsResponseTopic struct {
// Partitions contains result for topic partitions.
Partitions []ListPartitionReassignmentsResponsePartition
}
// ListPartitionReassignmentsResponsePartition contains the detailed result of
// ongoing reassignments for a single partition.
type ListPartitionReassignmentsResponsePartition struct {
// PartitionIndex contains index of the partition.
PartitionIndex int
// Replicas contains the current replica set.
Replicas []int
// AddingReplicas contains the set of replicas we are currently adding.
AddingReplicas []int
// RemovingReplicas contains the set of replicas we are currently removing.
RemovingReplicas []int
}
func (c *Client) ListPartitionReassignments(
ctx context.Context,
req *ListPartitionReassignmentsRequest,
) (*ListPartitionReassignmentsResponse, error) {
apiReq := &listpartitionreassignments.Request{
TimeoutMs: int32(req.Timeout.Milliseconds()),
}
for topicName, topicReq := range req.Topics {
apiReq.Topics = append(
apiReq.Topics,
listpartitionreassignments.RequestTopic{
Name: topicName,
PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes),
},
)
}
protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
apiResp := protoResp.(*listpartitionreassignments.Response)
resp := &ListPartitionReassignmentsResponse{
Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
Topics: make(map[string]ListPartitionReassignmentsResponseTopic),
}
for _, topicResult := range apiResp.Topics {
respTopic := ListPartitionReassignmentsResponseTopic{}
for _, partitionResult := range topicResult.Partitions {
respTopic.Partitions = append(
respTopic.Partitions,
ListPartitionReassignmentsResponsePartition{
PartitionIndex: int(partitionResult.PartitionIndex),
Replicas: int32ToIntArray(partitionResult.Replicas),
AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas),
RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas),
},
)
}
resp.Topics[topicResult.Name] = respTopic
}
return resp, nil
}
func intToInt32Array(arr []int) []int32 {
if arr == nil {
return nil
}
res := make([]int32, len(arr))
for i := range arr {
res[i] = int32(arr[i])
}
return res
}
func int32ToIntArray(arr []int32) []int {
if arr == nil {
return nil
}
res := make([]int, len(arr))
for i := range arr {
res[i] = int(arr[i])
}
return res
}