1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

The mega cruft proxy PR (#974)

* the mega cruft proxy PR

* Rename broker id

* add protocol=grpc

* fix compilation breaks

* Add the tunnel broker to the network

* fix broker id

* continue to be backwards compatible in the protocol
This commit is contained in:
Asim Aslam 2019-11-25 16:31:43 +00:00 committed by GitHub
parent 252667398e
commit 080363e8c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 595 additions and 196 deletions

View File

@ -126,7 +126,7 @@ func newHttpBroker(opts ...Option) Broker {
}
h := &httpBroker{
id: "broker-" + uuid.New().String(),
id: "go.micro.http.broker-" + uuid.New().String(),
address: addr,
opts: options,
r: reg,
@ -472,7 +472,7 @@ func (h *httpBroker) Init(opts ...Option) error {
}
if len(h.id) == 0 {
h.id = "broker-" + uuid.New().String()
h.id = "go.micro.http.broker-" + uuid.New().String()
}
// get registry
@ -648,9 +648,6 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
return nil, err
}
// create unique id
id := h.id + "." + uuid.New().String()
var secure bool
if h.opts.Secure || h.opts.TLSConfig != nil {
@ -659,7 +656,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
// register service
node := &registry.Node{
Id: id,
Id: h.id,
Address: mnet.HostPort(addr, port),
Metadata: map[string]string{
"secure": fmt.Sprintf("%t", secure),
@ -684,7 +681,7 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO
subscriber := &httpSubscriber{
opts: options,
hb: h,
id: id,
id: h.id,
topic: topic,
fn: handler,
svc: service,

View File

@ -15,10 +15,16 @@ var (
{
Id: "foo-1.0.0-123",
Address: "localhost:9999",
Metadata: map[string]string{
"protocol": "mucp",
},
},
{
Id: "foo-1.0.0-321",
Address: "localhost:9999",
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
},
@ -29,6 +35,9 @@ var (
{
Id: "foo-1.0.1-321",
Address: "localhost:6666",
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
},
@ -39,6 +48,9 @@ var (
{
Id: "foo-1.0.3-345",
Address: "localhost:8888",
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
},

View File

@ -13,6 +13,7 @@ import (
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
@ -70,8 +71,13 @@ func (g *grpcClient) next(request client.Request, opts client.CallOptions) (sele
}, nil
}
// only get the things that are of grpc protocol
selectOptions := append(opts.SelectOptions, selector.WithFilter(
selector.FilterLabel("protocol", "grpc"),
))
// get next nodes from the selector
next, err := g.opts.Selector.Select(service, opts.SelectOptions...)
next, err := g.opts.Selector.Select(service, selectOptions...)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
@ -510,29 +516,56 @@ func (g *grpcClient) Stream(ctx context.Context, req client.Request, opts ...cli
}
func (g *grpcClient) Publish(ctx context.Context, p client.Message, opts ...client.PublishOption) error {
var options client.PublishOptions
for _, o := range opts {
o(&options)
}
md, ok := metadata.FromContext(ctx)
if !ok {
md = make(map[string]string)
}
md["Content-Type"] = p.ContentType()
md["Micro-Topic"] = p.Topic()
cf, err := g.newGRPCCodec(p.ContentType())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
var body []byte
// passed in raw data
if d, ok := p.Payload().(*raw.Frame); ok {
body = d.Data
} else {
// set the body
b, err := cf.Marshal(p.Payload())
if err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
body = b
}
g.once.Do(func() {
g.opts.Broker.Connect()
})
return g.opts.Broker.Publish(p.Topic(), &broker.Message{
topic := p.Topic()
// get proxy topic
if prx := os.Getenv("MICRO_PROXY"); len(prx) > 0 {
options.Exchange = prx
}
// get the exchange
if len(options.Exchange) > 0 {
topic = options.Exchange
}
return g.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b,
Body: body,
})
}

View File

@ -45,6 +45,9 @@ func TestGRPCClient(t *testing.T) {
{
Id: "test-1",
Address: l.Addr().String(),
Metadata: map[string]string{
"protocol": "grpc",
},
},
},
})

View File

@ -13,6 +13,7 @@ import (
"github.com/micro/go-micro/client/pool"
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
@ -349,8 +350,13 @@ func (r *rpcClient) next(request Request, opts CallOptions) (selector.Next, erro
}, nil
}
// only get the things that are of mucp protocol
selectOptions := append(opts.SelectOptions, selector.WithFilter(
selector.FilterLabel("protocol", "mucp"),
))
// get next nodes from the selector
next, err := r.opts.Selector.Select(service, opts.SelectOptions...)
next, err := r.opts.Selector.Select(service, selectOptions...)
if err != nil {
if err == selector.ErrNotFound {
return nil, errors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
@ -583,6 +589,12 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
return errors.InternalServerError("go.micro.client", err.Error())
}
var body []byte
// passed in raw data
if d, ok := msg.Payload().(*raw.Frame); ok {
body = d.Data
} else {
// new buffer
b := buf.New(nil)
@ -596,13 +608,18 @@ func (r *rpcClient) Publish(ctx context.Context, msg Message, opts ...PublishOpt
}, msg.Payload()); err != nil {
return errors.InternalServerError("go.micro.client", err.Error())
}
// set the body
body = b.Bytes()
}
r.once.Do(func() {
r.opts.Broker.Connect()
})
return r.opts.Broker.Publish(topic, &broker.Message{
Header: md,
Body: b.Bytes(),
Body: body,
})
}

View File

@ -143,6 +143,9 @@ func TestCallWrapper(t *testing.T) {
{
Id: id,
Address: address,
Metadata: map[string]string{
"protocol": "mucp",
},
},
},
})

View File

@ -145,6 +145,11 @@ func setupProtocol(msg *transport.Message, node *registry.Node) codec.NewCodec {
return nil
}
// processing topic publishing
if len(msg.Header["Micro-Topic"]) > 0 {
return nil
}
// no protocol use old codecs
switch msg.Header["Content-Type"] {
case "application/json":

View File

@ -19,6 +19,7 @@ import (
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/tunnel"
bun "github.com/micro/go-micro/tunnel/broker"
tun "github.com/micro/go-micro/tunnel/transport"
"github.com/micro/go-micro/util/backoff"
"github.com/micro/go-micro/util/log"
@ -112,6 +113,11 @@ func newNetwork(opts ...Option) Network {
tun.WithTunnel(options.Tunnel),
)
// create the tunnel broker
tunBroker := bun.NewBroker(
bun.WithTunnel(options.Tunnel),
)
// server is network server
server := server.NewServer(
server.Id(options.Id),
@ -119,10 +125,12 @@ func newNetwork(opts ...Option) Network {
server.Advertise(advertise),
server.Name(options.Name),
server.Transport(tunTransport),
server.Broker(tunBroker),
)
// client is network client
client := client.NewClient(
client.Broker(tunBroker),
client.Transport(tunTransport),
client.Selector(
rtr.NewSelector(

View File

@ -10,7 +10,6 @@ import (
"github.com/micro/go-micro/client/grpc"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy"
"github.com/micro/go-micro/server"
)
@ -62,8 +61,14 @@ func readLoop(r server.Request, s client.Stream) error {
}
}
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy.grpc", "SendRequest is unsupported")
// ProcessMessage acts as a message exchange and forwards messages to ongoing topics
// TODO: should we look at p.Endpoint and only send to the local endpoint? probably
func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error {
// TODO: check that we're not broadcast storming by sending to the same topic
// that we're actually subscribed to
// directly publish to the local client
return p.Client.Publish(ctx, msg)
}
// ServeRequest honours the server.Proxy interface

View File

@ -10,7 +10,6 @@ import (
"net/url"
"path"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/config/options"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/proxy"
@ -45,8 +44,69 @@ func getEndpoint(hdr map[string]string) string {
return ""
}
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy.http", "SendRequest is unsupported")
func getTopic(hdr map[string]string) string {
ep := hdr["Micro-Topic"]
if len(ep) > 0 && ep[0] == '/' {
return ep
}
return "/" + hdr["Micro-Topic"]
}
// ProcessMessage handles incoming asynchronous messages
func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error {
if p.Endpoint == "" {
p.Endpoint = proxy.DefaultEndpoint
}
// get the header
hdr := msg.Header()
// get topic
// use /topic as endpoint
endpoint := getTopic(hdr)
// set the endpoint
if len(endpoint) == 0 {
endpoint = p.Endpoint
} else {
// add endpoint to backend
u, err := url.Parse(p.Endpoint)
if err != nil {
return errors.InternalServerError(msg.Topic(), err.Error())
}
u.Path = path.Join(u.Path, endpoint)
endpoint = u.String()
}
// send to backend
hreq, err := http.NewRequest("POST", endpoint, bytes.NewReader(msg.Body()))
if err != nil {
return errors.InternalServerError(msg.Topic(), err.Error())
}
// set the headers
for k, v := range hdr {
hreq.Header.Set(k, v)
}
// make the call
hrsp, err := http.DefaultClient.Do(hreq)
if err != nil {
return errors.InternalServerError(msg.Topic(), err.Error())
}
// read body
b, err := ioutil.ReadAll(hrsp.Body)
hrsp.Body.Close()
if err != nil {
return errors.InternalServerError(msg.Topic(), err.Error())
}
if hrsp.StatusCode != 200 {
return errors.New(msg.Topic(), string(b), int32(hrsp.StatusCode))
}
return nil
}
// ServeRequest honours the server.Router interface

View File

@ -281,8 +281,34 @@ func (p *Proxy) watchRoutes() {
}
}
func (p *Proxy) SendRequest(ctx context.Context, req client.Request, rsp client.Response) error {
return errors.InternalServerError("go.micro.proxy", "SendRequest is unsupported")
// ProcessMessage acts as a message exchange and forwards messages to ongoing topics
// TODO: should we look at p.Endpoint and only send to the local endpoint? probably
func (p *Proxy) ProcessMessage(ctx context.Context, msg server.Message) error {
// TODO: check that we're not broadcast storming by sending to the same topic
// that we're actually subscribed to
log.Tracef("Received message for %s", msg.Topic())
var errors []string
// directly publish to the local client
if err := p.Client.Publish(ctx, msg); err != nil {
errors = append(errors, err.Error())
}
// publish to all links
for _, client := range p.Links {
if err := client.Publish(ctx, msg); err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) == 0 {
return nil
}
// there is no error...muahaha
return fmt.Errorf("Message processing error: %s", strings.Join(errors, "\n"))
}
// ServeRequest honours the server.Router interface
@ -302,6 +328,8 @@ func (p *Proxy) ServeRequest(ctx context.Context, req server.Request, rsp server
return errors.BadRequest("go.micro.proxy", "service name is blank")
}
log.Tracef("Received request for %s", service)
// are we network routing or local routing
if len(p.Links) == 0 {
local = true

View File

@ -13,9 +13,9 @@ import (
// Proxy can be used as a proxy server for go-micro services
type Proxy interface {
options.Options
// SendRequest honours the client.Router interface
SendRequest(context.Context, client.Request, client.Response) error
// ServeRequest honours the server.Router interface
// ProcessMessage handles inbound messages
ProcessMessage(context.Context, server.Message) error
// ServeRequest handles inbound requests
ServeRequest(context.Context, server.Request, server.Response) error
}

View File

@ -89,6 +89,11 @@ func newGRPCServer(opts ...server.Option) server.Server {
type grpcRouter struct {
h func(context.Context, server.Request, interface{}) error
m func(context.Context, server.Message) error
}
func (r grpcRouter) ProcessMessage(ctx context.Context, msg server.Message) error {
return r.m(ctx, msg)
}
func (r grpcRouter) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
@ -258,7 +263,7 @@ func (g *grpcServer) handler(srv interface{}, stream grpc.ServerStream) error {
handler = g.opts.HdlrWrappers[i-1](handler)
}
r := grpcRouter{handler}
r := grpcRouter{h: handler}
// serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil {
@ -564,7 +569,7 @@ func (g *grpcServer) Register() error {
node.Metadata["registry"] = config.Registry.String()
node.Metadata["server"] = g.String()
node.Metadata["transport"] = g.String()
// node.Metadata["transport"] = config.Transport.String()
node.Metadata["protocol"] = "grpc"
g.RLock()
// Maps are ordered randomly, sort the keys for consistency

View File

@ -20,6 +20,9 @@ type rpcMessage struct {
topic string
contentType string
payload interface{}
header map[string]string
body []byte
codec codec.Codec
}
func (r *rpcRequest) ContentType() string {
@ -73,3 +76,15 @@ func (r *rpcMessage) Topic() string {
func (r *rpcMessage) Payload() interface{} {
return r.payload
}
func (r *rpcMessage) Header() map[string]string {
return r.header
}
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Reader {
return r.codec
}

View File

@ -33,7 +33,6 @@ type subscriber struct {
}
func newSubscriber(topic string, sub interface{}, opts ...server.SubscriberOption) server.Subscriber {
options := server.SubscriberOptions{
AutoAck: true,
}
@ -239,6 +238,8 @@ func (g *grpcServer) createSubHandler(sb *subscriber, opts server.Options) broke
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
header: msg.Header,
body: msg.Body,
})
}()
}

View File

@ -135,12 +135,18 @@ func setupProtocol(msg *transport.Message) codec.NewCodec {
endpoint := getHeader("Micro-Endpoint", msg.Header)
protocol := getHeader("Micro-Protocol", msg.Header)
target := getHeader("Micro-Target", msg.Header)
topic := getHeader("Micro-Topic", msg.Header)
// if the protocol exists (mucp) do nothing
if len(protocol) > 0 {
return nil
}
// newer method of processing messages over transport
if len(topic) > 0 {
return nil
}
// if no service/method/endpoint then it's the old protocol
if len(service) == 0 && len(method) == 0 && len(endpoint) == 0 {
return defaultCodecs[msg.Header["Content-Type"]]

33
server/rpc_event.go Normal file
View File

@ -0,0 +1,33 @@
package server
import (
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/transport"
)
// event is a broker event we handle on the server transport
type event struct {
message *broker.Message
}
func (e *event) Ack() error {
// there is no ack support
return nil
}
func (e *event) Message() *broker.Message {
return e.message
}
func (e *event) Topic() string {
return e.message.Header["Micro-Topic"]
}
func newEvent(msg transport.Message) *event {
return &event{
message: &broker.Message{
Header: msg.Header,
Body: msg.Body,
},
}
}

View File

@ -1,8 +1,11 @@
package server
import (
"bytes"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/transport"
"github.com/micro/go-micro/util/buf"
)
type rpcRequest struct {
@ -23,6 +26,9 @@ type rpcMessage struct {
topic string
contentType string
payload interface{}
header map[string]string
body []byte
codec codec.NewCodec
}
func (r *rpcRequest) Codec() codec.Reader {
@ -86,3 +92,16 @@ func (r *rpcMessage) Topic() string {
func (r *rpcMessage) Payload() interface{} {
return r.payload
}
func (r *rpcMessage) Header() map[string]string {
return r.header
}
func (r *rpcMessage) Body() []byte {
return r.body
}
func (r *rpcMessage) Codec() codec.Reader {
b := buf.New(bytes.NewBuffer(r.body))
return r.codec(b)
}

View File

@ -9,6 +9,7 @@ package server
import (
"context"
"errors"
"fmt"
"io"
"reflect"
"strings"
@ -61,18 +62,29 @@ type response struct {
// router represents an RPC router.
type router struct {
name string
mu sync.Mutex // protects the serviceMap
serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *request
respLock sync.Mutex // protects freeResp
freeResp *response
// handler wrappers
hdlrWrappers []HandlerWrapper
// subscriber wrappers
subWrappers []SubscriberWrapper
su sync.RWMutex
subscribers map[string][]*subscriber
}
func newRpcRouter() *router {
return &router{
serviceMap: make(map[string]*service),
subscribers: make(map[string][]*subscriber),
}
}
@ -449,3 +461,144 @@ func (router *router) ServeRequest(ctx context.Context, r Request, rsp Response)
}
return service.call(ctx, router, sending, mtype, req, argv, replyv, rsp.Codec())
}
func (router *router) NewSubscriber(topic string, handler interface{}, opts ...SubscriberOption) Subscriber {
return newSubscriber(topic, handler, opts...)
}
func (router *router) Subscribe(s Subscriber) error {
sub, ok := s.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
if err := validateSubscriber(sub); err != nil {
return err
}
router.su.Lock()
defer router.su.Unlock()
// append to subscribers
subs := router.subscribers[sub.Topic()]
subs = append(subs, sub)
router.subscribers[sub.Topic()] = subs
return nil
}
func (router *router) ProcessMessage(ctx context.Context, msg Message) error {
router.su.RLock()
// get the subscribers by topic
subs, ok := router.subscribers[msg.Topic()]
if !ok {
router.su.RUnlock()
return nil
}
// unlock since we only need to get the subs
router.su.RUnlock()
var results []string
// we may have multiple subscribers for the topic
for _, sub := range subs {
// we may have multiple handlers per subscriber
for i := 0; i < len(sub.handlers); i++ {
// get the handler
handler := sub.handlers[i]
var isVal bool
var req reflect.Value
// check whether the handler is a pointer
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
// if its a value get the element
if isVal {
req = req.Elem()
}
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
// if its a value get the element
if isVal {
req = req.Elem()
}
cc := msg.Codec()
// read the header. mostly a noop
if err := cc.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err
}
// read the body into the handler request value
if err := cc.ReadBody(req.Interface()); err != nil {
return err
}
// create the handler which will honour the SubscriberFunc type
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sub.typ.Kind() != reflect.Func {
vals = append(vals, sub.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
// values to pass the handler
vals = append(vals, reflect.ValueOf(msg.Payload()))
// execute the actuall call of the handler
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
}
// wrap with subscriber wrappers
for i := len(router.subWrappers); i > 0; i-- {
fn = router.subWrappers[i-1](fn)
}
// create new rpc message
rpcMsg := &rpcMessage{
topic: msg.Topic(),
contentType: msg.ContentType(),
payload: req.Interface(),
codec: msg.(*rpcMessage).codec,
header: msg.Header(),
body: msg.Body(),
}
// execute the message handler
if err := fn(ctx, rpcMsg); err != nil {
results = append(results, err.Error())
}
}
}
// if no errors just return
if len(results) == 0 {
return nil
}
return errors.New("subscriber error: " + strings.Join(results, "\n"))
}

View File

@ -14,6 +14,7 @@ import (
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
raw "github.com/micro/go-micro/codec/bytes"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/transport"
@ -30,11 +31,13 @@ type rpcServer struct {
sync.RWMutex
opts Options
handlers map[string]Handler
subscribers map[*subscriber][]broker.Subscriber
subscribers map[Subscriber][]broker.Subscriber
// marks the serve as started
started bool
// used for first registration
registered bool
// subscribe to service name
subscriber broker.Subscriber
// graceful exit
wg *sync.WaitGroup
}
@ -43,12 +46,13 @@ func newRpcServer(opts ...Option) Server {
options := newOptions(opts...)
router := newRpcRouter()
router.hdlrWrappers = options.HdlrWrappers
router.subWrappers = options.SubWrappers
return &rpcServer{
opts: options,
router: router,
handlers: make(map[string]Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
subscribers: make(map[Subscriber][]broker.Subscriber),
exit: make(chan chan error),
wg: wait(options.Context),
}
@ -56,12 +60,85 @@ func newRpcServer(opts ...Option) Server {
type rpcRouter struct {
h func(context.Context, Request, interface{}) error
m func(context.Context, Message) error
}
func (r rpcRouter) ProcessMessage(ctx context.Context, msg Message) error {
return r.m(ctx, msg)
}
func (r rpcRouter) ServeRequest(ctx context.Context, req Request, rsp Response) error {
return r.h(ctx, req, rsp)
}
// HandleEvent handles inbound messages to the service directly
// TODO: handle requests from an event. We won't send a response.
func (s *rpcServer) HandleEvent(e broker.Event) error {
// formatting horrible cruft
msg := e.Message()
if msg.Header == nil {
// create empty map in case of headers empty to avoid panic later
msg.Header = make(map[string]string)
}
// get codec
ct := msg.Header["Content-Type"]
// default content type
if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType
}
// get codec
cf, err := s.newCodec(ct)
if err != nil {
return err
}
// copy headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
// create context
ctx := metadata.NewContext(context.Background(), hdr)
// TODO: inspect message header
// Micro-Service means a request
// Micro-Topic means a message
rpcMsg := &rpcMessage{
topic: msg.Header["Micro-Topic"],
contentType: ct,
payload: &raw.Frame{msg.Body},
codec: cf,
header: msg.Header,
body: msg.Body,
}
// existing router
r := Router(s.router)
// if the router is present then execute it
if s.opts.Router != nil {
// create a wrapped function
handler := s.opts.Router.ProcessMessage
// execute the wrapper for it
for i := len(s.opts.SubWrappers); i > 0; i-- {
handler = s.opts.SubWrappers[i-1](handler)
}
// set the router
r = rpcRouter{m: handler}
}
return r.ProcessMessage(ctx, rpcMsg)
}
// ServeConn serves a single connection
func (s *rpcServer) ServeConn(sock transport.Socket) {
var wg sync.WaitGroup
@ -97,6 +174,26 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
return
}
// check the message header for
// Micro-Service is a request
// Micro-Topic is a message
if t := msg.Header["Micro-Topic"]; len(t) > 0 {
// process the event
ev := newEvent(msg)
// TODO: handle the error event
if err := s.HandleEvent(ev); err != nil {
msg.Header["Micro-Error"] = err.Error()
}
// write back some 200
sock.Send(&transport.Message{
Header: msg.Header,
})
// we're done
continue
}
// business as usual
// use Micro-Stream as the stream identifier
// in the event its blank we'll always process
// on the same socket
@ -263,7 +360,7 @@ func (s *rpcServer) ServeConn(sock transport.Socket) {
}
// set the router
r = rpcRouter{handler}
r = rpcRouter{h: handler}
}
// wait for processing to exit
@ -366,6 +463,7 @@ func (s *rpcServer) Init(opts ...Option) error {
r := newRpcRouter()
r.hdlrWrappers = s.opts.HdlrWrappers
r.serviceMap = s.router.serviceMap
r.subWrappers = s.opts.SubWrappers
s.router = r
}
@ -391,29 +489,18 @@ func (s *rpcServer) Handle(h Handler) error {
}
func (s *rpcServer) NewSubscriber(topic string, sb interface{}, opts ...SubscriberOption) Subscriber {
return newSubscriber(topic, sb, opts...)
return s.router.NewSubscriber(topic, sb, opts...)
}
func (s *rpcServer) Subscribe(sb Subscriber) error {
sub, ok := sb.(*subscriber)
if !ok {
return fmt.Errorf("invalid subscriber: expected *subscriber")
}
if len(sub.handlers) == 0 {
return fmt.Errorf("invalid subscriber: no handler functions")
}
s.Lock()
defer s.Unlock()
if err := validateSubscriber(sb); err != nil {
if err := s.router.Subscribe(sb); err != nil {
return err
}
s.Lock()
defer s.Unlock()
_, ok = s.subscribers[sub]
if ok {
return fmt.Errorf("subscriber %v already exists", s)
}
s.subscribers[sub] = nil
s.subscribers[sb] = nil
return nil
}
@ -483,7 +570,7 @@ func (s *rpcServer) Register() error {
}
sort.Strings(handlerList)
var subscriberList []*subscriber
var subscriberList []Subscriber
for e := range s.subscribers {
// Only advertise non internal subscribers
if !e.Options().Internal {
@ -491,7 +578,7 @@ func (s *rpcServer) Register() error {
}
}
sort.Slice(subscriberList, func(i, j int) bool {
return subscriberList[i].topic > subscriberList[j].topic
return subscriberList[i].Topic() > subscriberList[j].Topic()
})
endpoints := make([]*registry.Endpoint, 0, len(handlerList)+len(subscriberList))
@ -535,8 +622,17 @@ func (s *rpcServer) Register() error {
s.registered = true
// subscribe to the topic with own name
sub, err := s.opts.Broker.Subscribe(config.Name, s.HandleEvent)
if err != nil {
return err
}
// save the subscriber
s.subscriber = sub
// subscribe for all of the subscribers
for sb := range s.subscribers {
handler := s.createSubHandler(sb, s.opts)
var opts []broker.SubscribeOption
if queue := sb.Options().Queue; len(queue) > 0 {
opts = append(opts, broker.Queue(queue))
@ -550,10 +646,11 @@ func (s *rpcServer) Register() error {
opts = append(opts, broker.DisableAutoAck())
}
sub, err := config.Broker.Subscribe(sb.Topic(), handler, opts...)
sub, err := config.Broker.Subscribe(sb.Topic(), s.HandleEvent, opts...)
if err != nil {
return err
}
log.Logf("Subscribing %s to topic: %s", node.Id, sub.Topic())
s.subscribers[sb] = []broker.Subscriber{sub}
}
@ -621,6 +718,12 @@ func (s *rpcServer) Deregister() error {
s.registered = false
// close the subscriber
if s.subscriber != nil {
s.subscriber.Unsubscribe()
s.subscriber = nil
}
for sb, subs := range s.subscribers {
for _, sub := range subs {
log.Logf("Unsubscribing %s from topic: %s", node.Id, sub.Topic())

View File

@ -29,15 +29,26 @@ type Server interface {
// Router handle serving messages
type Router interface {
// ProcessMessage processes a message
ProcessMessage(context.Context, Message) error
// ServeRequest processes a request to completion
ServeRequest(context.Context, Request, Response) error
}
// Message is an async message interface
type Message interface {
// Topic of the message
Topic() string
// The decoded payload value
Payload() interface{}
// The content type of the payload
ContentType() string
// The raw headers of the message
Header() map[string]string
// The raw body of the message
Body() []byte
// Codec used to decode the message
Codec() codec.Reader
}
// Request is a synchronous request interface

View File

@ -1,17 +1,10 @@
package server
import (
"bytes"
"context"
"fmt"
"reflect"
"strings"
"github.com/micro/go-micro/broker"
"github.com/micro/go-micro/codec"
"github.com/micro/go-micro/metadata"
"github.com/micro/go-micro/registry"
"github.com/micro/go-micro/util/buf"
)
const (
@ -165,124 +158,6 @@ func validateSubscriber(sub Subscriber) error {
return nil
}
func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handler {
return func(p broker.Event) error {
msg := p.Message()
if msg.Header == nil {
// create empty map in case of headers empty to avoid panic later
msg.Header = make(map[string]string)
}
// get codec
ct := msg.Header["Content-Type"]
// default content type
if len(ct) == 0 {
msg.Header["Content-Type"] = DefaultContentType
ct = DefaultContentType
}
// get codec
cf, err := s.newCodec(ct)
if err != nil {
return err
}
// copy headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
// create context
ctx := metadata.NewContext(context.Background(), hdr)
results := make(chan error, len(sb.handlers))
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
if handler.reqType.Kind() == reflect.Ptr {
req = reflect.New(handler.reqType.Elem())
} else {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
b := buf.New(bytes.NewBuffer(msg.Body))
co := cf(b)
defer co.Close()
if err := co.ReadHeader(&codec.Message{}, codec.Event); err != nil {
return err
}
if err := co.ReadBody(req.Interface()); err != nil {
return err
}
fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
}
for i := len(opts.SubWrappers); i > 0; i-- {
fn = opts.SubWrappers[i-1](fn)
}
if s.wg != nil {
s.wg.Add(1)
}
go func() {
if s.wg != nil {
defer s.wg.Done()
}
results <- fn(ctx, &rpcMessage{
topic: sb.topic,
contentType: ct,
payload: req.Interface(),
})
}()
}
var errors []string
for i := 0; i < len(sb.handlers); i++ {
if err := <-results; err != nil {
errors = append(errors, err.Error())
}
}
if len(errors) > 0 {
return fmt.Errorf("subscriber error: %s", strings.Join(errors, "\n"))
}
return nil
}
}
func (s *subscriber) Topic() string {
return s.topic
}

View File

@ -22,6 +22,13 @@ var (
once sync.Once
)
func (s *Server) ProcessMessage(ctx context.Context, msg server.Message) error {
if msg.Topic() == s.Name {
return server.DefaultRouter.ProcessMessage(ctx, msg)
}
return s.Proxy.ProcessMessage(ctx, msg)
}
func (s *Server) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
if req.Service() == s.Name {
return server.DefaultRouter.ServeRequest(ctx, req, rsp)