mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-24 10:07:04 +02:00
249 lines
4.6 KiB
Go
249 lines
4.6 KiB
Go
package mqtt
|
|
|
|
/*
|
|
MQTT is a go-micro Broker for the MQTT protocol.
|
|
This can be integrated with any broker that supports MQTT,
|
|
including Mosquito and AWS IoT.
|
|
|
|
TODO: Strip encoding?
|
|
Where brokers don't support headers we're actually
|
|
encoding the broker.Message in json to simplify usage
|
|
and cross broker compatibility. To actually use the
|
|
MQTT broker more widely on the internet we may need to
|
|
support stripping the encoding.
|
|
|
|
Note: Because of the way the MQTT library works, when you
|
|
unsubscribe from a topic it will unsubscribe all subscribers.
|
|
TODO: Perhaps create a unique client per subscription.
|
|
Becomes slightly more difficult to track a disconnect.
|
|
|
|
*/
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"math/rand"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/micro/go-micro/broker"
|
|
)
|
|
|
|
type mqttBroker struct {
|
|
addrs []string
|
|
opts broker.Options
|
|
client mqtt.Client
|
|
}
|
|
|
|
func init() {
|
|
rand.Seed(time.Now().UnixNano())
|
|
}
|
|
|
|
func setAddrs(addrs []string) []string {
|
|
var cAddrs []string
|
|
|
|
for _, addr := range addrs {
|
|
if len(addr) == 0 {
|
|
continue
|
|
}
|
|
|
|
var scheme string
|
|
var host string
|
|
var port int
|
|
|
|
// split on scheme
|
|
parts := strings.Split(addr, "://")
|
|
|
|
// no scheme
|
|
if len(parts) < 2 {
|
|
// default tcp scheme
|
|
scheme = "tcp"
|
|
parts = strings.Split(parts[0], ":")
|
|
// got scheme
|
|
} else {
|
|
scheme = parts[0]
|
|
parts = strings.Split(parts[1], ":")
|
|
}
|
|
|
|
// no parts
|
|
if len(parts) == 0 {
|
|
continue
|
|
}
|
|
|
|
// check scheme
|
|
switch scheme {
|
|
case "tcp", "ssl", "ws":
|
|
default:
|
|
continue
|
|
}
|
|
|
|
if len(parts) < 2 {
|
|
// no port
|
|
host = parts[0]
|
|
|
|
switch scheme {
|
|
case "tcp":
|
|
port = 1883
|
|
case "ssl":
|
|
port = 8883
|
|
case "ws":
|
|
// support secure port
|
|
port = 80
|
|
default:
|
|
port = 1883
|
|
}
|
|
// got host port
|
|
} else {
|
|
host = parts[0]
|
|
port, _ = strconv.Atoi(parts[1])
|
|
}
|
|
|
|
addr = fmt.Sprintf("%s://%s:%d", scheme, host, port)
|
|
cAddrs = append(cAddrs, addr)
|
|
|
|
}
|
|
|
|
// default an address if we have none
|
|
if len(cAddrs) == 0 {
|
|
cAddrs = []string{"tcp://127.0.0.1:1883"}
|
|
}
|
|
|
|
return cAddrs
|
|
}
|
|
|
|
func newClient(addrs []string, opts broker.Options) mqtt.Client {
|
|
// create opts
|
|
cOpts := mqtt.NewClientOptions()
|
|
cOpts.SetClientID(fmt.Sprintf("%d%d", time.Now().UnixNano(), rand.Intn(10)))
|
|
cOpts.SetCleanSession(false)
|
|
|
|
// setup tls
|
|
if opts.TLSConfig != nil {
|
|
cOpts.SetTLSConfig(opts.TLSConfig)
|
|
}
|
|
|
|
// add brokers
|
|
for _, addr := range addrs {
|
|
cOpts.AddBroker(addr)
|
|
}
|
|
|
|
return mqtt.NewClient(cOpts)
|
|
}
|
|
|
|
func newBroker(opts ...broker.Option) broker.Broker {
|
|
var options broker.Options
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
addrs := setAddrs(options.Addrs)
|
|
client := newClient(addrs, options)
|
|
|
|
return &mqttBroker{
|
|
opts: options,
|
|
client: client,
|
|
addrs: addrs,
|
|
}
|
|
}
|
|
|
|
func (m *mqttBroker) Options() broker.Options {
|
|
return m.opts
|
|
}
|
|
|
|
func (m *mqttBroker) Address() string {
|
|
return strings.Join(m.addrs, ",")
|
|
}
|
|
|
|
func (m *mqttBroker) Connect() error {
|
|
if m.client.IsConnected() {
|
|
return nil
|
|
}
|
|
|
|
if t := m.client.Connect(); t.Wait() && t.Error() != nil {
|
|
return t.Error()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *mqttBroker) Disconnect() error {
|
|
if !m.client.IsConnected() {
|
|
return nil
|
|
}
|
|
m.client.Disconnect(0)
|
|
return nil
|
|
}
|
|
|
|
func (m *mqttBroker) Init(opts ...broker.Option) error {
|
|
if m.client.IsConnected() {
|
|
return errors.New("cannot init while connected")
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&m.opts)
|
|
}
|
|
|
|
m.addrs = setAddrs(m.opts.Addrs)
|
|
m.client = newClient(m.addrs, m.opts)
|
|
return nil
|
|
}
|
|
|
|
func (m *mqttBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
|
|
if !m.client.IsConnected() {
|
|
return errors.New("not connected")
|
|
}
|
|
|
|
b, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t := m.client.Publish(topic, 1, false, b)
|
|
return t.Error()
|
|
}
|
|
|
|
func (m *mqttBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
if !m.client.IsConnected() {
|
|
return nil, errors.New("not connected")
|
|
}
|
|
|
|
var options broker.SubscribeOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
t := m.client.Subscribe(topic, 1, func(c mqtt.Client, m mqtt.Message) {
|
|
var msg *broker.Message
|
|
if err := json.Unmarshal(m.Payload(), &msg); err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
|
|
if err := h(&mqttPub{topic: topic, msg: msg}); err != nil {
|
|
log.Println(err)
|
|
}
|
|
})
|
|
|
|
if t.Wait() && t.Error() != nil {
|
|
return nil, t.Error()
|
|
}
|
|
|
|
return &mqttSub{
|
|
opts: options,
|
|
client: m.client,
|
|
topic: topic,
|
|
}, nil
|
|
}
|
|
|
|
func (m *mqttBroker) String() string {
|
|
return "mqtt"
|
|
}
|
|
|
|
func NewBroker(opts ...broker.Option) broker.Broker {
|
|
return newBroker(opts...)
|
|
}
|