From 33d51a84c3094e650d874e8a2d5dc0fc1b35b1b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8C=85=E5=AD=90?= Date: Mon, 9 Jan 2023 11:48:12 +0800 Subject: [PATCH] feat(registry): consul support re-registry when service does not exist (#2606) --- contrib/registry/consul/client.go | 30 ++++++++++------------------- contrib/registry/consul/registry.go | 14 ++++++++++---- 2 files changed, 20 insertions(+), 24 deletions(-) diff --git a/contrib/registry/consul/client.go b/contrib/registry/consul/client.go index dec4a7f84..31d0da687 100644 --- a/contrib/registry/consul/client.go +++ b/contrib/registry/consul/client.go @@ -3,6 +3,7 @@ package consul import ( "context" "fmt" + "math/rand" "net" "net/url" "strconv" @@ -41,25 +42,6 @@ type Client struct { serviceChecks api.AgentServiceChecks } -// Deprecated use newClient instead. -func NewClient(cli *api.Client) *Client { - return newClient(cli, SingleDatacenter) -} - -func newClient(cli *api.Client, dc Datacenter) *Client { - c := &Client{ - dc: dc, - cli: cli, - resolver: defaultResolver, - healthcheckInterval: 10, - heartbeat: true, - deregisterCriticalServiceAfter: 600, - } - - c.ctx, c.cancel = context.WithCancel(context.Background()) - return c -} - func defaultResolver(_ context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance { services := make([]*registry.ServiceInstance, 0, len(entries)) for _, entry := range entries { @@ -225,7 +207,15 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab case <-ticker.C: err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass") if err != nil { - log.Errorf("[Consul]update ttl heartbeat to consul failed!err:=%v", err) + log.Errorf("[Consul] update ttl heartbeat to consul failed! err=%v", err) + // when the previous report fails, try to re register the service + time.AfterFunc(time.Duration(rand.Intn(5))*time.Second, func() { + if err := c.cli.Agent().ServiceRegister(asr); err != nil { + log.Errorf("[Consul] re registry service failed!, err=%v", err) + } else { + log.Warn("[Consul] re registry of service occurred success") + } + }) } case <-c.ctx.Done(): return diff --git a/contrib/registry/consul/registry.go b/contrib/registry/consul/registry.go index e5ac1819c..8105ab1d0 100644 --- a/contrib/registry/consul/registry.go +++ b/contrib/registry/consul/registry.go @@ -37,7 +37,7 @@ func WithTimeout(timeout time.Duration) Option { // WithDatacenter with registry datacenter option func WithDatacenter(dc Datacenter) Option { return func(o *Registry) { - o.dc = dc + o.cli.dc = dc } } @@ -98,21 +98,27 @@ type Registry struct { registry map[string]*serviceSet lock sync.RWMutex timeout time.Duration - dc Datacenter } // New creates consul registry func New(apiClient *api.Client, opts ...Option) *Registry { r := &Registry{ - dc: SingleDatacenter, registry: make(map[string]*serviceSet), enableHealthCheck: true, timeout: 10 * time.Second, + cli: &Client{ + dc: SingleDatacenter, + cli: apiClient, + resolver: defaultResolver, + healthcheckInterval: 10, + heartbeat: true, + deregisterCriticalServiceAfter: 600, + }, } for _, o := range opts { o(r) } - r.cli = newClient(apiClient, r.dc) + r.cli.ctx, r.cli.cancel = context.WithCancel(context.Background()) return r }