From 6097c3296cf484672f260dc8242610a7f5f954ed Mon Sep 17 00:00:00 2001 From: Asim Date: Wed, 23 Dec 2015 20:05:47 +0000 Subject: [PATCH] Shared queue distribution of messages --- broker/http_broker.go | 54 +++++++++++++++++++++++++++++++++++-------- broker/options.go | 10 ++++++++ 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/broker/http_broker.go b/broker/http_broker.go index 83858a78..d2dc5094 100644 --- a/broker/http_broker.go +++ b/broker/http_broker.go @@ -4,12 +4,15 @@ import ( "bytes" "encoding/json" "fmt" + "io" "io/ioutil" + "math/rand" "net" "net/http" "strconv" "strings" "sync" + "time" log "github.com/golang/glog" "github.com/micro/go-micro/errors" @@ -47,9 +50,14 @@ type httpPublication struct { } var ( - DefaultSubPath = "/_sub" + DefaultSubPath = "/_sub" + broadcastVersion = "ff.http.broadcast" ) +func init() { + rand.Seed(time.Now().Unix()) +} + func newHttpBroker(addrs []string, opt ...Option) Broker { addr := ":0" if len(addrs) > 0 && len(addrs[0]) > 0 { @@ -217,14 +225,36 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) return err } - for _, service := range s { - for _, node := range service.Nodes { - r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", bytes.NewBuffer(b)) - if err == nil { - r.Body.Close() - } + fn := func(node *registry.Node, b io.Reader) { + r, err := http.Post(fmt.Sprintf("http://%s:%d%s", node.Address, node.Port, DefaultSubPath), "application/json", b) + if err == nil { + r.Body.Close() } } + + buf := bytes.NewBuffer(nil) + + for _, service := range s { + // broadcast version means broadcast to all nodes + if service.Version == broadcastVersion { + for _, node := range service.Nodes { + buf.Reset() + buf.Write(b) + fn(node, buf) + } + return nil + } + + node := service.Nodes[rand.Int()%len(service.Nodes)] + buf.Reset() + buf.Write(b) + fn(node, buf) + return nil + } + + buf.Reset() + buf = nil + return nil } @@ -243,9 +273,15 @@ func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeO Port: port, } + version := opt.Queue + if len(version) == 0 { + version = broadcastVersion + } + service := ®istry.Service{ - Name: "topic:" + topic, - Nodes: []*registry.Node{node}, + Name: "topic:" + topic, + Version: version, + Nodes: []*registry.Node{node}, } subscriber := &httpSubscriber{ diff --git a/broker/options.go b/broker/options.go index 935aff5b..12295781 100644 --- a/broker/options.go +++ b/broker/options.go @@ -9,6 +9,10 @@ type SubscribeOptions struct { AutoAck bool // NumHandlers defaults to 1 NumHandlers int + // Subscribers with the same queue name + // will create a shared subscription where each + // receives a subset of messages. + Queue string } type Option func(*Options) @@ -33,6 +37,12 @@ func NumHandlers(i int) SubscribeOption { } } +func QueueName(name string) SubscribeOption { + return func(o *SubscribeOptions) { + o.Queue = name + } +} + func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions { opt := SubscribeOptions{ AutoAck: true,