mirror of
https://github.com/ManyakRus/starter.git
synced 2025-12-10 00:51:35 +02:00
115 lines
3.3 KiB
Go
115 lines
3.3 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/deleteacls"
|
|
)
|
|
|
|
// DeleteACLsRequest represents a request sent to a kafka broker to delete
|
|
// ACLs.
|
|
type DeleteACLsRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// List of ACL filters to use for deletion.
|
|
Filters []DeleteACLsFilter
|
|
}
|
|
|
|
type DeleteACLsFilter struct {
|
|
ResourceTypeFilter ResourceType
|
|
ResourceNameFilter string
|
|
ResourcePatternTypeFilter PatternType
|
|
PrincipalFilter string
|
|
HostFilter string
|
|
Operation ACLOperationType
|
|
PermissionType ACLPermissionType
|
|
}
|
|
|
|
// DeleteACLsResponse represents a response from a kafka broker to an ACL
|
|
// deletion request.
|
|
type DeleteACLsResponse struct {
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// List of the results from the deletion request.
|
|
Results []DeleteACLsResult
|
|
}
|
|
|
|
type DeleteACLsResult struct {
|
|
Error error
|
|
MatchingACLs []DeleteACLsMatchingACLs
|
|
}
|
|
|
|
type DeleteACLsMatchingACLs struct {
|
|
Error error
|
|
ResourceType ResourceType
|
|
ResourceName string
|
|
ResourcePatternType PatternType
|
|
Principal string
|
|
Host string
|
|
Operation ACLOperationType
|
|
PermissionType ACLPermissionType
|
|
}
|
|
|
|
// DeleteACLs sends ACLs deletion request to a kafka broker and returns the
|
|
// response.
|
|
func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) {
|
|
filters := make([]deleteacls.RequestFilter, 0, len(req.Filters))
|
|
|
|
for _, filter := range req.Filters {
|
|
filters = append(filters, deleteacls.RequestFilter{
|
|
ResourceTypeFilter: int8(filter.ResourceTypeFilter),
|
|
ResourceNameFilter: filter.ResourceNameFilter,
|
|
ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter),
|
|
PrincipalFilter: filter.PrincipalFilter,
|
|
HostFilter: filter.HostFilter,
|
|
Operation: int8(filter.Operation),
|
|
PermissionType: int8(filter.PermissionType),
|
|
})
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{
|
|
Filters: filters,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err)
|
|
}
|
|
|
|
res := m.(*deleteacls.Response)
|
|
|
|
results := make([]DeleteACLsResult, 0, len(res.FilterResults))
|
|
|
|
for _, result := range res.FilterResults {
|
|
matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs))
|
|
|
|
for _, matchingACL := range result.MatchingACLs {
|
|
matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{
|
|
Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage),
|
|
ResourceType: ResourceType(matchingACL.ResourceType),
|
|
ResourceName: matchingACL.ResourceName,
|
|
ResourcePatternType: PatternType(matchingACL.ResourcePatternType),
|
|
Principal: matchingACL.Principal,
|
|
Host: matchingACL.Host,
|
|
Operation: ACLOperationType(matchingACL.Operation),
|
|
PermissionType: ACLPermissionType(matchingACL.PermissionType),
|
|
})
|
|
}
|
|
|
|
results = append(results, DeleteACLsResult{
|
|
Error: makeError(result.ErrorCode, result.ErrorMessage),
|
|
MatchingACLs: matchingACLs,
|
|
})
|
|
}
|
|
|
|
ret := &DeleteACLsResponse{
|
|
Throttle: makeDuration(res.ThrottleTimeMs),
|
|
Results: results,
|
|
}
|
|
|
|
return ret, nil
|
|
}
|