diff --git a/client/rpc_codec.go b/client/rpc_codec.go index 58c24fd2..01fe9208 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -43,9 +43,7 @@ type readWriteCloser struct { type clientCodec interface { WriteRequest(*request, interface{}) error - ReadResponseHeader(*response) error - ReadResponseBody(interface{}) error - + ReadResponse(*response, interface{}) error Close() error } @@ -129,28 +127,33 @@ func (c *rpcCodec) WriteRequest(req *request, body interface{}) error { return nil } -func (c *rpcCodec) ReadResponseHeader(r *response) error { +func (c *rpcCodec) ReadResponse(r *response, b interface{}) error { var m transport.Message if err := c.client.Recv(&m); err != nil { return errors.InternalServerError("go.micro.client.transport", err.Error()) } c.buf.rbuf.Reset() c.buf.rbuf.Write(m.Body) + var me codec.Message + // set headers + me.Header = m.Header + + // read header err := c.codec.ReadHeader(&me, codec.Response) r.ServiceMethod = me.Method r.Seq = me.Id r.Error = me.Error + if err != nil { return errors.InternalServerError("go.micro.client.codec", err.Error()) } - return nil -} -func (c *rpcCodec) ReadResponseBody(b interface{}) error { + // read body if err := c.codec.ReadBody(b); err != nil { return errors.InternalServerError("go.micro.client.codec", err.Error()) } + return nil } diff --git a/client/rpc_stream.go b/client/rpc_stream.go index e5a27033..eda78d47 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -68,7 +68,8 @@ func (r *rpcStream) Recv(msg interface{}) error { } var resp response - if err := r.codec.ReadResponseHeader(&resp); err != nil { + + if err := r.codec.ReadResponse(&resp, msg); err != nil { if err == io.EOF && !r.isClosed() { r.err = io.ErrUnexpectedEOF return io.ErrUnexpectedEOF @@ -87,13 +88,6 @@ func (r *rpcStream) Recv(msg interface{}) error { } else { r.err = io.EOF } - if err := r.codec.ReadResponseBody(nil); err != nil { - r.err = err - } - default: - if err := r.codec.ReadResponseBody(msg); err != nil { - r.err = err - } } return r.err