From 1fdaabbd4813a24c0d8cfcb804da7e8bb52ea105 Mon Sep 17 00:00:00 2001 From: gangan liu <315874482@qq.com> Date: Wed, 20 Mar 2024 09:52:21 +0800 Subject: [PATCH] fix: When grpc ends idle mode, it needs to continue to obtain the latest instance of the service (#3162) * Add test cases * Delete serviceSet when serviceSet has no watcher * The context of resolve is controlled independently * resolve use set context * Remove test declare and do not use it * gofmt * golangci-lint fix --- contrib/registry/consul/registry.go | 8 +- contrib/registry/consul/registry_test.go | 220 +++++++++++++++++++++++ contrib/registry/consul/service.go | 4 + contrib/registry/consul/watcher.go | 4 + 4 files changed, 233 insertions(+), 3 deletions(-) diff --git a/contrib/registry/consul/registry.go b/contrib/registry/consul/registry.go index 8105ab1d0..c83fa74ce 100644 --- a/contrib/registry/consul/registry.go +++ b/contrib/registry/consul/registry.go @@ -190,6 +190,7 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er services: &atomic.Value{}, serviceName: name, } + set.ctx, set.cancel = context.WithCancel(context.Background()) r.registry[name] = set } @@ -208,10 +209,8 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er // otherwise the initial data may be blocked forever during the watch. 
w.event <- struct{}{} } - if !ok { - err := r.resolve(ctx, set) - if err != nil { + if err := r.resolve(set.ctx, set); err != nil { return nil, err } } @@ -249,6 +248,9 @@ func (r *Registry) resolve(ctx context.Context, ss *serviceSet) error { } idx = tmpIdx case <-ctx.Done(): + r.lock.Lock() + delete(r.registry, ss.serviceName) + r.lock.Unlock() return } } diff --git a/contrib/registry/consul/registry_test.go b/contrib/registry/consul/registry_test.go index 02b617209..d6799b693 100644 --- a/contrib/registry/consul/registry_test.go +++ b/contrib/registry/consul/registry_test.go @@ -413,6 +413,226 @@ func TestRegistry_Watch(t *testing.T) { } } +func TestRegistry_IdleAndWatch(t *testing.T) { + addr := fmt.Sprintf("%s:9091", getIntranetIP()) + + time.Sleep(time.Millisecond * 100) + cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second}) + if err != nil { + t.Fatalf("create consul client failed: %v", err) + } + + r := New(cli, []Option{ + WithHealthCheck(false), + }...) 
+ + instance1 := &registry.ServiceInstance{ + ID: "1", + Name: "server-1", + Version: "v0.0.1", + Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)}, + } + + type args struct { + ctx context.Context + instance *registry.ServiceInstance + } + + tests := []struct { + name string + args args + want []*registry.ServiceInstance + wantErr bool + }{ + { + name: "many client, one idle", + args: args{ + ctx: context.Background(), + instance: instance1, + }, + want: []*registry.ServiceInstance{instance1}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err = r.Register(tt.args.ctx, tt.args.instance) + if err != nil { + t.Error(err) + } + defer func() { + err = r.Deregister(tt.args.ctx, tt.args.instance) + if err != nil { + t.Error(err) + } + }() + + for i := 0; i < 10; i++ { + watch, err1 := r.Watch(context.Background(), tt.args.instance.Name) + if err1 != nil { + t.Error(err1) + } + go func(i int) { + // first + service, err2 := watch.Next() + if (err2 != nil) != tt.wantErr { + t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("GetService() got = %v", service) + return + } + // instance changes + service, err2 = watch.Next() + if i == 9 { + return + } + if (err2 != nil) != tt.wantErr { + t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("GetService() got = %v", service) + return + } + if !reflect.DeepEqual(service, tt.want) { + t.Errorf("GetService() got = %v, want %v", service, tt.want) + } + // t.Logf("service:%v, i:%d", service, i) + }(i) + if i == 9 { + time.Sleep(time.Second * 3) + // become idle, close watcher + err1 = watch.Stop() + if err1 != nil { + t.Errorf("watch stop err:%v", err) + } + } + } + time.Sleep(2 * time.Second) + change := tt.args.instance + change.Version = "v0.0.2" + err = r.Register(tt.args.ctx, change) + if err != nil { + t.Error(err) + } + time.Sleep(1 * time.Second) + }) + } +} + +func TestRegistry_IdleAndWatch2(t *testing.T) { + 
addr := fmt.Sprintf("%s:9091", getIntranetIP()) + + time.Sleep(time.Millisecond * 100) + cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second}) + if err != nil { + t.Fatalf("create consul client failed: %v", err) + } + + instance1 := &registry.ServiceInstance{ + ID: "1", + Name: "server-1", + Version: "v0.0.1", + Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)}, + } + type args struct { + ctx context.Context + opts []Option + instance *registry.ServiceInstance + } + + tests := []struct { + name string + args args + want []*registry.ServiceInstance + wantErr bool + }{ + { + name: "all clients are idle, create a new one", + args: args{ + ctx: context.Background(), + instance: instance1, + opts: []Option{ + WithHealthCheck(false), + }, + }, + want: []*registry.ServiceInstance{instance1}, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := New(cli, tt.args.opts...) + + err = r.Register(tt.args.ctx, tt.args.instance) + if err != nil { + t.Error(err) + } + defer func() { + err = r.Deregister(tt.args.ctx, tt.args.instance) + if err != nil { + t.Error(err) + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + for i := 0; i < 10; i++ { + stopCtx, stopCancel := context.WithCancel(ctx) + watch, err1 := r.Watch(context.Background(), tt.args.instance.Name) + if err1 != nil { + t.Error(err1) + } + go func(i int) { + // first + service, err2 := watch.Next() + if (err2 != nil) != tt.wantErr { + t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("GetService() got = %v", service) + return + } + _, err2 = watch.Next() + if err2 == nil { + t.Errorf("watch exit exception:%d ", i) + } + }(i) + go func() { + select { + case <-stopCtx.Done(): + err1 = watch.Stop() + if err1 != nil { + t.Errorf("watch stop err:%v", err) + } + return + case <-time.After(time.Minute): + stopCancel() + err1 = watch.Stop() + if err1 != nil { + t.Errorf("watch stop 
err:%v", err) + } + return + } + }() + } + time.Sleep(time.Second * 3) + cancel() + time.Sleep(time.Second * 2) + // Everything is idle. Add new watch. + watchCtx := context.Background() + watch, err := r.Watch(watchCtx, tt.args.instance.Name) + if err != nil { + t.Error(err) + } + service, err := watch.Next() + if (err != nil) != tt.wantErr { + t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("GetService() got = %v", service) + return + } + if !reflect.DeepEqual(service, tt.want) { + t.Errorf("GetService() got = %v, want %v", service, tt.want) + } + }) + } +} + func getIntranetIP() string { addrs, err := net.InterfaceAddrs() if err != nil { diff --git a/contrib/registry/consul/service.go b/contrib/registry/consul/service.go index a0a166094..6567b7726 100644 --- a/contrib/registry/consul/service.go +++ b/contrib/registry/consul/service.go @@ -1,6 +1,7 @@ package consul import ( + "context" "sync" "sync/atomic" @@ -12,6 +13,9 @@ type serviceSet struct { watcher map[*watcher]struct{} services *atomic.Value lock sync.RWMutex + + ctx context.Context + cancel context.CancelFunc } func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) { diff --git a/contrib/registry/consul/watcher.go b/contrib/registry/consul/watcher.go index 0a5d35751..f84caeb40 100644 --- a/contrib/registry/consul/watcher.go +++ b/contrib/registry/consul/watcher.go @@ -36,5 +36,9 @@ func (w *watcher) Stop() error { w.set.lock.Lock() defer w.set.lock.Unlock() delete(w.set.watcher, w) + // close resolve + if len(w.set.watcher) == 0 { + w.set.cancel() + } return nil }