1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00
go-micro/tunnel/broker/broker.go
Vasiliy Tolstov f23638c036 fix import paths for v2 release
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
2020-01-30 14:44:40 +03:00

214 lines
3.9 KiB
Go

// Package broker is a tunnel broker
package broker
import (
"context"
"github.com/micro/go-micro/v2/broker"
"github.com/micro/go-micro/v2/transport"
"github.com/micro/go-micro/v2/tunnel"
)
type tunBroker struct {
opts broker.Options
tunnel tunnel.Tunnel
}
type tunSubscriber struct {
topic string
handler broker.Handler
opts broker.SubscribeOptions
closed chan bool
listener tunnel.Listener
}
type tunEvent struct {
topic string
message *broker.Message
}
// used to access tunnel from options context
type tunnelKey struct{}
type tunnelAddr struct{}
func (t *tunBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
func (t *tunBroker) Options() broker.Options {
return t.opts
}
func (t *tunBroker) Address() string {
return t.tunnel.Address()
}
func (t *tunBroker) Connect() error {
return t.tunnel.Connect()
}
func (t *tunBroker) Disconnect() error {
return t.tunnel.Close()
}
func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error {
// TODO: this is probably inefficient, we might want to just maintain an open connection
// it may be easier to add broadcast to the tunnel
c, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast))
if err != nil {
return err
}
defer c.Close()
return c.Send(&transport.Message{
Header: m.Header,
Body: m.Body,
})
}
func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
l, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast))
if err != nil {
return nil, err
}
var options broker.SubscribeOptions
for _, o := range opts {
o(&options)
}
tunSub := &tunSubscriber{
topic: topic,
handler: h,
opts: options,
closed: make(chan bool),
listener: l,
}
// start processing
go tunSub.run()
return tunSub, nil
}
func (t *tunBroker) String() string {
return "tunnel"
}
func (t *tunSubscriber) run() {
for {
// accept a new connection
c, err := t.listener.Accept()
if err != nil {
select {
case <-t.closed:
return
default:
continue
}
}
// receive message
m := new(transport.Message)
if err := c.Recv(m); err != nil {
c.Close()
continue
}
// close the connection
c.Close()
// handle the message
go t.handler(&tunEvent{
topic: t.topic,
message: &broker.Message{
Header: m.Header,
Body: m.Body,
},
})
}
}
func (t *tunSubscriber) Options() broker.SubscribeOptions {
return t.opts
}
func (t *tunSubscriber) Topic() string {
return t.topic
}
func (t *tunSubscriber) Unsubscribe() error {
select {
case <-t.closed:
return nil
default:
close(t.closed)
return t.listener.Close()
}
}
func (t *tunEvent) Topic() string {
return t.topic
}
func (t *tunEvent) Message() *broker.Message {
return t.message
}
func (t *tunEvent) Ack() error {
return nil
}
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
if !ok {
t = tunnel.NewTunnel()
}
a, ok := options.Context.Value(tunnelAddr{}).(string)
if ok {
// initialise address
t.Init(tunnel.Address(a))
}
if len(options.Addrs) > 0 {
// initialise nodes
t.Init(tunnel.Nodes(options.Addrs...))
}
return &tunBroker{
opts: options,
tunnel: t,
}
}
// WithAddress sets the tunnel address
func WithAddress(a string) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
}
}
// WithTunnel sets the internal tunnel
func WithTunnel(t tunnel.Tunnel) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
}
}