From 26cf7c80ad1ef5c49f53730e8cca62b85923fa32 Mon Sep 17 00:00:00 2001 From: libi <7769922+libi@users.noreply.github.com> Date: Sat, 6 Mar 2021 18:06:20 +0800 Subject: [PATCH] fix discovery resolver watch goroutine leak (#750) * fix discovery resolver watch goroutine leak --- cmd/protoc-gen-go-errors/go.sum | 1 + transport/grpc/resolver/discovery/builder.go | 9 ++- transport/grpc/resolver/discovery/resolver.go | 12 ++++ .../grpc/resolver/discovery/resolver_test.go | 58 +++++++++++++++++++ 4 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 transport/grpc/resolver/discovery/resolver_test.go diff --git a/cmd/protoc-gen-go-errors/go.sum b/cmd/protoc-gen-go-errors/go.sum index 4a2185cdf..616ef4ee7 100644 --- a/cmd/protoc-gen-go-errors/go.sum +++ b/cmd/protoc-gen-go-errors/go.sum @@ -13,6 +13,7 @@ github.com/go-kratos/kratos/v2 v2.0.0-20210217083752-d86d233d93ce h1:LfOsLN9s8tA github.com/go-kratos/kratos/v2 v2.0.0-20210217083752-d86d233d93ce/go.mod h1:oLvFyDBJkkWN8TPqb+NmpvRrSy9uM/K+XQubVRc11a8= github.com/go-kratos/kratos/v2 v2.0.0-alpha4 h1:MKkkSZigSMg7Kx8HzrobZ93zlgmi0tAKWM9bMf6YTpU= github.com/go-kratos/kratos/v2 v2.0.0-alpha4/go.mod h1:oLvFyDBJkkWN8TPqb+NmpvRrSy9uM/K+XQubVRc11a8= +github.com/go-kratos/kratos/v2 v2.0.0-alpha5/go.mod h1:oLvFyDBJkkWN8TPqb+NmpvRrSy9uM/K+XQubVRc11a8= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/transport/grpc/resolver/discovery/builder.go b/transport/grpc/resolver/discovery/builder.go index aabe0e6d3..93f6aa5a3 100644 --- a/transport/grpc/resolver/discovery/builder.go +++ b/transport/grpc/resolver/discovery/builder.go @@ -42,10 +42,13 @@ func (d *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res if err != nil { return nil, err } + ctx, cancel := context.WithCancel(context.Background()) r := &discoveryResolver{ - w: w, - cc: cc, - log: log.NewHelper("grpc/resolver/discovery", d.logger), + w: w, + cc: cc, + log: log.NewHelper("grpc/resolver/discovery", d.logger), + ctx: ctx, + cancel: cancel, } go r.watch() return r, nil diff --git a/transport/grpc/resolver/discovery/resolver.go b/transport/grpc/resolver/discovery/resolver.go index af0ec5dcb..d26c0ae71 100644 --- a/transport/grpc/resolver/discovery/resolver.go +++ b/transport/grpc/resolver/discovery/resolver.go @@ -1,6 +1,7 @@ package discovery import ( + "context" "net/url" "time" @@ -14,10 +15,20 @@ type discoveryResolver struct { w registry.Watcher cc resolver.ClientConn log *log.Helper + + ctx context.Context + cancel context.CancelFunc } func (r *discoveryResolver) watch() { for { + select { + case <-r.ctx.Done(): + //goroutine exit + return + default: + } + ins, err := r.w.Next() if err != nil { r.log.Errorf("Failed to watch discovery endpoint: %v", err) @@ -47,6 +58,7 @@ func (r *discoveryResolver) update(ins []*registry.ServiceInstance) { } func (r *discoveryResolver) Close() { + r.cancel() r.w.Close() } diff --git a/transport/grpc/resolver/discovery/resolver_test.go b/transport/grpc/resolver/discovery/resolver_test.go new file mode 100644 index 000000000..01ea932fa --- /dev/null +++ b/transport/grpc/resolver/discovery/resolver_test.go @@ -0,0 +1,58 @@ +package discovery + +import ( + "context" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/registry" + "google.golang.org/grpc/resolver" + "testing" + "time" +) + +type testClientConn struct { + resolver.ClientConn // For unimplemented functions + te *testing.T +} + +func (t *testClientConn) UpdateState(s resolver.State) { + t.te.Log("UpdateState", s) +} + +type testWatch struct { +} + +func (m *testWatch) Next() ([]*registry.ServiceInstance, error) { + time.Sleep(time.Millisecond * 200) + ins := []*registry.ServiceInstance{ + ®istry.ServiceInstance{ + ID: "mock_ID", + Name: "mock_Name", + Version: "mock_Version", + }, + } + return ins, nil +} + +// Watch creates a watcher according to the service name. +func (m *testWatch) Close() error { + return nil +} + +func TestWatch(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + r := &discoveryResolver{ + w: &testWatch{}, + cc: &testClientConn{te: t}, + log: log.NewHelper("grpc/resolver/discovery", log.DefaultLogger), + ctx: ctx, + cancel: cancel, + } + go func() { + time.Sleep(time.Second * 2) + r.Close() + }() + r.watch() + + t.Log("watch goroutine exited after 2 second") +}