diff --git a/broker/broker.go b/broker/broker.go index 2cd0de34..e63ad90e 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -31,12 +31,22 @@ type Publication interface { Ack() error } +// Subscriber is a convenience return type for the Subscribe method type Subscriber interface { Options() SubscribeOptions Topic() string Unsubscribe() error } +// Codec is used for encoding where the broker doesn't natively support +// headers in the message type. In this case the entire message is +// encoded as the payload +type Codec interface { + Marshal(interface{}) ([]byte, error) + Unmarshal([]byte, interface{}) error + String() string +} + var ( DefaultBroker Broker = newHttpBroker() ) diff --git a/broker/codec.go b/broker/codec.go new file mode 100644 index 00000000..2affd780 --- /dev/null +++ b/broker/codec.go @@ -0,0 +1,19 @@ +package broker + +import ( + "encoding/json" +) + +type jsonCodec struct{} + +func (j jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (j jsonCodec) Unmarshal(d []byte, v interface{}) error { + return json.Unmarshal(d, v) +} + +func (j jsonCodec) String() string { + return "json" +} diff --git a/broker/codec/json/json.go b/broker/codec/json/json.go new file mode 100644 index 00000000..f0cb8736 --- /dev/null +++ b/broker/codec/json/json.go @@ -0,0 +1,25 @@ +package json + +import ( + "encoding/json" + + "github.com/micro/go-micro/broker" +) + +type jsonCodec struct{} + +func (j jsonCodec) Marshal(v interface{}) ([]byte, error) { + return json.Marshal(v) +} + +func (j jsonCodec) Unmarshal(d []byte, v interface{}) error { + return json.Unmarshal(d, v) +} + +func (j jsonCodec) String() string { + return "json" +} + +func NewCodec() broker.Codec { + return jsonCodec{} +} diff --git a/broker/codec/noop/noop.go b/broker/codec/noop/noop.go new file mode 100644 index 00000000..81918ed3 --- /dev/null +++ b/broker/codec/noop/noop.go @@ -0,0 +1,34 @@ +package noop + +import ( + "errors" + + "github.com/micro/go-micro/broker" +) + +type noopCodec struct{} + +func (n noopCodec) Marshal(v interface{}) ([]byte, error) { + msg, ok := v.(*broker.Message) + if !ok { + return nil, errors.New("invalid message") + } + return msg.Body, nil +} + +func (n noopCodec) Unmarshal(d []byte, v interface{}) error { + msg, ok := v.(*broker.Message) + if !ok { + return errors.New("invalid message") + } + msg.Body = d + return nil +} + +func (n noopCodec) String() string { + return "noop" +} + +func NewCodec() broker.Codec { + return noopCodec{} +} diff --git a/broker/http_broker.go b/broker/http_broker.go index d135683c..c2d39eed 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -3,7 +3,6 @@ package broker import ( "bytes" "crypto/tls" - "encoding/json" "fmt" "io" "io/ioutil" @@ -96,6 +95,7 @@ func newTransport(config *tls.Config) *http.Transport { func newHttpBroker(opts ...Option) Broker { options := Options{ + Codec: jsonCodec{}, Context: context.TODO(), } @@ -269,7 +269,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { } var m *Message - if err = json.Unmarshal(b, &m); err != nil { + if err = h.opts.Codec.Unmarshal(b, &m); err != nil { errr := errors.InternalServerError("go.micro.broker", fmt.Sprintf("Error parsing request body: %v", err)) w.WriteHeader(500) w.Write([]byte(errr.Error())) @@ -352,7 +352,7 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) m.Header[":topic"] = topic - b, err := json.Marshal(m) + b, err := h.opts.Codec.Marshal(m) if err != nil { return err } diff --git a/broker/options.go b/broker/options.go index 6256cac5..7824a97e 100644 --- a/broker/options.go +++ b/broker/options.go @@ -10,6 +10,7 @@ import ( type Options struct { Addrs []string Secure bool + Codec Codec TLSConfig *tls.Config // Other options for implementations of the interface @@ -96,6 +97,14 @@ func Secure(b bool) Option { } } +// Codec sets the codec used for encoding/decoding used where +// a broker does not support headers +func SetCodec(c Codec) Option { + return func(o *Options) { + o.Codec = c + } +} + // Specify TLS Config func TLSConfig(t *tls.Config) Option { return func(o *Options) {