From f80f0eb38e66cd2ce9979bc2fd5dd532e20b6461 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 3 Jun 2019 15:55:47 +0100 Subject: [PATCH] fix broken pipe error --- server/rpc_router.go | 18 ++++-------------- server/rpc_stream.go | 14 ++++++++++++-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/server/rpc_router.go b/server/rpc_router.go index af29d4b8..1ffb4963 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -214,9 +214,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, // declare a local error to see if we errored out already // keep track of the type, to make sure we return // the same one consistently - var lastError error - - stream := &rpcStream{ + rawStream := &rpcStream{ context: ctx, codec: cc.(codec.Codec), request: r, @@ -229,9 +227,8 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, if err := returnValues[0].Interface(); err != nil { // the function returned an error, we use that return err.(error) - } else if lastError != nil { - // we had an error inside sendReply, we use that - return lastError + } else if serr := rawStream.Error(); serr == io.EOF || serr == io.ErrUnexpectedEOF { + return nil } else { // no error, we send the special EOS error return lastStreamResponseError @@ -242,14 +239,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex, r.stream = true // execute handler - if err := fn(ctx, r, stream); err != nil { - return err - } - - // this is the last packet, we don't do anything with - // the error here (well sendStreamResponse will log it - // already) - return router.sendResponse(sending, req, nil, cc, true) + return fn(ctx, r, rawStream) } func (m *methodType) prepareContext(ctx context.Context) reflect.Value { diff --git a/server/rpc_stream.go b/server/rpc_stream.go index dc49b18a..185f1ff9 100644 --- a/server/rpc_stream.go +++ b/server/rpc_stream.go @@ -38,7 +38,11 @@ func (r *rpcStream) Send(msg interface{}) error { Type: codec.Response, } - return r.codec.Write(&resp, msg) + if err := r.codec.Write(&resp, msg); err != nil { + r.err = err + } + + return nil } func (r *rpcStream) Recv(msg interface{}) error { @@ -51,12 +55,18 @@ func (r *rpcStream) Recv(msg interface{}) error { if err := r.codec.ReadHeader(req, req.Type); err != nil { // discard body r.codec.ReadBody(nil) + r.err = err return err } // we need to stay up to date with sequence numbers r.id = req.Id - return r.codec.ReadBody(msg) + if err := r.codec.ReadBody(msg); err != nil { + r.err = err + return err + } + + return nil } func (r *rpcStream) Error() error {