From d74cf95278c83a3b3ef29aee3ba3ab9a27eb23b3 Mon Sep 17 00:00:00 2001 From: Asim Date: Sat, 23 Apr 2016 00:06:29 +0100 Subject: [PATCH] First for mqtt broker --- broker/mqtt/mqtt.go | 221 ++++++++++++++++++++++++++++++++++++ broker/mqtt/mqtt_handler.go | 46 ++++++++ 2 files changed, 267 insertions(+) create mode 100644 broker/mqtt/mqtt.go create mode 100644 broker/mqtt/mqtt_handler.go diff --git a/broker/mqtt/mqtt.go b/broker/mqtt/mqtt.go new file mode 100644 index 00000000..9b2e707a --- /dev/null +++ b/broker/mqtt/mqtt.go @@ -0,0 +1,221 @@ +package mqtt + +/* + MQTT is a go-micro Broker for the MQTT protocol. + This can be integrated with any broker that supports MQTT, + including Mosquito and AWS IoT. + + TODO: Add encoding + Usually we'll translate Message headers over to + the equivalent in a broker. In MQTT we don't have that + and we don't want to assume data format because the whole + point is that it could be anything. So we'll defer for now. + + Note: Because of the way the MQTT library works, when you + unsubscribe from a topic it will unsubscribe all subscribers. + TODO: Perhaps create a unique client per subscription. + Becomes slightly more difficult to track a disconnect. + +*/ + +import ( + "errors" + "fmt" + "log" + "math/rand" + "strconv" + "strings" + "time" + + "github.com/micro/go-micro/broker" + "github.com/micro/go-micro/cmd" + + "github.com/eclipse/paho.mqtt.golang" +) + +type mqttBroker struct { + addrs []string + opts broker.Options + client mqtt.Client +} + +func init() { + rand.Seed(time.Now().UnixNano()) + cmd.DefaultBrokers["mqtt"] = NewBroker +} + +func setAddrs(addrs []string) []string { + var cAddrs []string + + for _, addr := range addrs { + if len(addr) == 0 { + continue + } + + var scheme string + var host string + var port int + + // split on scheme + parts := strings.Split(addr, "://") + + // no scheme + if len(parts) < 2 { + // default tcp scheme + scheme = "tcp" + parts = strings.Split(parts[0], ":") + // got scheme + } else { + scheme = parts[0] + parts = strings.Split(parts[1], ":") + } + + // no parts + if len(parts) == 0 { + continue + } + + // check scheme + switch scheme { + case "tcp", "ssl", "ws": + default: + continue + } + + if len(parts) < 2 { + // no port + host = parts[0] + + switch scheme { + case "tcp": + port = 1883 + case "ssl": + port = 8883 + case "ws": + // support secure port + port = 80 + default: + port = 1883 + } + // got host port + } else { + host = parts[0] + port, _ = strconv.Atoi(parts[1]) + } + + addr = fmt.Sprintf("%s://%s:%d", scheme, host, port) + cAddrs = append(cAddrs, addr) + + } + + // default an address if we have none + if len(cAddrs) == 0 { + cAddrs = []string{"tcp://127.0.0.1:1883"} + } + + return cAddrs +} + +func newClient(addrs []string, opts broker.Options) mqtt.Client { + // create opts + cOpts := mqtt.NewClientOptions() + cOpts.SetAutoReconnect(true) + cOpts.SetClientID(fmt.Sprintf("%d-%d", rand.Intn(10), time.Now().UnixNano())) + + // setup tls + if opts.TLSConfig != nil { + cOpts.SetTLSConfig(opts.TLSConfig) + } + + // add brokers + for _, addr := range addrs { + cOpts.AddBroker(addr) + } + + return mqtt.NewClient(cOpts) +} + +func newBroker(opts ...broker.Option) broker.Broker { + var options broker.Options + for _, o := range opts { + o(&options) + } + + addrs := setAddrs(options.Addrs) + client := newClient(addrs, options) + + return &mqttBroker{ + opts: options, + client: client, + addrs: addrs, + } +} + +func (m *mqttBroker) Options() broker.Options { + return m.opts +} + +func (m *mqttBroker) Address() string { + return strings.Join(m.addrs, ",") +} + +func (m *mqttBroker) Connect() error { + t := m.client.Connect() + return t.Error() +} + +func (m *mqttBroker) Disconnect() error { + m.client.Disconnect(0) + return nil +} + +func (m *mqttBroker) Init(opts ...broker.Option) error { + if m.client.IsConnected() { + return errors.New("cannot init while connected") + } + + for _, o := range opts { + o(&m.opts) + } + + m.addrs = setAddrs(m.opts.Addrs) + m.client = newClient(m.addrs, m.opts) + return nil +} + +func (m *mqttBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { + // TODO: Support encoding to preserve headers + t := m.client.Publish(topic, 0, false, msg.Body) + return t.Error() +} + +func (m *mqttBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + var options broker.SubscribeOptions + for _, o := range opts { + o(&options) + } + + t := m.client.Subscribe(topic, 0, func(c mqtt.Client, m mqtt.Message) { + if err := h(&mqttPub{msg: m}); err != nil { + log.Println(err) + } + }) + + if t.Error() != nil { + return nil, t.Error() + } + + return &mqttSub{ + opts: options, + client: m.client, + topic: topic, + }, nil +} + +func (m *mqttBroker) String() string { + return "mqtt" +} + +func NewBroker(opts ...broker.Option) broker.Broker { + return newBroker(opts...) +} diff --git a/broker/mqtt/mqtt_handler.go b/broker/mqtt/mqtt_handler.go new file mode 100644 index 00000000..6300f8d9 --- /dev/null +++ b/broker/mqtt/mqtt_handler.go @@ -0,0 +1,46 @@ +package mqtt + +import ( + "github.com/eclipse/paho.mqtt.golang" + "github.com/micro/go-micro/broker" +) + +// mqttPub is a broker.Publication +type mqttPub struct { + msg mqtt.Message +} + +// mqttPub is a broker.Subscriber +type mqttSub struct { + opts broker.SubscribeOptions + topic string + client mqtt.Client +} + +func (m *mqttPub) Ack() error { + return nil +} + +func (m *mqttPub) Topic() string { + return m.msg.Topic() +} + +func (m *mqttPub) Message() *broker.Message { + // TODO: Support encoding to preserve headers + return &broker.Message{ + Body: m.msg.Payload(), + } +} + +func (m *mqttSub) Options() broker.SubscribeOptions { + return m.opts +} + +func (m *mqttSub) Topic() string { + return m.topic +} + +func (m *mqttSub) Unsubscribe() error { + t := m.client.Unsubscribe(m.topic) + return t.Error() +}