diff --git a/contrib/registry/consul/client.go b/contrib/registry/consul/client.go index 62ae663a9..b711a5bfe 100644 --- a/contrib/registry/consul/client.go +++ b/contrib/registry/consul/client.go @@ -18,38 +18,29 @@ type Client struct { cli *api.Client ctx context.Context cancel context.CancelFunc + + // resolve service entry endpoints + resolver ServiceResolver } // NewClient creates consul client func NewClient(cli *api.Client) *Client { - c := &Client{cli: cli} + c := &Client{cli: cli, resolver: defaultResolver} c.ctx, c.cancel = context.WithCancel(context.Background()) return c } -// Service get services from consul -func (d *Client) Service(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) { - opts := &api.QueryOptions{ - WaitIndex: index, - WaitTime: time.Second * 55, - } - opts = opts.WithContext(ctx) - entries, meta, err := d.cli.Health().Service(service, "", passingOnly, opts) - if err != nil { - return nil, 0, err - } - - services := make([]*registry.ServiceInstance, 0) - +func defaultResolver(_ context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance { + services := make([]*registry.ServiceInstance, 0, len(entries)) for _, entry := range entries { var version string for _, tag := range entry.Service.Tags { - strs := strings.SplitN(tag, "=", 2) - if len(strs) == 2 && strs[0] == "version" { - version = strs[1] + ss := strings.SplitN(tag, "=", 2) + if len(ss) == 2 && ss[0] == "version" { + version = ss[1] } } - var endpoints []string + var endpoints []string //nolint:prealloc for scheme, addr := range entry.Service.TaggedAddresses { if scheme == "lan_ipv4" || scheme == "wan_ipv4" || scheme == "lan_ipv6" || scheme == "wan_ipv6" { continue @@ -64,11 +55,29 @@ func (d *Client) Service(ctx context.Context, service string, index uint64, pass Endpoints: endpoints, }) } - return services, meta.LastIndex, nil + + return services } -// Register register service instacen to consul -func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error { +// ServiceResolver is used to resolve service endpoints +type ServiceResolver func(ctx context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance + +// Service get services from consul +func (c *Client) Service(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) { + opts := &api.QueryOptions{ + WaitIndex: index, + WaitTime: time.Second * 55, + } + opts = opts.WithContext(ctx) + entries, meta, err := c.cli.Health().Service(service, "", passingOnly, opts) + if err != nil { + return nil, 0, err + } + return c.resolver(ctx, entries), meta.LastIndex, nil +} + +// Register register service instance to consul +func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error { addresses := make(map[string]api.ServiceAddress) checkAddresses := make([]string, 0, len(svc.Endpoints)) for _, endpoint := range svc.Endpoints { @@ -103,7 +112,7 @@ func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, en }) } } - err := d.cli.Agent().ServiceRegister(asr) + err := c.cli.Agent().ServiceRegister(asr) if err != nil { return err } @@ -113,8 +122,8 @@ func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, en for { select { case <-ticker.C: - _ = d.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass") - case <-d.ctx.Done(): + _ = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass") + case <-c.ctx.Done(): return } } @@ -123,7 +132,7 @@ func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, en } // Deregister deregister service by service ID -func (d *Client) Deregister(ctx context.Context, serviceID string) error { - d.cancel() - return d.cli.Agent().ServiceDeregister(serviceID) +func (c *Client) Deregister(_ context.Context, serviceID string) error { + c.cancel() + return c.cli.Agent().ServiceDeregister(serviceID) } diff --git a/contrib/registry/consul/registry.go b/contrib/registry/consul/registry.go index d2348b902..41cdd8c75 100644 --- a/contrib/registry/consul/registry.go +++ b/contrib/registry/consul/registry.go @@ -26,6 +26,15 @@ func WithHealthCheck(enable bool) Option { } } +// WithServiceResolver with endpoint function option. +func WithServiceResolver(fn ServiceResolver) Option { + return func(o *Registry) { + if o.cli != nil { + o.cli.resolver = fn + } + } +} + // Config is consul registry config type Config struct { *api.Config