1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-03-17 21:07:54 +02:00

fix: When grpc ends idle mode, it needs to continue to obtain the latest instance of the service (#3162)

* Add test cases

* Delete serviceSet when serviceSet has no watcher

* The context of resolve is controlled independently

* resolve uses the service set's context

* Remove the unused declaration from the test

* gofmt

* golangci-lint fix
This commit is contained in:
gangan liu 2024-03-20 09:52:21 +08:00 committed by GitHub
parent cd9857fc59
commit 1fdaabbd48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 233 additions and 3 deletions

View File

@ -190,6 +190,7 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
services: &atomic.Value{},
serviceName: name,
}
set.ctx, set.cancel = context.WithCancel(context.Background())
r.registry[name] = set
}
@ -208,10 +209,8 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
// otherwise the initial data may be blocked forever during the watch.
w.event <- struct{}{}
}
if !ok {
err := r.resolve(ctx, set)
if err != nil {
if err := r.resolve(set.ctx, set); err != nil {
return nil, err
}
}
@ -249,6 +248,9 @@ func (r *Registry) resolve(ctx context.Context, ss *serviceSet) error {
}
idx = tmpIdx
case <-ctx.Done():
r.lock.Lock()
delete(r.registry, ss.serviceName)
r.lock.Unlock()
return
}
}

View File

