1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-05 10:20:53 +02:00
go-micro/plugins/broker/nsq/nsq.go
2021-01-20 21:01:10 +00:00

365 lines
6.3 KiB
Go

// Package nsq provides an NSQ broker
package nsq
import (
"context"
"math/rand"
"sync"
"time"
"github.com/google/uuid"
"github.com/asim/go-micro/v3/broker"
"github.com/asim/go-micro/v3/codec/json"
"github.com/asim/go-micro/v3/cmd"
"github.com/nsqio/go-nsq"
)
type nsqBroker struct {
lookupdAddrs []string
addrs []string
opts broker.Options
config *nsq.Config
sync.Mutex
running bool
p []*nsq.Producer
c []*subscriber
}
type publication struct {
topic string
m *broker.Message
nm *nsq.Message
opts broker.PublishOptions
err error
}
type subscriber struct {
topic string
opts broker.SubscribeOptions
c *nsq.Consumer
// handler so we can resubcribe
h nsq.HandlerFunc
// concurrency
n int
}
var (
DefaultConcurrentHandlers = 1
)
func init() {
rand.Seed(time.Now().UnixNano())
cmd.DefaultBrokers["nsq"] = NewBroker
}
func (n *nsqBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&n.opts)
}
var addrs []string
for _, addr := range n.opts.Addrs {
if len(addr) > 0 {
addrs = append(addrs, addr)
}
}
if len(addrs) == 0 {
addrs = []string{"127.0.0.1:4150"}
}
n.addrs = addrs
n.configure(n.opts.Context)
return nil
}
func (n *nsqBroker) configure(ctx context.Context) {
if v, ok := ctx.Value(lookupdAddrsKey{}).([]string); ok {
n.lookupdAddrs = v
}
if v, ok := ctx.Value(consumerOptsKey{}).([]string); ok {
cfgFlag := &nsq.ConfigFlag{Config: n.config}
for _, opt := range v {
cfgFlag.Set(opt)
}
}
}
func (n *nsqBroker) Options() broker.Options {
return n.opts
}
func (n *nsqBroker) Address() string {
return n.addrs[rand.Intn(len(n.addrs))]
}
func (n *nsqBroker) Connect() error {
n.Lock()
defer n.Unlock()
if n.running {
return nil
}
producers := make([]*nsq.Producer, 0, len(n.addrs))
// create producers
for _, addr := range n.addrs {
p, err := nsq.NewProducer(addr, n.config)
if err != nil {
return err
}
if err = p.Ping(); err != nil {
return err
}
producers = append(producers, p)
}
// create consumers
for _, c := range n.c {
channel := c.opts.Queue
if len(channel) == 0 {
channel = uuid.New().String() + "#ephemeral"
}
cm, err := nsq.NewConsumer(c.topic, channel, n.config)
if err != nil {
return err
}
cm.AddConcurrentHandlers(c.h, c.n)
c.c = cm
if len(n.lookupdAddrs) > 0 {
c.c.ConnectToNSQLookupds(n.lookupdAddrs)
} else {
err = c.c.ConnectToNSQDs(n.addrs)
if err != nil {
return err
}
}
}
n.p = producers
n.running = true
return nil
}
func (n *nsqBroker) Disconnect() error {
n.Lock()
defer n.Unlock()
if !n.running {
return nil
}
// stop the producers
for _, p := range n.p {
p.Stop()
}
// stop the consumers
for _, c := range n.c {
c.c.Stop()
if len(n.lookupdAddrs) > 0 {
// disconnect from all lookupd
for _, addr := range n.lookupdAddrs {
c.c.DisconnectFromNSQLookupd(addr)
}
} else {
// disconnect from all nsq brokers
for _, addr := range n.addrs {
c.c.DisconnectFromNSQD(addr)
}
}
}
n.p = nil
n.running = false
return nil
}
func (n *nsqBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error {
p := n.p[rand.Intn(len(n.p))]
options := broker.PublishOptions{}
for _, o := range opts {
o(&options)
}
var (
doneChan chan *nsq.ProducerTransaction
delay time.Duration
)
if options.Context != nil {
if v, ok := options.Context.Value(asyncPublishKey{}).(chan *nsq.ProducerTransaction); ok {
doneChan = v
}
if v, ok := options.Context.Value(deferredPublishKey{}).(time.Duration); ok {
delay = v
}
}
b, err := n.opts.Codec.Marshal(message)
if err != nil {
return err
}
if doneChan != nil {
if delay > 0 {
return p.DeferredPublishAsync(topic, delay, b, doneChan)
}
return p.PublishAsync(topic, b, doneChan)
} else {
if delay > 0 {
return p.DeferredPublish(topic, delay, b)
}
return p.Publish(topic, b)
}
}
func (n *nsqBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
options := broker.SubscribeOptions{
AutoAck: true,
}
for _, o := range opts {
o(&options)
}
concurrency, maxInFlight := DefaultConcurrentHandlers, DefaultConcurrentHandlers
if options.Context != nil {
if v, ok := options.Context.Value(concurrentHandlerKey{}).(int); ok {
maxInFlight, concurrency = v, v
}
if v, ok := options.Context.Value(maxInFlightKey{}).(int); ok {
maxInFlight = v
}
}
channel := options.Queue
if len(channel) == 0 {
channel = uuid.New().String() + "#ephemeral"
}
config := *n.config
config.MaxInFlight = maxInFlight
c, err := nsq.NewConsumer(topic, channel, &config)
if err != nil {
return nil, err
}
h := nsq.HandlerFunc(func(nm *nsq.Message) error {
if !options.AutoAck {
nm.DisableAutoResponse()
}
var m broker.Message
if err := n.opts.Codec.Unmarshal(nm.Body, &m); err != nil {
return err
}
p := &publication{topic: topic, m: &m}
p.err = handler(p)
return p.err
})
c.AddConcurrentHandlers(h, concurrency)
if len(n.lookupdAddrs) > 0 {
err = c.ConnectToNSQLookupds(n.lookupdAddrs)
} else {
err = c.ConnectToNSQDs(n.addrs)
}
if err != nil {
return nil, err
}
sub := &subscriber{
c: c,
opts: options,
topic: topic,
h: h,
n: concurrency,
}
n.c = append(n.c, sub)
return sub, nil
}
func (n *nsqBroker) String() string {
return "nsq"
}
func (p *publication) Topic() string {
return p.topic
}
func (p *publication) Message() *broker.Message {
return p.m
}
func (p *publication) Ack() error {
p.nm.Finish()
return nil
}
func (p *publication) Error() error {
return p.err
}
func (s *subscriber) Options() broker.SubscribeOptions {
return s.opts
}
func (s *subscriber) Topic() string {
return s.topic
}
func (s *subscriber) Unsubscribe() error {
s.c.Stop()
return nil
}
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
// Default codec
Codec: json.Marshaler{},
// Default context
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
var addrs []string
for _, addr := range options.Addrs {
if len(addr) > 0 {
addrs = append(addrs, addr)
}
}
if len(addrs) == 0 {
addrs = []string{"127.0.0.1:4150"}
}
n := &nsqBroker{
addrs: addrs,
opts: options,
config: nsq.NewConfig(),
}
n.configure(n.opts.Context)
return n
}