1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-08-10 21:52:01 +02:00

Setup subscription wrappers

This commit is contained in:
Asim
2015-12-02 19:56:57 +00:00
parent cfa676af19
commit eef155490e
3 changed files with 81 additions and 22 deletions

View File

@@ -208,7 +208,7 @@ func (s *rpcServer) Register() error {
defer s.Unlock()
for sb, _ := range s.subscribers {
handler := s.createSubHandler(sb)
handler := s.createSubHandler(sb, s.opts)
sub, err := config.broker.Subscribe(sb.Topic(), handler)
if err != nil {
return err
@@ -279,7 +279,7 @@ func (s *rpcServer) Start() error {
registerHealthChecker(s)
config := s.Config()
ts, err := config.transport.Listen(s.opts.address)
ts, err := config.transport.Listen(config.address)
if err != nil {
return err
}

View File

@@ -154,28 +154,23 @@ func validateSubscriber(sub Subscriber) error {
return nil
}
func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler {
func (s *rpcServer) createSubHandler(sb *subscriber, opts options) broker.Handler {
return func(msg *broker.Message) {
cf, err := s.newCodec(msg.Header["Content-Type"])
if err != nil {
return
}
b := &buffer{bytes.NewBuffer(msg.Body)}
co := cf(b)
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
return
}
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
delete(hdr, "Content-Type")
ctx := c.WithMetadata(context.Background(), hdr)
rctx := reflect.ValueOf(ctx)
for _, handler := range sb.handlers {
for i := 0; i < len(sb.handlers); i++ {
handler := sb.handlers[i]
var isVal bool
var req reflect.Value
@@ -185,26 +180,45 @@ func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler {
req = reflect.New(handler.reqType)
isVal = true
}
if isVal {
req = req.Elem()
}
b := &buffer{bytes.NewBuffer(msg.Body)}
co := cf(b)
defer co.Close()
if err := co.ReadHeader(&codec.Message{}, codec.Publication); err != nil {
continue
}
if err := co.ReadBody(req.Interface()); err != nil {
continue
}
if isVal {
req = req.Elem()
fn := func(ctx context.Context, msg interface{}) error {
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
}
if handler.ctxType != nil {
vals = append(vals, reflect.ValueOf(ctx))
}
vals = append(vals, reflect.ValueOf(msg))
returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil {
return err.(error)
}
return nil
}
var vals []reflect.Value
if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr)
for i := len(opts.subWrappers); i > 0; i-- {
fn = opts.subWrappers[i-1](fn)
}
if handler.ctxType != nil {
vals = append(vals, rctx)
}
vals = append(vals, req)
go handler.method.Call(vals)
go fn(ctx, req.Interface())
}
}
}