diff --git a/client/rpc_client.go b/client/rpc_client.go index e8e0bbeb..bf2abacf 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -105,7 +105,7 @@ func (r *rpcClient) call(ctx context.Context, address string, request Request, r defer c.Close() client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) - err = client.Call(ctx, request.Method(), request.Request(), response) + err = client.Call(ctx, request.Service(), request.Method(), request.Request(), response) if err != nil { return err } @@ -137,7 +137,7 @@ func (r *rpcClient) stream(ctx context.Context, address string, request Request, } client := newClientWithCodec(newRpcPlusCodec(msg, c, cf)) - call := client.StreamGo(request.Method(), request.Request(), responseChan) + call := client.StreamGo(request.Service(), request.Method(), request.Request(), responseChan) return &rpcStream{ request: request, diff --git a/client/rpc_codec.go b/client/rpc_codec.go index f2fd28e3..fd5df68b 100644 --- a/client/rpc_codec.go +++ b/client/rpc_codec.go @@ -64,14 +64,19 @@ func newRpcPlusCodec(req *transport.Message, client transport.Client, c codec.Ne func (c *rpcPlusCodec) WriteRequest(req *request, body interface{}) error { m := &codec.Message{ - Id: req.Seq, - Method: req.ServiceMethod, - Type: codec.Request, + Id: req.Seq, + Target: req.Service, + Method: req.ServiceMethod, + Type: codec.Request, + Headers: map[string]string{}, } if err := c.codec.Write(m, body); err != nil { return err } c.req.Body = c.buf.wbuf.Bytes() + for k, v := range m.Headers { + c.req.Header[k] = v + } return c.client.Send(c.req) } diff --git a/client/rpcplus_client.go b/client/rpcplus_client.go index 4fb2d400..f119cc3e 100644 --- a/client/rpcplus_client.go +++ b/client/rpcplus_client.go @@ -32,6 +32,7 @@ var errShutdown = errors.New("connection is shut down") // call represents an active RPC. type call struct { + Service string ServiceMethod string // The name of the service and method to call. Args interface{} // The argument to the function (*struct). Reply interface{} // The reply from the function (*struct for single, chan * struct for streaming). @@ -65,6 +66,7 @@ type clientCodec interface { } type request struct { + Service string ServiceMethod string // format: "Service.Method" Seq uint64 // sequence number chosen by client next *request // for free list in Server @@ -95,6 +97,7 @@ func (client *client) send(call *call) { client.mutex.Unlock() // Encode and send the request. + client.request.Service = call.Service client.request.Seq = seq client.request.ServiceMethod = call.ServiceMethod err := client.codec.WriteRequest(&client.request, call.Args) @@ -241,12 +244,13 @@ func (client *client) Close() error { // the invocation. The done channel will signal when the call is complete by returning // the same call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. -func (client *client) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call { +func (client *client) Go(ctx context.Context, service, serviceMethod string, args interface{}, reply interface{}, done chan *call) *call { span := trace.NewSpanFromContext(ctx) span.StartClient(serviceMethod) defer span.Finish() cal := new(call) + cal.Service = service cal.ServiceMethod = serviceMethod cal.Args = args cal.Reply = reply @@ -268,7 +272,7 @@ func (client *client) Go(ctx context.Context, serviceMethod string, args interfa // StreamGo invokes the streaming function asynchronously. It returns the call structure representing // the invocation. -func (client *client) StreamGo(serviceMethod string, args interface{}, replyStream interface{}) *call { +func (client *client) StreamGo(service string, serviceMethod string, args interface{}, replyStream interface{}) *call { // first check the replyStream object is a stream of pointers to a data structure typ := reflect.TypeOf(replyStream) // FIXME: check the direction of the channel, maybe? @@ -278,6 +282,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre } call := new(call) + call.Service = service call.ServiceMethod = serviceMethod call.Args = args call.Reply = replyStream @@ -288,7 +293,7 @@ func (client *client) StreamGo(serviceMethod string, args interface{}, replyStre } // call invokes the named function, waits for it to complete, and returns its error status. -func (client *client) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error { - call := <-client.Go(ctx, serviceMethod, args, reply, make(chan *call, 1)).Done +func (client *client) Call(ctx context.Context, service string, serviceMethod string, args interface{}, reply interface{}) error { + call := <-client.Go(ctx, service, serviceMethod, args, reply, make(chan *call, 1)).Done return call.Error } diff --git a/examples/mercury/README.md b/examples/mercury/README.md new file mode 100644 index 00000000..bc866fc5 --- /dev/null +++ b/examples/mercury/README.md @@ -0,0 +1,7 @@ +# Mercury + +An **experimental** integration for [mondo/mercury](https://github.com/mondough/mercury) + +mercury/{client,server} are standard mercury implementations for compatibility testing sake. + +micro/{client,server} are micro implementations of mercury's request/response system. diff --git a/examples/mercury/mercury/client/client.go b/examples/mercury/mercury/client/client.go new file mode 100644 index 00000000..cacbf704 --- /dev/null +++ b/examples/mercury/mercury/client/client.go @@ -0,0 +1,30 @@ +package main + +import ( + "fmt" + "time" + + hello "github.com/micro/micro/examples/greeter/server/proto/hello" + "github.com/mondough/mercury" + tmsg "github.com/mondough/typhon/message" + "github.com/mondough/typhon/rabbit" +) + +func main() { + req := mercury.NewRequest() + req.SetService("foo") + req.SetEndpoint("Say.Hello") + req.SetBody(&hello.Request{ + Name: "John", + }) + tmsg.ProtoMarshaler().MarshalBody(req) + trans := rabbit.NewTransport() + rsp, err := trans.Send(req, time.Second) + if err != nil { + fmt.Println(err) + return + } + tmsg.ProtoUnmarshaler(new(hello.Response)).UnmarshalPayload(rsp) + + fmt.Println(rsp.Body()) +} diff --git a/examples/mercury/mercury/server/server.go b/examples/mercury/mercury/server/server.go new file mode 100644 index 00000000..ef1236af --- /dev/null +++ b/examples/mercury/mercury/server/server.go @@ -0,0 +1,34 @@ +package main + +import ( + "github.com/mondough/mercury" + "github.com/mondough/mercury/server" + "github.com/mondough/mercury/service" + "github.com/mondough/typhon/rabbit" + + hello "github.com/micro/micro/examples/greeter/server/proto/hello" +) + +func handler(req mercury.Request) (mercury.Response, error) { + request := req.Body().(*hello.Request) + rsp := req.Response(&hello.Response{ + Msg: "Hey " + request.Name, + }) + return rsp, nil +} + +func main() { + s := service.Init(service.Config{ + Name: "foo", + Transport: rabbit.NewTransport(), + }) + + s.Server().AddEndpoints(server.Endpoint{ + Name: "Say.Hello", + Handler: handler, + Request: new(hello.Request), + Response: new(hello.Response), + }) + + s.Run() +} diff --git a/examples/mercury/micro/client/client.go b/examples/mercury/micro/client/client.go new file mode 100644 index 00000000..21e48f79 --- /dev/null +++ b/examples/mercury/micro/client/client.go @@ -0,0 +1,37 @@ +package main + +import ( + "fmt" + + "github.com/micro/go-micro/client" + mcodec "github.com/micro/go-plugins/codec/mercury" + "github.com/micro/go-plugins/selector/mercury" + "github.com/micro/go-plugins/transport/rabbitmq" + hello "github.com/micro/micro/examples/greeter/server/proto/hello" + + "golang.org/x/net/context" +) + +func main() { + rabbitmq.DefaultExchange = "b2a" + rabbitmq.DefaultRabbitURL = "amqp://localhost:5672" + + c := client.NewClient( + client.Selector(mercury.NewSelector()), + client.Transport(rabbitmq.NewTransport([]string{})), + client.Codec("application/x-protobuf", mcodec.NewCodec), + client.ContentType("application/x-protobuf"), + ) + + req := c.NewRequest("foo", "Say.Hello", &hello.Request{ + Name: "John", + }) + + rsp := &hello.Response{} + + if err := c.Call(context.Background(), req, rsp); err != nil { + fmt.Println(err) + } + + fmt.Println(rsp) +} diff --git a/examples/mercury/micro/server/server.go b/examples/mercury/micro/server/server.go new file mode 100644 index 00000000..0d3112f4 --- /dev/null +++ b/examples/mercury/micro/server/server.go @@ -0,0 +1,38 @@ +package main + +import ( + "flag" + "github.com/micro/go-micro/server" + mcodec "github.com/micro/go-plugins/codec/mercury" + "github.com/micro/go-plugins/transport/rabbitmq" + hello "github.com/micro/micro/examples/greeter/server/proto/hello" + + "golang.org/x/net/context" +) + +type Say struct{} + +func (s *Say) Hello(ctx context.Context, req *hello.Request, rsp *hello.Response) error { + rsp.Msg = "Hey " + req.Name + return nil +} + +func main() { + flag.Parse() + rabbitmq.DefaultExchange = "b2a" + rabbitmq.DefaultRabbitURL = "amqp://localhost:5672" + + s := server.NewServer( + server.Name("foo"), + server.Id("foo"), + server.Address("foo"), + server.Transport(rabbitmq.NewTransport([]string{})), + server.Codec("application/x-protobuf", mcodec.NewCodec), + ) + s.Handle( + s.NewHandler(&Say{}), + ) + + s.Start() + select {} +} diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 966266a8..3aeb963f 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -61,7 +61,10 @@ func newRpcPlusCodec(req *transport.Message, socket transport.Socket, c codec.Ne } func (c *rpcPlusCodec) ReadRequestHeader(r *request) error { - var m codec.Message + m := codec.Message{ + Headers: c.req.Header, + } + err := c.codec.ReadHeader(&m, codec.Request) r.ServiceMethod = m.Method r.Seq = m.Id @@ -75,16 +78,19 @@ func (c *rpcPlusCodec) ReadRequestBody(b interface{}) error { func (c *rpcPlusCodec) WriteResponse(r *response, body interface{}, last bool) error { c.buf.wbuf.Reset() m := &codec.Message{ - Method: r.ServiceMethod, - Id: r.Seq, - Error: r.Error, - Type: codec.Response, + Method: r.ServiceMethod, + Id: r.Seq, + Error: r.Error, + Type: codec.Response, + Headers: map[string]string{}, } if err := c.codec.Write(m, body); err != nil { return err } + + m.Headers["Content-Type"] = c.req.Header["Content-Type"] return c.socket.Send(&transport.Message{ - Header: map[string]string{"Content-Type": c.req.Header["Content-Type"]}, + Header: m.Headers, Body: c.buf.wbuf.Bytes(), }) }