mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-24 10:07:04 +02:00
8ee5607254
* broker ErrorHandler option Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * rewrite Event interface, add error Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * implement new interface Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * change ErrorHandler func to broker.Handler Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org> * fix Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
218 lines
3.9 KiB
Go
218 lines
3.9 KiB
Go
// Package broker provides a broker.Broker implementation that sends and receives messages over a tunnel
|
|
package broker
|
|
|
|
import (
	"context"
	"time"

	"github.com/micro/go-micro/v2/broker"
	"github.com/micro/go-micro/v2/transport"
	"github.com/micro/go-micro/v2/tunnel"
)
|
|
|
|
type tunBroker struct {
|
|
opts broker.Options
|
|
tunnel tunnel.Tunnel
|
|
}
|
|
|
|
type tunSubscriber struct {
|
|
topic string
|
|
handler broker.Handler
|
|
opts broker.SubscribeOptions
|
|
|
|
closed chan bool
|
|
listener tunnel.Listener
|
|
}
|
|
|
|
type tunEvent struct {
|
|
topic string
|
|
message *broker.Message
|
|
}
|
|
|
|
// Context keys used to carry tunnel settings inside broker.Options.Context.
type (
	// tunnelKey is the context key under which WithTunnel stores a tunnel.
	tunnelKey struct{}

	// tunnelAddr is the context key under which WithAddress stores an address.
	tunnelAddr struct{}
)
|
|
|
|
func (t *tunBroker) Init(opts ...broker.Option) error {
|
|
for _, o := range opts {
|
|
o(&t.opts)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *tunBroker) Options() broker.Options {
|
|
return t.opts
|
|
}
|
|
|
|
func (t *tunBroker) Address() string {
|
|
return t.tunnel.Address()
|
|
}
|
|
|
|
func (t *tunBroker) Connect() error {
|
|
return t.tunnel.Connect()
|
|
}
|
|
|
|
func (t *tunBroker) Disconnect() error {
|
|
return t.tunnel.Close()
|
|
}
|
|
|
|
func (t *tunBroker) Publish(topic string, m *broker.Message, opts ...broker.PublishOption) error {
|
|
// TODO: this is probably inefficient, we might want to just maintain an open connection
|
|
// it may be easier to add broadcast to the tunnel
|
|
c, err := t.tunnel.Dial(topic, tunnel.DialMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.Close()
|
|
|
|
return c.Send(&transport.Message{
|
|
Header: m.Header,
|
|
Body: m.Body,
|
|
})
|
|
}
|
|
|
|
func (t *tunBroker) Subscribe(topic string, h broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
|
|
l, err := t.tunnel.Listen(topic, tunnel.ListenMode(tunnel.Multicast))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var options broker.SubscribeOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
tunSub := &tunSubscriber{
|
|
topic: topic,
|
|
handler: h,
|
|
opts: options,
|
|
closed: make(chan bool),
|
|
listener: l,
|
|
}
|
|
|
|
// start processing
|
|
go tunSub.run()
|
|
|
|
return tunSub, nil
|
|
}
|
|
|
|
func (t *tunBroker) String() string {
|
|
return "tunnel"
|
|
}
|
|
|
|
func (t *tunSubscriber) run() {
|
|
for {
|
|
// accept a new connection
|
|
c, err := t.listener.Accept()
|
|
if err != nil {
|
|
select {
|
|
case <-t.closed:
|
|
return
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
|
|
// receive message
|
|
m := new(transport.Message)
|
|
if err := c.Recv(m); err != nil {
|
|
c.Close()
|
|
continue
|
|
}
|
|
|
|
// close the connection
|
|
c.Close()
|
|
|
|
// handle the message
|
|
go t.handler(&tunEvent{
|
|
topic: t.topic,
|
|
message: &broker.Message{
|
|
Header: m.Header,
|
|
Body: m.Body,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
func (t *tunSubscriber) Options() broker.SubscribeOptions {
|
|
return t.opts
|
|
}
|
|
|
|
func (t *tunSubscriber) Topic() string {
|
|
return t.topic
|
|
}
|
|
|
|
func (t *tunSubscriber) Unsubscribe() error {
|
|
select {
|
|
case <-t.closed:
|
|
return nil
|
|
default:
|
|
close(t.closed)
|
|
return t.listener.Close()
|
|
}
|
|
}
|
|
|
|
func (t *tunEvent) Topic() string {
|
|
return t.topic
|
|
}
|
|
|
|
func (t *tunEvent) Message() *broker.Message {
|
|
return t.message
|
|
}
|
|
|
|
func (t *tunEvent) Ack() error {
|
|
return nil
|
|
}
|
|
|
|
func (t *tunEvent) Error() error {
|
|
return nil
|
|
}
|
|
|
|
func NewBroker(opts ...broker.Option) broker.Broker {
|
|
options := broker.Options{
|
|
Context: context.Background(),
|
|
}
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
t, ok := options.Context.Value(tunnelKey{}).(tunnel.Tunnel)
|
|
if !ok {
|
|
t = tunnel.NewTunnel()
|
|
}
|
|
|
|
a, ok := options.Context.Value(tunnelAddr{}).(string)
|
|
if ok {
|
|
// initialise address
|
|
t.Init(tunnel.Address(a))
|
|
}
|
|
|
|
if len(options.Addrs) > 0 {
|
|
// initialise nodes
|
|
t.Init(tunnel.Nodes(options.Addrs...))
|
|
}
|
|
|
|
return &tunBroker{
|
|
opts: options,
|
|
tunnel: t,
|
|
}
|
|
}
|
|
|
|
// WithAddress sets the tunnel address
|
|
func WithAddress(a string) broker.Option {
|
|
return func(o *broker.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
o.Context = context.WithValue(o.Context, tunnelAddr{}, a)
|
|
}
|
|
}
|
|
|
|
// WithTunnel sets the internal tunnel
|
|
func WithTunnel(t tunnel.Tunnel) broker.Option {
|
|
return func(o *broker.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
o.Context = context.WithValue(o.Context, tunnelKey{}, t)
|
|
}
|
|
}
|