@ -413,6 +413,226 @@ func TestRegistry_Watch(t *testing.T) {
}
}
// TestRegistry_IdleAndWatch covers the "many clients, one idle" case: ten
// watchers are opened on the same service, the last one is stopped (as happens
// when a gRPC client goes idle), and the other nine are expected to still
// receive a later change to the registered instance.
//
// The test needs a Consul agent reachable at 127.0.0.1:8500.
func TestRegistry_IdleAndWatch(t *testing.T) {
	addr := fmt.Sprintf("%s:9091", getIntranetIP())

	time.Sleep(time.Millisecond * 100)
	cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second})
	if err != nil {
		t.Fatalf("create consul client failed: %v", err)
	}

	// A single registry is shared by every watcher created below.
	r := New(cli, []Option{
		WithHealthCheck(false),
	}...)

	instance1 := &registry.ServiceInstance{
		ID:        "1",
		Name:      "server-1",
		Version:   "v0.0.1",
		Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
	}

	type args struct {
		ctx      context.Context
		instance *registry.ServiceInstance
	}

	tests := []struct {
		name    string
		args    args
		want    []*registry.ServiceInstance
		wantErr bool
	}{
		{
			name: "many client, one idle",
			args: args{
				ctx:      context.Background(),
				instance: instance1,
			},
			want:    []*registry.ServiceInstance{instance1},
			wantErr: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if err := r.Register(tt.args.ctx, tt.args.instance); err != nil {
				t.Error(err)
			}
			defer func() {
				if err := r.Deregister(tt.args.ctx, tt.args.instance); err != nil {
					t.Error(err)
				}
			}()

			for i := 0; i < 10; i++ {
				watch, err1 := r.Watch(context.Background(), tt.args.instance.Name)
				if err1 != nil {
					// Without a watcher the calls below would dereference nil.
					t.Fatal(err1)
				}

				go func(i int) {
					// First Next call: the initial snapshot.
					service, err2 := watch.Next()
					if (err2 != nil) != tt.wantErr {
						// Report the error returned by Next, not the enclosing
						// function's err, which another goroutine reassigns.
						t.Errorf("GetService() error = %v, wantErr %v", err2, tt.wantErr)
						t.Errorf("GetService() got = %v", service)
						return
					}

					// Second Next call: the instance change.
					service, err2 = watch.Next()
					if i == 9 {
						// Watcher 9 is the one that gets stopped; its second
						// result is not checked.
						return
					}
					if (err2 != nil) != tt.wantErr {
						t.Errorf("GetService() error = %v, wantErr %v", err2, tt.wantErr)
						t.Errorf("GetService() got = %v", service)
						return
					}
					if !reflect.DeepEqual(service, tt.want) {
						t.Errorf("GetService() got = %v, want %v", service, tt.want)
					}
				}(i)

				if i == 9 {
					time.Sleep(time.Second * 3)
					// become idle, close watcher
					if err1 = watch.Stop(); err1 != nil {
						t.Errorf("watch stop err:%v", err1)
					}
				}
			}

			time.Sleep(2 * time.Second)

			// change is the same pointer as tt.args.instance and as the element
			// of tt.want, so the expected value compared in the watcher
			// goroutines carries the new version as well.
			change := tt.args.instance
			change.Version = "v0.0.2"
			if err := r.Register(tt.args.ctx, change); err != nil {
				t.Error(err)
			}

			// Give the watcher goroutines time to observe the change.
			time.Sleep(1 * time.Second)
		})
	}
}
// TestRegistry_IdleAndWatch2 covers the "all clients are idle, create a new
// one" case: ten watchers are opened and then all stopped, after which a new
// watcher on the same service must still deliver the registered instance.
//
// The test needs a Consul agent reachable at 127.0.0.1:8500.
func TestRegistry_IdleAndWatch2(t *testing.T) {
	addr := fmt.Sprintf("%s:9091", getIntranetIP())

	time.Sleep(time.Millisecond * 100)
	cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second})
	if err != nil {
		t.Fatalf("create consul client failed: %v", err)
	}

	instance1 := &registry.ServiceInstance{
		ID:        "1",
		Name:      "server-1",
		Version:   "v0.0.1",
		Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
	}

	type args struct {
		ctx      context.Context
		opts     []Option
		instance *registry.ServiceInstance
	}

	tests := []struct {
		name    string
		args    args
		want    []*registry.ServiceInstance
		wantErr bool
	}{
		{
			name: "all clients are idle, create a new one",
			args: args{
				ctx:      context.Background(),
				instance: instance1,
				opts: []Option{
					WithHealthCheck(false),
				},
			},
			want:    []*registry.ServiceInstance{instance1},
			wantErr: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			r := New(cli, tt.args.opts...)
			if err := r.Register(tt.args.ctx, tt.args.instance); err != nil {
				t.Error(err)
			}
			defer func() {
				if err := r.Deregister(tt.args.ctx, tt.args.instance); err != nil {
					t.Error(err)
				}
			}()

			// Cancelling ctx tells every stop goroutine below to stop its
			// watcher. The deferred call covers early exits; calling cancel
			// more than once is harmless.
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()

			for i := 0; i < 10; i++ {
				stopCtx, stopCancel := context.WithCancel(ctx)
				watch, err1 := r.Watch(context.Background(), tt.args.instance.Name)
				if err1 != nil {
					stopCancel()
					// Without a watcher the calls below would dereference nil.
					t.Fatal(err1)
				}

				go func(i int) {
					// First Next call: the initial snapshot.
					service, err2 := watch.Next()
					if (err2 != nil) != tt.wantErr {
						// Report the error returned by Next, not the enclosing
						// function's err.
						t.Errorf("GetService() error = %v, wantErr %v", err2, tt.wantErr)
						t.Errorf("GetService() got = %v", service)
						return
					}
					// The watcher gets stopped, so the second Next call is
					// expected to fail.
					if _, err2 = watch.Next(); err2 == nil {
						t.Errorf("watch exit exception:%d ", i)
					}
				}(i)

				go func() {
					// Wait for the shared cancel, or give up after a minute;
					// either way the watcher is stopped.
					select {
					case <-stopCtx.Done():
					case <-time.After(time.Minute):
						stopCancel()
					}
					if stopErr := watch.Stop(); stopErr != nil {
						t.Errorf("watch stop err:%v", stopErr)
					}
				}()
			}

			time.Sleep(time.Second * 3)
			cancel()
			time.Sleep(time.Second * 2)

			// Everything is idle. Add new watch.
			watch, err := r.Watch(context.Background(), tt.args.instance.Name)
			if err != nil {
				// Without a watcher, Next below would dereference nil.
				t.Fatal(err)
			}
			service, err := watch.Next()
			if (err != nil) != tt.wantErr {
				t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
				t.Errorf("GetService() got = %v", service)
				return
			}
			if !reflect.DeepEqual(service, tt.want) {
				t.Errorf("GetService() got = %v, want %v", service, tt.want)
			}
		})
	}
}
func getIntranetIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {

View File

@ -1,6 +1,7 @@
package consul
import (
"context"
"sync"
"sync/atomic"
@ -12,6 +13,9 @@ type serviceSet struct {
watcher map[*watcher]struct{}
services *atomic.Value
lock sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) {

View File

@ -36,5 +36,9 @@ func (w *watcher) Stop() error {
// Stop removes this watcher from its service set and, when it was the last
// one, cancels the set's context. It always returns nil.
//
// Hold the set's write lock while the watcher map is modified and inspected.
w.set.lock.Lock()
defer w.set.lock.Unlock()
// Unregister this watcher from the set.
delete(w.set.watcher, w)
// No watchers remain: cancel the set's context, which the resolve loop
// started with that context observes through ctx.Done() and then exits.
if len(w.set.watcher) == 0 {
w.set.cancel()
}
return nil
}