mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-14 02:33:03 +02:00
fix: do not re register after service logout (#2647)
This commit is contained in:
parent
a006328db6
commit
50da181d69
@ -2,6 +2,7 @@ package consul
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
@ -204,21 +205,32 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab
|
|||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
_ = c.cli.Agent().ServiceDeregister(svc.ID)
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
_ = c.cli.Agent().ServiceDeregister(svc.ID)
|
||||||
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
// ensure that unregistered services will not be re-registered by mistake
|
||||||
|
if errors.Is(c.ctx.Err(), context.Canceled) || errors.Is(c.ctx.Err(), context.DeadlineExceeded) {
|
||||||
|
_ = c.cli.Agent().ServiceDeregister(svc.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
|
err = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("[Consul] update ttl heartbeat to consul failed! err=%v", err)
|
log.Errorf("[Consul] update ttl heartbeat to consul failed! err=%v", err)
|
||||||
// when the previous report fails, try to re register the service
|
// when the previous report fails, try to re register the service
|
||||||
time.AfterFunc(time.Duration(rand.Intn(5))*time.Second, func() {
|
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
|
||||||
if err := c.cli.Agent().ServiceRegister(asr); err != nil {
|
if err := c.cli.Agent().ServiceRegister(asr); err != nil {
|
||||||
log.Errorf("[Consul] re registry service failed!, err=%v", err)
|
log.Errorf("[Consul] re registry service failed!, err=%v", err)
|
||||||
} else {
|
} else {
|
||||||
log.Warn("[Consul] re registry of service occurred success")
|
log.Warn("[Consul] re registry of service occurred success")
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
case <-c.ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -228,6 +240,6 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab
|
|||||||
|
|
||||||
// Deregister service by service ID
|
// Deregister service by service ID
|
||||||
func (c *Client) Deregister(_ context.Context, serviceID string) error {
|
func (c *Client) Deregister(_ context.Context, serviceID string) error {
|
||||||
c.cancel()
|
defer c.cancel()
|
||||||
return c.cli.Agent().ServiceDeregister(serviceID)
|
return c.cli.Agent().ServiceDeregister(serviceID)
|
||||||
}
|
}
|
||||||
|
@ -74,14 +74,14 @@ func TestRegistry_Register(t *testing.T) {
|
|||||||
serverName: "server-1",
|
serverName: "server-1",
|
||||||
server: []*registry.ServiceInstance{
|
server: []*registry.ServiceInstance{
|
||||||
{
|
{
|
||||||
ID: "1",
|
ID: "2",
|
||||||
Name: "server-1",
|
Name: "server-1",
|
||||||
Version: "v0.0.1",
|
Version: "v0.0.1",
|
||||||
Metadata: nil,
|
Metadata: nil,
|
||||||
Endpoints: []string{"http://127.0.0.1:8000"},
|
Endpoints: []string{"http://127.0.0.1:8000"},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
ID: "1",
|
ID: "2",
|
||||||
Name: "server-1",
|
Name: "server-1",
|
||||||
Version: "v0.0.2",
|
Version: "v0.0.2",
|
||||||
Metadata: nil,
|
Metadata: nil,
|
||||||
@ -91,7 +91,7 @@ func TestRegistry_Register(t *testing.T) {
|
|||||||
},
|
},
|
||||||
want: []*registry.ServiceInstance{
|
want: []*registry.ServiceInstance{
|
||||||
{
|
{
|
||||||
ID: "1",
|
ID: "2",
|
||||||
Name: "server-1",
|
Name: "server-1",
|
||||||
Version: "v0.0.2",
|
Version: "v0.0.2",
|
||||||
Metadata: nil,
|
Metadata: nil,
|
||||||
@ -168,6 +168,13 @@ func TestRegistry_GetService(t *testing.T) {
|
|||||||
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
|
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
instance2 := ®istry.ServiceInstance{
|
||||||
|
ID: "2",
|
||||||
|
Name: "server-1",
|
||||||
|
Version: "v0.0.1",
|
||||||
|
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
|
||||||
|
}
|
||||||
|
|
||||||
type fields struct {
|
type fields struct {
|
||||||
registry *Registry
|
registry *Registry
|
||||||
}
|
}
|
||||||
@ -223,10 +230,10 @@ func TestRegistry_GetService(t *testing.T) {
|
|||||||
want: nil,
|
want: nil,
|
||||||
wantErr: true,
|
wantErr: true,
|
||||||
preFunc: func(t *testing.T) {
|
preFunc: func(t *testing.T) {
|
||||||
if err := r.Register(context.Background(), instance1); err != nil {
|
if err := r.Register(context.Background(), instance2); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
watch, err := r.Watch(context.Background(), instance1.Name)
|
watch, err := r.Watch(context.Background(), instance2.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -236,7 +243,7 @@ func TestRegistry_GetService(t *testing.T) {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
deferFunc: func(t *testing.T) {
|
deferFunc: func(t *testing.T) {
|
||||||
err := r.Deregister(context.Background(), instance1)
|
err := r.Deregister(context.Background(), instance2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -282,6 +289,20 @@ func TestRegistry_Watch(t *testing.T) {
|
|||||||
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
|
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
instance2 := ®istry.ServiceInstance{
|
||||||
|
ID: "2",
|
||||||
|
Name: "server-1",
|
||||||
|
Version: "v0.0.1",
|
||||||
|
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
|
||||||
|
}
|
||||||
|
|
||||||
|
instance3 := ®istry.ServiceInstance{
|
||||||
|
ID: "3",
|
||||||
|
Name: "server-1",
|
||||||
|
Version: "v0.0.1",
|
||||||
|
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
|
||||||
|
}
|
||||||
|
|
||||||
type args struct {
|
type args struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel func()
|
cancel func()
|
||||||
@ -316,7 +337,7 @@ func TestRegistry_Watch(t *testing.T) {
|
|||||||
args: args{
|
args: args{
|
||||||
ctx: canceledCtx,
|
ctx: canceledCtx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
instance: instance1,
|
instance: instance2,
|
||||||
opts: []Option{
|
opts: []Option{
|
||||||
WithHealthCheck(false),
|
WithHealthCheck(false),
|
||||||
},
|
},
|
||||||
@ -330,14 +351,14 @@ func TestRegistry_Watch(t *testing.T) {
|
|||||||
name: "register with healthCheck",
|
name: "register with healthCheck",
|
||||||
args: args{
|
args: args{
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
instance: instance1,
|
instance: instance3,
|
||||||
opts: []Option{
|
opts: []Option{
|
||||||
WithHeartbeat(true),
|
WithHeartbeat(true),
|
||||||
WithHealthCheck(true),
|
WithHealthCheck(true),
|
||||||
WithHealthCheckInterval(5),
|
WithHealthCheckInterval(5),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
want: []*registry.ServiceInstance{instance1},
|
want: []*registry.ServiceInstance{instance3},
|
||||||
wantErr: false,
|
wantErr: false,
|
||||||
preFunc: func(t *testing.T) {
|
preFunc: func(t *testing.T) {
|
||||||
lis, err := net.Listen("tcp", addr)
|
lis, err := net.Listen("tcp", addr)
|
||||||
|
Loading…
Reference in New Issue
Block a user