1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-28 03:57:02 +02:00

feat(registry): consul support re-registry when service does not exist (#2606)

This commit is contained in:
包子 2023-01-09 11:48:12 +08:00 committed by GitHub
parent eafbe908a8
commit 33d51a84c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 24 deletions

View File

@ -3,6 +3,7 @@ package consul
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand"
"net" "net"
"net/url" "net/url"
"strconv" "strconv"
@ -41,25 +42,6 @@ type Client struct {
serviceChecks api.AgentServiceChecks serviceChecks api.AgentServiceChecks
} }
// Deprecated use newClient instead.
func NewClient(cli *api.Client) *Client {
return newClient(cli, SingleDatacenter)
}
func newClient(cli *api.Client, dc Datacenter) *Client {
c := &Client{
dc: dc,
cli: cli,
resolver: defaultResolver,
healthcheckInterval: 10,
heartbeat: true,
deregisterCriticalServiceAfter: 600,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
return c
}
func defaultResolver(_ context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance { func defaultResolver(_ context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance {
services := make([]*registry.ServiceInstance, 0, len(entries)) services := make([]*registry.ServiceInstance, 0, len(entries))
for _, entry := range entries { for _, entry := range entries {
@ -225,7 +207,15 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab
case <-ticker.C: case <-ticker.C:
err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass") err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
if err != nil { if err != nil {
log.Errorf("[Consul]update ttl heartbeat to consul failed!err:=%v", err) log.Errorf("[Consul] update ttl heartbeat to consul failed! err=%v", err)
// when the previous report fails, try to re register the service
time.AfterFunc(time.Duration(rand.Intn(5))*time.Second, func() {
if err := c.cli.Agent().ServiceRegister(asr); err != nil {
log.Errorf("[Consul] re registry service failed!, err=%v", err)
} else {
log.Warn("[Consul] re registry of service occurred success")
}
})
} }
case <-c.ctx.Done(): case <-c.ctx.Done():
return return

View File

@ -37,7 +37,7 @@ func WithTimeout(timeout time.Duration) Option {
// WithDatacenter with registry datacenter option // WithDatacenter with registry datacenter option
func WithDatacenter(dc Datacenter) Option { func WithDatacenter(dc Datacenter) Option {
return func(o *Registry) { return func(o *Registry) {
o.dc = dc o.cli.dc = dc
} }
} }
@ -98,21 +98,27 @@ type Registry struct {
registry map[string]*serviceSet registry map[string]*serviceSet
lock sync.RWMutex lock sync.RWMutex
timeout time.Duration timeout time.Duration
dc Datacenter
} }
// New creates consul registry // New creates consul registry
func New(apiClient *api.Client, opts ...Option) *Registry { func New(apiClient *api.Client, opts ...Option) *Registry {
r := &Registry{ r := &Registry{
dc: SingleDatacenter,
registry: make(map[string]*serviceSet), registry: make(map[string]*serviceSet),
enableHealthCheck: true, enableHealthCheck: true,
timeout: 10 * time.Second, timeout: 10 * time.Second,
cli: &Client{
dc: SingleDatacenter,
cli: apiClient,
resolver: defaultResolver,
healthcheckInterval: 10,
heartbeat: true,
deregisterCriticalServiceAfter: 600,
},
} }
for _, o := range opts { for _, o := range opts {
o(r) o(r)
} }
r.cli = newClient(apiClient, r.dc) r.cli.ctx, r.cli.cancel = context.WithCancel(context.Background())
return r return r
} }