1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Move publication to message

This commit is contained in:
Asim Aslam 2018-04-14 18:21:02 +01:00
parent 65068e8b82
commit d00d76bf7c
5 changed files with 14 additions and 14 deletions

View File

@ -23,7 +23,7 @@ func fnHandlerWrapper(f Function) server.HandlerWrapper {
func fnSubWrapper(f Function) server.SubscriberWrapper { func fnSubWrapper(f Function) server.SubscriberWrapper {
return func(s server.SubscriberFunc) server.SubscriberFunc { return func(s server.SubscriberFunc) server.SubscriberFunc {
return func(ctx context.Context, msg server.Publication) error { return func(ctx context.Context, msg server.Message) error {
defer f.Done() defer f.Done()
return s(ctx, msg) return s(ctx, msg)
} }

View File

@ -8,10 +8,10 @@ type rpcRequest struct {
stream bool stream bool
} }
type rpcPublication struct { type rpcMessage struct {
topic string topic string
contentType string contentType string
message interface{} payload interface{}
} }
func (r *rpcRequest) ContentType() string { func (r *rpcRequest) ContentType() string {
@ -34,14 +34,14 @@ func (r *rpcRequest) Stream() bool {
return r.stream return r.stream
} }
func (r *rpcPublication) ContentType() string { func (r *rpcMessage) ContentType() string {
return r.contentType return r.contentType
} }
func (r *rpcPublication) Topic() string { func (r *rpcMessage) Topic() string {
return r.topic return r.topic
} }
func (r *rpcPublication) Message() interface{} { func (r *rpcMessage) Payload() interface{} {
return r.message return r.payload
} }

View File

@ -21,9 +21,9 @@ type Server interface {
String() string String() string
} }
type Publication interface { type Message interface {
Topic() string Topic() string
Message() interface{} Payload() interface{}
ContentType() string ContentType() string
} }

View File

@ -12,7 +12,7 @@ type HandlerFunc func(ctx context.Context, req Request, rsp interface{}) error
// SubscriberFunc represents a single method of a subscriber. It's used primarily // SubscriberFunc represents a single method of a subscriber. It's used primarily
// for the wrappers. What's handed to the actual method is the concrete // for the wrappers. What's handed to the actual method is the concrete
// publication message. // publication message.
type SubscriberFunc func(ctx context.Context, msg Publication) error type SubscriberFunc func(ctx context.Context, msg Message) error
// HandlerWrapper wraps the HandlerFunc and returns the equivalent // HandlerWrapper wraps the HandlerFunc and returns the equivalent
type HandlerWrapper func(HandlerFunc) HandlerFunc type HandlerWrapper func(HandlerFunc) HandlerFunc

View File

@ -204,7 +204,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
return err return err
} }
fn := func(ctx context.Context, msg Publication) error { fn := func(ctx context.Context, msg Message) error {
var vals []reflect.Value var vals []reflect.Value
if sb.typ.Kind() != reflect.Func { if sb.typ.Kind() != reflect.Func {
vals = append(vals, sb.rcvr) vals = append(vals, sb.rcvr)
@ -213,7 +213,7 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
vals = append(vals, reflect.ValueOf(ctx)) vals = append(vals, reflect.ValueOf(ctx))
} }
vals = append(vals, reflect.ValueOf(msg.Message())) vals = append(vals, reflect.ValueOf(msg.Payload()))
returnValues := handler.method.Call(vals) returnValues := handler.method.Call(vals)
if err := returnValues[0].Interface(); err != nil { if err := returnValues[0].Interface(); err != nil {
@ -229,10 +229,10 @@ func (s *rpcServer) createSubHandler(sb *subscriber, opts Options) broker.Handle
s.wg.Add(1) s.wg.Add(1)
go func() { go func() {
defer s.wg.Done() defer s.wg.Done()
fn(ctx, &rpcPublication{ fn(ctx, &rpcMessage{
topic: sb.topic, topic: sb.topic,
contentType: ct, contentType: ct,
message: req.Interface(), payload: req.Interface(),
}) })
}() }()
} }