1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00

Only support single signature for subscriber and validate

This commit is contained in:
Asim 2015-12-02 17:42:14 +00:00
parent b1511ed813
commit cfa676af19
4 changed files with 74 additions and 5 deletions

View File

@ -39,19 +39,23 @@ func main() {
) )
// Register Subscribers // Register Subscribers
server.Subscribe( if err := server.Subscribe(
server.NewSubscriber( server.NewSubscriber(
"topic.go.micro.srv.example", "topic.go.micro.srv.example",
new(subscriber.Example), new(subscriber.Example),
), ),
) ); err != nil {
log.Fatal(err)
}
server.Subscribe( if err := server.Subscribe(
server.NewSubscriber( server.NewSubscriber(
"topic.go.micro.srv.example", "topic.go.micro.srv.example",
subscriber.Handler, subscriber.Handler,
), ),
) ); err != nil {
log.Fatal(err)
}
// Run server // Run server
if err := server.Run(); err != nil { if err := server.Run(); err != nil {

View File

@ -13,6 +13,7 @@ func (e *Example) Handle(ctx context.Context, msg *example.Message) error {
return nil return nil
} }
func Handler(msg *example.Message) { func Handler(ctx context.Context, msg *example.Message) error {
log.Info("Function Received message: ", msg.Say) log.Info("Function Received message: ", msg.Say)
return nil
} }

View File

@ -132,6 +132,10 @@ func (s *rpcServer) Subscribe(sb Subscriber) error {
return fmt.Errorf("invalid subscriber: no handler functions") return fmt.Errorf("invalid subscriber: no handler functions")
} }
if err := validateSubscriber(sb); err != nil {
return err
}
s.Lock() s.Lock()
_, ok = s.subscribers[sub] _, ok = s.subscribers[sub]
if ok { if ok {

View File

@ -2,6 +2,7 @@ package server
import ( import (
"bytes" "bytes"
"fmt"
"reflect" "reflect"
"github.com/micro/go-micro/broker" "github.com/micro/go-micro/broker"
@ -11,6 +12,10 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
const (
subSig = "func(context.Context, interface{}) error"
)
type handler struct { type handler struct {
method reflect.Value method reflect.Value
reqType reflect.Type reqType reflect.Type
@ -94,6 +99,61 @@ func newSubscriber(topic string, sub interface{}) Subscriber {
} }
} }
func validateSubscriber(sub Subscriber) error {
typ := reflect.TypeOf(sub.Subscriber())
var argType reflect.Type
if typ.Kind() == reflect.Func {
name := "Func"
switch typ.NumIn() {
case 2:
argType = typ.In(1)
default:
return fmt.Errorf("subscriber %v takes wrong number of args: %v required signature %s", name, typ.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("subscriber %v argument type not exported: %v", name, argType)
}
if typ.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of outs: %v require signature %s",
name, typ.NumOut(), subSig)
}
if returnType := typ.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v returns %v not error", name, returnType.String())
}
} else {
hdlr := reflect.ValueOf(sub.Subscriber())
name := reflect.Indirect(hdlr).Type().Name()
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
switch method.Type.NumIn() {
case 3:
argType = method.Type.In(2)
default:
return fmt.Errorf("subscriber %v.%v takes wrong number of args: %v required signature %s",
name, method.Name, method.Type.NumIn(), subSig)
}
if !isExportedOrBuiltinType(argType) {
return fmt.Errorf("%v argument type not exported: %v", name, argType)
}
if method.Type.NumOut() != 1 {
return fmt.Errorf(
"subscriber %v.%v has wrong number of outs: %v require signature %s",
name, method.Name, method.Type.NumOut(), subSig)
}
if returnType := method.Type.Out(0); returnType != typeOfError {
return fmt.Errorf("subscriber %v.%v returns %v not error", name, method.Name, returnType.String())
}
}
}
return nil
}
func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler { func (s *rpcServer) createSubHandler(sb *subscriber) broker.Handler {
return func(msg *broker.Message) { return func(msg *broker.Message) {
cf, err := s.newCodec(msg.Header["Content-Type"]) cf, err := s.newCodec(msg.Header["Content-Type"])