From 9df19e826ef83ab1ef4b7030db7ee261b10fa4d7 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sun, 19 Jan 2020 22:53:56 +0000 Subject: [PATCH] cancel stream --- client/grpc/grpc.go | 6 +++++- client/grpc/stream.go | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index f00a12b0..1d4eea04 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -212,7 +212,10 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client grpcCallOptions = append(grpcCallOptions, opts...) } - st, err := cc.NewStream(ctx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) + // create a new cancelling context + newCtx, cancel := context.WithCancel(ctx) + + st, err := cc.NewStream(newCtx, desc, methodToGRPC(req.Service(), req.Endpoint()), grpcCallOptions...) if err != nil { return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error creating stream: %v", err)) } @@ -240,6 +243,7 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client response: rsp, stream: st, conn: cc, + cancel: cancel, }, nil } diff --git a/client/grpc/stream.go b/client/grpc/stream.go index ea9da43f..e5ebd156 100644 --- a/client/grpc/stream.go +++ b/client/grpc/stream.go @@ -19,6 +19,7 @@ type grpcStream struct { request client.Request response client.Response context context.Context + cancel func() } func (g *grpcStream) Context() context.Context { @@ -79,7 +80,8 @@ func (g *grpcStream) Close() error { if g.closed { return nil } - + // cancel the context + g.cancel() g.closed = true g.stream.CloseSend() return g.conn.Close()