From 6ae48c9f29b3ebd9a039ceaaa54a3c892c50e9a7 Mon Sep 17 00:00:00 2001 From: Asim Date: Thu, 17 Dec 2015 20:37:35 +0000 Subject: [PATCH] checkpoint --- client/client.go | 36 ++++--- client/options.go | 12 +++ client/rpc_client.go | 45 +++++---- client/rpc_request.go | 14 ++- client/rpc_stream.go | 101 +++++++++++++++++++- client/rpcplus_client.go | 48 +--------- examples/client/main.go | 12 ++- examples/server/proto/example/example.pb.go | 19 ++-- server/rpc_stream.go | 79 +++++++++++++++ server/server.go | 16 +++- server/server_wrapper.go | 2 + 11 files changed, 284 insertions(+), 100 deletions(-) create mode 100644 server/rpc_stream.go diff --git a/client/client.go b/client/client.go index ad117412..fdff08f7 100644 --- a/client/client.go +++ b/client/client.go @@ -27,13 +27,13 @@ import ( type Client interface { NewPublication(topic string, msg interface{}) Publication - NewRequest(service, method string, req interface{}) Request - NewProtoRequest(service, method string, req interface{}) Request - NewJsonRequest(service, method string, req interface{}) Request + NewRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request + NewProtoRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request + NewJsonRequest(service, method string, req interface{}, reqOpts ...RequestOption) Request Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error CallRemote(ctx context.Context, addr string, req Request, rsp interface{}, opts ...CallOption) error - Stream(ctx context.Context, req Request, rspChan interface{}, opts ...CallOption) (Streamer, error) - StreamRemote(ctx context.Context, addr string, req Request, rspChan interface{}, opts ...CallOption) (Streamer, error) + Stream(ctx context.Context, req Request, opts ...CallOption) (Streamer, error) + StreamRemote(ctx context.Context, addr string, req Request, opts ...CallOption) (Streamer, error) Publish(ctx context.Context, p Publication, opts ...PublishOption) error } @@ -48,10 +48,15 @@ type Request interface { Method() string ContentType() string Request() interface{} + // indicates whether the request will be a streaming one rather than unary + Stream() bool } type Streamer interface { + Context() context.Context Request() Request + Send(interface{}) error + Recv(interface{}) error Error() error Close() error } @@ -59,6 +64,7 @@ type Streamer interface { type Option func(*options) type CallOption func(*callOptions) type PublishOption func(*publishOptions) +type RequestOption func(*requestOptions) var ( DefaultClient Client = newRpcClient() @@ -76,13 +82,13 @@ func CallRemote(ctx context.Context, address string, request Request, response i // Creates a streaming connection with a service and returns responses on the // channel passed in. It's upto the user to close the streamer. -func Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { - return DefaultClient.Stream(ctx, request, responseChan, opts...) +func Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { + return DefaultClient.Stream(ctx, request, opts...) } // Creates a streaming connection to the address specified. -func StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { - return DefaultClient.StreamRemote(ctx, address, request, responseChan, opts...) +func StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { + return DefaultClient.StreamRemote(ctx, address, request, opts...) } // Publishes a publication using the default client. Using the underlying broker @@ -103,16 +109,16 @@ func NewPublication(topic string, message interface{}) Publication { // Creates a new request using the default client. Content Type will // be set to the default within options and use the appropriate codec -func NewRequest(service, method string, request interface{}) Request { - return DefaultClient.NewRequest(service, method, request) +func NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { + return DefaultClient.NewRequest(service, method, request, reqOpts...) } // Creates a new protobuf request using the default client -func NewProtoRequest(service, method string, request interface{}) Request { - return DefaultClient.NewProtoRequest(service, method, request) +func NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { + return DefaultClient.NewProtoRequest(service, method, request, reqOpts...) } // Creates a new json request using the default client -func NewJsonRequest(service, method string, request interface{}) Request { - return DefaultClient.NewJsonRequest(service, method, request) +func NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { + return DefaultClient.NewJsonRequest(service, method, request, reqOpts...) } diff --git a/client/options.go b/client/options.go index ca0f6fc5..389b34b1 100644 --- a/client/options.go +++ b/client/options.go @@ -24,6 +24,10 @@ type callOptions struct { type publishOptions struct{} +type requestOptions struct { + stream bool +} + // Broker to be used for pub/sub func Broker(b broker.Broker) Option { return func(o *options) { @@ -80,3 +84,11 @@ func WithSelectOption(so selector.SelectOption) CallOption { o.selectOptions = append(o.selectOptions, so) } } + +// Request Options + +func StreamingRequest() RequestOption { + return func(o *requestOptions) { + o.stream = true + } +} diff --git a/client/rpc_client.go b/client/rpc_client.go index bf2abacf..fbeaf921 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -112,7 +112,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r return client.Close() } -func (r *rpcClient) stream(ctx context.Context, address string, request Request, responseChan interface{}) (Streamer, error) { +func (r *rpcClient) stream(ctx context.Context, address string, req Request) (Streamer, error) { msg := &transport.Message{ Header: make(map[string]string), } @@ -124,9 +124,9 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, } } - msg.Header["Content-Type"] = request.ContentType() + msg.Header["Content-Type"] = req.ContentType() - cf, err := r.newCodec(request.ContentType()) + cf, err := r.newCodec(req.ContentType()) if err != nil { return nil, errors.InternalServerError("go.micro.client", err.Error()) } @@ -136,13 +136,22 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, return nil, errors.InternalServerError("go.micro.client", fmt.Sprintf("Error sending request: %v", err)) } - client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) - call := client.StreamGo(request.Service(), request.Method(), request.Request(), responseChan) + codec := newRpcPlusCodec(msg, c, cf) + + err = codec.WriteRequest(&request{ + Service: req.Service(), + ServiceMethod: req.Method(), + Seq: 0, + }, req.Request()) + + if err != nil { + return nil, errors.InternalServerError("go.micro.client", err.Error()) + } return &rpcStream{ - request: request, - call: call, - client: client, + context: ctx, + request: req, + codec: codec, }, nil } @@ -180,11 +189,11 @@ func (r *rpcClient) Call(ctx context.Context, request Request, response interfac return err } -func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { - return r.stream(ctx, address, request, responseChan) +func (r *rpcClient) StreamRemote(ctx context.Context, address string, request Request, opts ...CallOption) (Streamer, error) { + return r.stream(ctx, address, request) } -func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan interface{}, opts ...CallOption) (Streamer, error) { +func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Streamer, error) { var copts callOptions for _, opt := range opts { opt(&copts) @@ -209,7 +218,7 @@ func (r *rpcClient) Stream(ctx context.Context, request Request, responseChan in address = fmt.Sprintf("%s:%d", address, node.Port) } - stream, err := r.stream(ctx, address, request, responseChan) + stream, err := r.stream(ctx, address, request) r.opts.selector.Mark(request.Service(), node, err) return stream, err } @@ -247,14 +256,14 @@ func (r *rpcClient) NewPublication(topic string, message interface{}) Publicatio func (r *rpcClient) NewProtoPublication(topic string, message interface{}) Publication { return newRpcPublication(topic, message, "application/octet-stream") } -func (r *rpcClient) NewRequest(service, method string, request interface{}) Request { - return newRpcRequest(service, method, request, r.opts.contentType) +func (r *rpcClient) NewRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { + return newRpcRequest(service, method, request, r.opts.contentType, reqOpts...) } -func (r *rpcClient) NewProtoRequest(service, method string, request interface{}) Request { - return newRpcRequest(service, method, request, "application/octet-stream") +func (r *rpcClient) NewProtoRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { + return newRpcRequest(service, method, request, "application/octet-stream", reqOpts...) } -func (r *rpcClient) NewJsonRequest(service, method string, request interface{}) Request { - return newRpcRequest(service, method, request, "application/json") +func (r *rpcClient) NewJsonRequest(service, method string, request interface{}, reqOpts ...RequestOption) Request { + return newRpcRequest(service, method, request, "application/json", reqOpts...) } diff --git a/client/rpc_request.go b/client/rpc_request.go index eb799193..5a5ec0df 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -5,14 +5,22 @@ type rpcRequest struct { method string contentType string request interface{} + opts requestOptions } -func newRpcRequest(service, method string, request interface{}, contentType string) Request { +func newRpcRequest(service, method string, request interface{}, contentType string, reqOpts ...RequestOption) Request { + var opts requestOptions + + for _, o := range reqOpts { + o(&opts) + } + return &rpcRequest{ service: service, method: method, request: request, contentType: contentType, + opts: opts, } } @@ -31,3 +39,7 @@ func (r *rpcRequest) Method() string { func (r *rpcRequest) Request() interface{} { return r.request } + +func (r *rpcRequest) Stream() bool { + return r.opts.stream +} diff --git a/client/rpc_stream.go b/client/rpc_stream.go index 9f64c588..e118057a 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -1,19 +1,112 @@ package client +import ( + "errors" + "io" + "log" + "sync" + + "golang.org/x/net/context" +) + type rpcStream struct { + sync.RWMutex + seq uint64 + closed bool + err error request Request - call *call - client *client + codec clientCodec + context context.Context +} + +func (r *rpcStream) Context() context.Context { + return r.context } func (r *rpcStream) Request() Request { return r.request } +func (r *rpcStream) Send(msg interface{}) error { + r.Lock() + defer r.Unlock() + + if r.closed { + r.err = errShutdown + return errShutdown + } + + seq := r.seq + r.seq++ + + req := request{ + Service: r.request.Service(), + Seq: seq, + ServiceMethod: r.request.Method(), + } + + if err := r.codec.WriteRequest(&req, msg); err != nil { + r.err = err + return err + } + + return nil +} + +func (r *rpcStream) Recv(msg interface{}) error { + r.Lock() + defer r.Unlock() + + if r.closed { + r.err = errShutdown + return errShutdown + } + + var resp response + if err := r.codec.ReadResponseHeader(&resp); err != nil { + if err == io.EOF && !r.closed { + r.err = io.ErrUnexpectedEOF + return io.ErrUnexpectedEOF + } + r.err = err + return err + } + + switch { + case len(resp.Error) > 0: + // We've got an error response. Give this to the request; + // any subsequent requests will get the ReadResponseBody + // error if there is one. + if resp.Error != lastStreamResponseError { + r.err = serverError(resp.Error) + } else { + r.err = io.EOF + } + if err := r.codec.ReadResponseBody(nil); err != nil { + r.err = errors.New("reading error payload: " + err.Error()) + } + default: + if err := r.codec.ReadResponseBody(msg); err != nil { + r.err = errors.New("reading body " + err.Error()) + } + } + + if r.err != nil && r.err != io.EOF && !r.closed { + log.Println("rpc: client protocol error:", r.err) + } + + return r.err +} + func (r *rpcStream) Error() error { - return r.call.Error + r.RLock() + defer r.RUnlock() + return r.err } func (r *rpcStream) Close() error { - return r.client.Close() + r.Lock() + defer r.Unlock() + r.closed = true + return r.codec.Close() } diff --git a/client/rpcplus_client.go b/client/rpcplus_client.go index f119cc3e..5325474d 100644 --- a/client/rpcplus_client.go +++ b/client/rpcplus_client.go @@ -8,7 +8,6 @@ import ( "errors" "io" "log" - "reflect" "sync" "github.com/youtube/vitess/go/trace" @@ -38,7 +37,6 @@ type call struct { Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming). Error error // After completion, the error status. Done chan *call // Strobes when call is complete (nil for streaming RPCs) - Stream bool // True for a streaming RPC call, false otherwise Subseq uint64 // The next expected subseq in the packets } @@ -145,28 +143,12 @@ func (client *client) input() { // We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. - if !(call.Stream && resp.Error == lastStreamResponseError) { - call.Error = serverError(resp.Error) - } + call.Error = serverError(resp.Error) err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error payload: " + err.Error()) } client.done(seq) - case call.Stream: - // call.Reply is a chan *T2 - // we need to create a T2 and get a *T2 back - value := reflect.New(reflect.TypeOf(call.Reply).Elem().Elem()).Interface() - err = client.codec.ReadResponseBody(value) - if err != nil { - call.Error = errors.New("reading body " + err.Error()) - } else { - // writing on the channel could block forever. For - // instance, if a client calls 'close', this might block - // forever. the current suggestion is for the - // client to drain the receiving channel in that case - reflect.ValueOf(call.Reply).Send(reflect.ValueOf(value)) - } default: err = client.codec.ReadResponseBody(call.Reply) if err != nil { @@ -203,12 +185,6 @@ func (client *client) done(seq uint64) { } func (call *call) done() { - if call.Stream { - // need to close the channel. client won't be able to read any more. - reflect.ValueOf(call.Reply).Close() - return - } - select { case call.Done <- call: // ok @@ -270,28 +246,6 @@ func (client *client) Go(ctx context.Context, service, serviceMethod string, arg return cal } -// StreamGo invokes the streaming function asynchronously. It returns the call structure representing -// the invocation. -func (client *client) StreamGo(service string, serviceMethod string, args interface{}, replyStream interface{}) *call { - // first check the replyStream object is a stream of pointers to a data structure - typ := reflect.TypeOf(replyStream) - // FIXME: check the direction of the channel, maybe? - if typ.Kind() != reflect.Chan || typ.Elem().Kind() != reflect.Ptr { - log.Panic("rpc: replyStream is not a channel of pointers") - return nil - } - - call := new(call) - call.Service = service - call.ServiceMethod = serviceMethod - call.Args = args - call.Reply = replyStream - call.Stream = true - call.Subseq = 0 - client.send(call) - return call -} - // call invokes the named function, waits for it to complete, and returns its error status. func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error { call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done diff --git a/examples/client/main.go b/examples/client/main.go index 369553a8..ff93690b 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -60,15 +60,19 @@ func stream() { Count: int64(10), }) - rspChan := make(chan *example.StreamingResponse, 10) - - stream, err := client.Stream(context.Background(), req, rspChan) + stream, err := client.Stream(context.Background(), req) if err != nil { fmt.Println("err:", err) return } - for rsp := range rspChan { + for stream.Error() == nil { + rsp := &example.StreamingResponse{} + err := stream.Recv(rsp) + if err != nil { + fmt.Println(err) + break + } fmt.Println("Stream: rsp:", rsp.Count) } diff --git a/examples/server/proto/example/example.pb.go b/examples/server/proto/example/example.pb.go index 3d7e141d..5c7faae9 100644 --- a/examples/server/proto/example/example.pb.go +++ b/examples/server/proto/example/example.pb.go @@ -127,30 +127,29 @@ func (c *exampleClient) Call(ctx context.Context, in *Request, opts ...client.Ca func (c *exampleClient) Stream(ctx context.Context, in *StreamingRequest, opts ...client.CallOption) (Example_StreamClient, error) { req := c.c.NewRequest(c.serviceName, "Example.Stream", in) - outCh := make(chan *StreamingResponse) - stream, err := c.c.Stream(ctx, req, outCh, opts...) + stream, err := c.c.Stream(ctx, req, opts...) if err != nil { return nil, err } - return &exampleStreamClient{stream, outCh}, nil + return &exampleStreamClient{stream}, nil } type Example_StreamClient interface { - Next() (*StreamingResponse, error) + RecvMsg() (*StreamingResponse, error) client.Streamer } type exampleStreamClient struct { client.Streamer - next chan *StreamingResponse } -func (x *exampleStreamClient) Next() (*StreamingResponse, error) { - out, ok := <-x.next - if !ok { - return nil, fmt.Errorf(`chan closed`) +func (x *exampleStreamClient) RecvMsg() (*StreamingResponse, error) { + m := new(StreamingResponse) + err := x.Recv(m) + if err != nil { + return nil, err } - return out, nil + return m, nil } // Server API for Example service diff --git a/server/rpc_stream.go b/server/rpc_stream.go new file mode 100644 index 00000000..94b111ae --- /dev/null +++ b/server/rpc_stream.go @@ -0,0 +1,79 @@ +package server + +import ( + "errors" + "io" + "log" + "sync" + + "golang.org/x/net/context" +) + +type rpcStream struct { + sync.RWMutex + seq uint64 + closed bool + err error + request Request + codec serverCodec + context context.Context +} + +func (r *rpcStream) Context() context.Context { + return r.context +} + +func (r *rpcStream) Request() Request { + return r.request +} + +func (r *rpcStream) Send(msg interface{}) error { + r.Lock() + defer r.Unlock() + + seq := r.seq + r.seq++ + + resp := response{ + ServiceMethod: r.request.Method(), + Seq: seq, + } + + err := codec.WriteResponse(&resp, msg, false) + if err != nil { + log.Println("rpc: writing response:", err) + } + return err +} + +func (r *rpcStream) Recv(msg interface{}) error { + r.Lock() + defer r.Unlock() + + req := request{} + + if err := codec.ReadRequestHeader(&req); err != nil { + // discard body + codec.ReadRequestBody(nil) + return err + } + + if err = codec.ReadRequestBody(msg); err != nil { + return err + } + + return nil +} + +func (r *rpcStream) Error() error { + r.RLock() + defer r.RUnlock() + return r.err +} + +func (r *rpcStream) Close() error { + r.Lock() + defer r.Unlock() + r.closed = true + return r.codec.Close() +} diff --git a/server/server.go b/server/server.go index 2a197bff..7044da0d 100644 --- a/server/server.go +++ b/server/server.go @@ -35,6 +35,7 @@ import ( log "github.com/golang/glog" "github.com/pborman/uuid" + "golang.org/x/net/context" ) type Server interface { @@ -61,10 +62,23 @@ type Request interface { Method() string ContentType() string Request() interface{} - // indicates whether the response should be streaming + // indicates whether the request will be streamed Stream() bool } +// Streamer represents a stream established with a client. +// A stream can be bidirectional which is indicated by the request. +// The last error will be left in Error(). +// EOF indicated end of the stream. +type Streamer interface { + Context() context.Context + Request() Request + Send(interface{}) error + Recv(interface{}) error + Error() error + Close() error +} + type Option func(*options) var ( diff --git a/server/server_wrapper.go b/server/server_wrapper.go index c6c4303f..45b6d845 100644 --- a/server/server_wrapper.go +++ b/server/server_wrapper.go @@ -19,3 +19,5 @@ type HandlerWrapper func(HandlerFunc) HandlerFunc // SubscriberWrapper wraps the SubscriberFunc and returns the equivalent type SubscriberWrapper func(SubscriberFunc) SubscriberFunc + +type StreamWrapper func(Streamer) Streamer