1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

fix broken pipe error

This commit is contained in:
Asim Aslam 2019-06-03 15:55:47 +01:00
parent 850f8bafdf
commit f80f0eb38e
2 changed files with 16 additions and 16 deletions

View File

@ -214,9 +214,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
// declare a local error to see if we errored out already // declare a local error to see if we errored out already
// keep track of the type, to make sure we return // keep track of the type, to make sure we return
// the same one consistently // the same one consistently
var lastError error rawStream := &rpcStream{
stream := &rpcStream{
context: ctx, context: ctx,
codec: cc.(codec.Codec), codec: cc.(codec.Codec),
request: r, request: r,
@ -229,9 +227,8 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
// the function returned an error, we use that // the function returned an error, we use that
return err.(error) return err.(error)
} else if lastError != nil { } else if serr := rawStream.Error(); serr == io.EOF || serr == io.ErrUnexpectedEOF {
// we had an error inside sendReply, we use that return nil
return lastError
} else { } else {
// no error, we send the special EOS error // no error, we send the special EOS error
return lastStreamResponseError return lastStreamResponseError
@ -242,14 +239,7 @@ func (s *service) call(ctx context.Context, router *router, sending *sync.Mutex,
r.stream = true r.stream = true
// execute handler // execute handler
if err := fn(ctx, r, stream); err != nil { return fn(ctx, r, rawStream)
return err
}
// this is the last packet, we don't do anything with
// the error here (well sendStreamResponse will log it
// already)
return router.sendResponse(sending, req, nil, cc, true)
} }
func (m *methodType) prepareContext(ctx context.Context) reflect.Value { func (m *methodType) prepareContext(ctx context.Context) reflect.Value {

View File

@ -38,7 +38,11 @@ func (r *rpcStream) Send(msg interface{}) error {
Type: codec.Response, Type: codec.Response,
} }
return r.codec.Write(&resp, msg) if err := r.codec.Write(&resp, msg); err != nil {
r.err = err
}
return nil
} }
func (r *rpcStream) Recv(msg interface{}) error { func (r *rpcStream) Recv(msg interface{}) error {
@ -51,12 +55,18 @@ func (r *rpcStream) Recv(msg interface{}) error {
if err := r.codec.ReadHeader(req, req.Type); err != nil { if err := r.codec.ReadHeader(req, req.Type); err != nil {
// discard body // discard body
r.codec.ReadBody(nil) r.codec.ReadBody(nil)
r.err = err
return err return err
} }
// we need to stay up to date with sequence numbers // we need to stay up to date with sequence numbers
r.id = req.Id r.id = req.Id
return r.codec.ReadBody(msg) if err := r.codec.ReadBody(msg); err != nil {
r.err = err
return err
}
return nil
} }
func (r *rpcStream) Error() error { func (r *rpcStream) Error() error {