diff --git a/broker/http/http.go b/broker/http/http.go deleted file mode 100644 index 269bdba5..00000000 --- a/broker/http/http.go +++ /dev/null @@ -1,710 +0,0 @@ -// Package http provides a http based message broker -package http - -import ( - "bytes" - "context" - "crypto/tls" - "errors" - "fmt" - "io" - "io/ioutil" - "math/rand" - "net" - "net/http" - "net/url" - "runtime" - "sync" - "time" - - "github.com/google/uuid" - "github.com/micro/go-micro/broker" - "github.com/micro/go-micro/codec/json" - "github.com/micro/go-micro/config/cmd" - merr "github.com/micro/go-micro/errors" - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/registry/cache" - maddr "github.com/micro/go-micro/util/addr" - mnet "github.com/micro/go-micro/util/net" - mls "github.com/micro/go-micro/util/tls" - "golang.org/x/net/http2" -) - -func init() { - cmd.DefaultBrokers["http"] = NewBroker -} - -// HTTP Broker is a point to point async broker -type httpBroker struct { - id string - address string - opts broker.Options - - mux *http.ServeMux - - c *http.Client - r registry.Registry - - sync.RWMutex - subscribers map[string][]*httpSubscriber - running bool - exit chan chan error - - // offline message inbox - mtx sync.RWMutex - inbox map[string][][]byte -} - -type httpSubscriber struct { - opts broker.SubscribeOptions - id string - topic string - fn broker.Handler - svc *registry.Service - hb *httpBroker -} - -type httpEvent struct { - m *broker.Message - t string -} - -var ( - DefaultSubPath = "/_sub" - serviceName = "go.micro.http.broker" - broadcastVersion = "ff.http.broadcast" - registerTTL = time.Minute - registerInterval = time.Second * 30 -) - -func init() { - rand.Seed(time.Now().Unix()) -} - -func newTransport(config *tls.Config) *http.Transport { - if config == nil { - config = &tls.Config{ - InsecureSkipVerify: true, - } - } - - dialTLS := func(network string, addr string) (net.Conn, error) { - return tls.Dial(network, addr, config) - } - - t := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ - Timeout: 30 * time.Second, - KeepAlive: 30 * time.Second, - }).Dial, - TLSHandshakeTimeout: 10 * time.Second, - DialTLS: dialTLS, - } - runtime.SetFinalizer(&t, func(tr **http.Transport) { - (*tr).CloseIdleConnections() - }) - - // setup http2 - http2.ConfigureTransport(t) - - return t -} - -func newHttpBroker(opts ...broker.Option) broker.Broker { - options := broker.Options{ - Codec: json.Marshaler{}, - Context: context.TODO(), - Registry: registry.DefaultRegistry, - } - - for _, o := range opts { - o(&options) - } - - // set address - addr := ":0" - if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { - addr = options.Addrs[0] - } - - h := &httpBroker{ - id: uuid.New().String(), - address: addr, - opts: options, - r: options.Registry, - c: &http.Client{Transport: newTransport(options.TLSConfig)}, - subscribers: make(map[string][]*httpSubscriber), - exit: make(chan chan error), - mux: http.NewServeMux(), - inbox: make(map[string][][]byte), - } - - // specify the message handler - h.mux.Handle(DefaultSubPath, h) - - // get optional handlers - if h.opts.Context != nil { - handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler) - if ok { - for pattern, handler := range handlers { - h.mux.Handle(pattern, handler) - } - } - } - - return h -} - -func (h *httpEvent) Ack() error { - return nil -} - -func (h *httpEvent) Message() *broker.Message { - return h.m -} - -func (h *httpEvent) Topic() string { - return h.t -} - -func (h *httpSubscriber) Options() broker.SubscribeOptions { - return h.opts -} - -func (h *httpSubscriber) Topic() string { - return h.topic -} - -func (h *httpSubscriber) Unsubscribe() error { - return h.hb.unsubscribe(h) -} - -func (h *httpBroker) saveMessage(topic string, msg []byte) { - h.mtx.Lock() - defer h.mtx.Unlock() - - // get messages - c := h.inbox[topic] - - // save message - c = append(c, msg) - - // max length 64 - if len(c) > 64 { - c = c[:64] - } - - // save inbox - h.inbox[topic] = c -} - -func (h *httpBroker) getMessage(topic string, num int) [][]byte { - h.mtx.Lock() - defer h.mtx.Unlock() - - // get messages - c, ok := h.inbox[topic] - if !ok { - return nil - } - - // more message than requests - if len(c) >= num { - msg := c[:num] - h.inbox[topic] = c[num:] - return msg - } - - // reset inbox - h.inbox[topic] = nil - - // return all messages - return c -} - -func (h *httpBroker) subscribe(s *httpSubscriber) error { - h.Lock() - defer h.Unlock() - - if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { - return err - } - - h.subscribers[s.topic] = append(h.subscribers[s.topic], s) - return nil -} - -func (h *httpBroker) unsubscribe(s *httpSubscriber) error { - h.Lock() - defer h.Unlock() - - //nolint:prealloc - var subscribers []*httpSubscriber - - // look for subscriber - for _, sub := range h.subscribers[s.topic] { - // deregister and skip forward - if sub == s { - _ = h.r.Deregister(sub.svc) - continue - } - // keep subscriber - subscribers = append(subscribers, sub) - } - - // set subscribers - h.subscribers[s.topic] = subscribers - - return nil -} - -func (h *httpBroker) run(l net.Listener) { - t := time.NewTicker(registerInterval) - defer t.Stop() - - for { - select { - // heartbeat for each subscriber - case <-t.C: - h.RLock() - for _, subs := range h.subscribers { - for _, sub := range subs { - _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) - } - } - h.RUnlock() - // received exit signal - case ch := <-h.exit: - ch <- l.Close() - h.RLock() - for _, subs := range h.subscribers { - for _, sub := range subs { - _ = h.r.Deregister(sub.svc) - } - } - h.RUnlock() - return - } - } -} - -func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if req.Method != "POST" { - err := merr.BadRequest("go.micro.broker", "Method not allowed") - http.Error(w, err.Error(), http.StatusMethodNotAllowed) - return - } - defer req.Body.Close() - - req.ParseForm() - - b, err := ioutil.ReadAll(req.Body) - if err != nil { - errr := merr.InternalServerError("go.micro.broker", "Error reading request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - var m *broker.Message - if err = h.opts.Codec.Unmarshal(b, &m); err != nil { - errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err) - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - topic := m.Header[":topic"] - delete(m.Header, ":topic") - - if len(topic) == 0 { - errr := merr.InternalServerError("go.micro.broker", "Topic not found") - w.WriteHeader(500) - w.Write([]byte(errr.Error())) - return - } - - p := &httpEvent{m: m, t: topic} - id := req.Form.Get("id") - - //nolint:prealloc - var subs []broker.Handler - - h.RLock() - for _, subscriber := range h.subscribers[topic] { - if id != subscriber.id { - continue - } - subs = append(subs, subscriber.fn) - } - h.RUnlock() - - // execute the handler - for _, fn := range subs { - fn(p) - } -} - -func (h *httpBroker) Address() string { - h.RLock() - defer h.RUnlock() - return h.address -} - -func (h *httpBroker) Connect() error { - h.RLock() - if h.running { - h.RUnlock() - return nil - } - h.RUnlock() - - h.Lock() - defer h.Unlock() - - var l net.Listener - var err error - - if h.opts.Secure || h.opts.TLSConfig != nil { - config := h.opts.TLSConfig - - fn := func(addr string) (net.Listener, error) { - if config == nil { - hosts := []string{addr} - - // check if its a valid host:port - if host, _, err := net.SplitHostPort(addr); err == nil { - if len(host) == 0 { - hosts = maddr.IPs() - } else { - hosts = []string{host} - } - } - - // generate a certificate - cert, err := mls.Certificate(hosts...) - if err != nil { - return nil, err - } - config = &tls.Config{Certificates: []tls.Certificate{cert}} - } - return tls.Listen("tcp", addr, config) - } - - l, err = mnet.Listen(h.address, fn) - } else { - fn := func(addr string) (net.Listener, error) { - return net.Listen("tcp", addr) - } - - l, err = mnet.Listen(h.address, fn) - } - - if err != nil { - return err - } - - addr := h.address - h.address = l.Addr().String() - - go http.Serve(l, h.mux) - go func() { - h.run(l) - h.Lock() - h.opts.Addrs = []string{addr} - h.address = addr - h.Unlock() - }() - - // get registry - reg := h.opts.Registry - if reg == nil { - reg = registry.DefaultRegistry - } - // set cache - h.r = cache.New(reg) - - // set running - h.running = true - return nil -} - -func (h *httpBroker) Disconnect() error { - h.RLock() - if !h.running { - h.RUnlock() - return nil - } - h.RUnlock() - - h.Lock() - defer h.Unlock() - - // stop cache - rc, ok := h.r.(cache.Cache) - if ok { - rc.Stop() - } - - // exit and return err - ch := make(chan error) - h.exit <- ch - err := <-ch - - // set not running - h.running = false - return err -} - -func (h *httpBroker) Init(opts ...broker.Option) error { - h.RLock() - if h.running { - h.RUnlock() - return errors.New("cannot init while connected") - } - h.RUnlock() - - h.Lock() - defer h.Unlock() - - for _, o := range opts { - o(&h.opts) - } - - if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 { - h.address = h.opts.Addrs[0] - } - - if len(h.id) == 0 { - h.id = "go.micro.http.broker-" + uuid.New().String() - } - - // get registry - reg := h.opts.Registry - if reg == nil { - reg = registry.DefaultRegistry - } - - // get cache - if rc, ok := h.r.(cache.Cache); ok { - rc.Stop() - } - - // set registry - h.r = cache.New(reg) - - // reconfigure tls config - if c := h.opts.TLSConfig; c != nil { - h.c = &http.Client{ - Transport: newTransport(c), - } - } - - return nil -} - -func (h *httpBroker) Options() broker.Options { - return h.opts -} - -func (h *httpBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error { - // create the message first - m := &broker.Message{ - Header: make(map[string]string), - Body: msg.Body, - } - - for k, v := range msg.Header { - m.Header[k] = v - } - - m.Header[":topic"] = topic - - // encode the message - b, err := h.opts.Codec.Marshal(m) - if err != nil { - return err - } - - // save the message - h.saveMessage(topic, b) - - // now attempt to get the service - h.RLock() - s, err := h.r.GetService(serviceName) - if err != nil { - h.RUnlock() - return err - } - h.RUnlock() - - pub := func(node *registry.Node, t string, b []byte) error { - scheme := "http" - - // check if secure is added in metadata - if node.Metadata["secure"] == "true" { - scheme = "https" - } - - vals := url.Values{} - vals.Add("id", node.Id) - - uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultSubPath, vals.Encode()) - r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) - if err != nil { - return err - } - - // discard response body - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - return nil - } - - srv := func(s []*registry.Service, b []byte) { - for _, service := range s { - var nodes []*registry.Node - - for _, node := range service.Nodes { - // only use nodes tagged with broker http - if node.Metadata["broker"] != "http" { - continue - } - - // look for nodes for the topic - if node.Metadata["topic"] != topic { - continue - } - - nodes = append(nodes, node) - } - - // only process if we have nodes - if len(nodes) == 0 { - continue - } - - switch service.Version { - // broadcast version means broadcast to all nodes - case broadcastVersion: - var success bool - - // publish to all nodes - for _, node := range nodes { - // publish async - if err := pub(node, topic, b); err == nil { - success = true - } - } - - // save if it failed to publish at least once - if !success { - h.saveMessage(topic, b) - } - default: - // select node to publish to - node := nodes[rand.Int()%len(nodes)] - - // publish async to one node - if err := pub(node, topic, b); err != nil { - // if failed save it - h.saveMessage(topic, b) - } - } - } - } - - // do the rest async - go func() { - // get a third of the backlog - messages := h.getMessage(topic, 8) - delay := (len(messages) > 1) - - // publish all the messages - for _, msg := range messages { - // serialize here - srv(s, msg) - - // sending a backlog of messages - if delay { - time.Sleep(time.Millisecond * 100) - } - } - }() - - return nil -} - -func (h *httpBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - var err error - var host, port string - options := broker.NewSubscribeOptions(opts...) - - // parse address for host, port - host, port, err = net.SplitHostPort(h.Address()) - if err != nil { - return nil, err - } - - addr, err := maddr.Extract(host) - if err != nil { - return nil, err - } - - var secure bool - - if h.opts.Secure || h.opts.TLSConfig != nil { - secure = true - } - - // register service - node := ®istry.Node{ - Id: topic + "-" + h.id, - Address: mnet.HostPort(addr, port), - Metadata: map[string]string{ - "secure": fmt.Sprintf("%t", secure), - "broker": "http", - "topic": topic, - }, - } - - // check for queue group or broadcast queue - version := options.Queue - if len(version) == 0 { - version = broadcastVersion - } - - service := ®istry.Service{ - Name: serviceName, - Version: version, - Nodes: []*registry.Node{node}, - } - - // generate subscriber - subscriber := &httpSubscriber{ - opts: options, - hb: h, - id: node.Id, - topic: topic, - fn: handler, - svc: service, - } - - // subscribe now - if err := h.subscribe(subscriber); err != nil { - return nil, err - } - - // return the subscriber - return subscriber, nil -} - -func (h *httpBroker) String() string { - return "http" -} - -// NewBroker returns a new http broker -func NewBroker(opts ...broker.Option) broker.Broker { - return newHttpBroker(opts...) -} diff --git a/broker/http/http_test.go b/broker/http/http_test.go deleted file mode 100644 index f3aec647..00000000 --- a/broker/http/http_test.go +++ /dev/null @@ -1,392 +0,0 @@ -package http - -import ( - "sync" - "testing" - "time" - - "github.com/google/uuid" - "github.com/micro/go-micro/broker" - "github.com/micro/go-micro/debug/log/noop" - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/registry/memory" - "github.com/micro/go-micro/util/log" -) - -var ( - // mock data - testData = map[string][]*registry.Service{ - "foo": { - { - Name: "foo", - Version: "1.0.0", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.0-123", - Address: "localhost:9999", - }, - { - Id: "foo-1.0.0-321", - Address: "localhost:9999", - }, - }, - }, - { - Name: "foo", - Version: "1.0.1", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.1-321", - Address: "localhost:6666", - }, - }, - }, - { - Name: "foo", - Version: "1.0.3", - Nodes: []*registry.Node{ - { - Id: "foo-1.0.3-345", - Address: "localhost:8888", - }, - }, - }, - }, - } -) - -func newTestRegistry() registry.Registry { - return memory.NewRegistry(memory.Services(testData)) -} - -func sub(be *testing.B, c int) { - // set no op logger - log.SetLogger(noop.NewLog()) - - be.StopTimer() - m := newTestRegistry() - - b := NewBroker(broker.Registry(m)) - topic := uuid.New().String() - - if err := b.Init(); err != nil { - be.Fatalf("Unexpected init error: %v", err) - } - - if err := b.Connect(); err != nil { - be.Fatalf("Unexpected connect error: %v", err) - } - - msg := &broker.Message{ - Header: map[string]string{ - "Content-Type": "application/json", - }, - Body: []byte(`{"message": "Hello World"}`), - } - - var subs []broker.Subscriber - done := make(chan bool, c) - - for i := 0; i < c; i++ { - sub, err := b.Subscribe(topic, func(p broker.Event) error { - done <- true - m := p.Message() - - if string(m.Body) != string(msg.Body) { - be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) - } - - return nil - }, broker.Queue("shared")) - if err != nil { - be.Fatalf("Unexpected subscribe error: %v", err) - } - subs = append(subs, sub) - } - - for i := 0; i < be.N; i++ { - be.StartTimer() - if err := b.Publish(topic, msg); err != nil { - be.Fatalf("Unexpected publish error: %v", err) - } - <-done - be.StopTimer() - } - - for _, sub := range subs { - sub.Unsubscribe() - } - - if err := b.Disconnect(); err != nil { - be.Fatalf("Unexpected disconnect error: %v", err) - } -} - -func pub(be *testing.B, c int) { - // set no op logger - log.SetLogger(noop.NewLog()) - - be.StopTimer() - m := newTestRegistry() - b := NewBroker(broker.Registry(m)) - topic := uuid.New().String() - - if err := b.Init(); err != nil { - be.Fatalf("Unexpected init error: %v", err) - } - - if err := b.Connect(); err != nil { - be.Fatalf("Unexpected connect error: %v", err) - } - - msg := &broker.Message{ - Header: map[string]string{ - "Content-Type": "application/json", - }, - Body: []byte(`{"message": "Hello World"}`), - } - - done := make(chan bool, c*4) - - sub, err := b.Subscribe(topic, func(p broker.Event) error { - done <- true - m := p.Message() - if string(m.Body) != string(msg.Body) { - be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) - } - return nil - }, broker.Queue("shared")) - if err != nil { - be.Fatalf("Unexpected subscribe error: %v", err) - } - - var wg sync.WaitGroup - ch := make(chan int, c*4) - be.StartTimer() - - for i := 0; i < c; i++ { - go func() { - for range ch { - if err := b.Publish(topic, msg); err != nil { - be.Fatalf("Unexpected publish error: %v", err) - } - select { - case <-done: - case <-time.After(time.Second): - } - wg.Done() - } - }() - } - - for i := 0; i < be.N; i++ { - wg.Add(1) - ch <- i - } - - wg.Wait() - be.StopTimer() - sub.Unsubscribe() - close(ch) - close(done) - - if err := b.Disconnect(); err != nil { - be.Fatalf("Unexpected disconnect error: %v", err) - } -} - -func TestBroker(t *testing.T) { - m := newTestRegistry() - b := NewBroker(broker.Registry(m)) - - if err := b.Init(); err != nil { - t.Fatalf("Unexpected init error: %v", err) - } - - if err := b.Connect(); err != nil { - t.Fatalf("Unexpected connect error: %v", err) - } - - msg := &broker.Message{ - Header: map[string]string{ - "Content-Type": "application/json", - }, - Body: []byte(`{"message": "Hello World"}`), - } - - done := make(chan bool) - - sub, err := b.Subscribe("test", func(p broker.Event) error { - m := p.Message() - - if string(m.Body) != string(msg.Body) { - t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) - } - - close(done) - return nil - }) - if err != nil { - t.Fatalf("Unexpected subscribe error: %v", err) - } - - if err := b.Publish("test", msg); err != nil { - t.Fatalf("Unexpected publish error: %v", err) - } - - <-done - sub.Unsubscribe() - - if err := b.Disconnect(); err != nil { - t.Fatalf("Unexpected disconnect error: %v", err) - } -} - -func TestConcurrentSubBroker(t *testing.T) { - m := newTestRegistry() - b := NewBroker(broker.Registry(m)) - - if err := b.Init(); err != nil { - t.Fatalf("Unexpected init error: %v", err) - } - - if err := b.Connect(); err != nil { - t.Fatalf("Unexpected connect error: %v", err) - } - - msg := &broker.Message{ - Header: map[string]string{ - "Content-Type": "application/json", - }, - Body: []byte(`{"message": "Hello World"}`), - } - - var subs []broker.Subscriber - var wg sync.WaitGroup - - for i := 0; i < 10; i++ { - sub, err := b.Subscribe("test", func(p broker.Event) error { - defer wg.Done() - - m := p.Message() - - if string(m.Body) != string(msg.Body) { - t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) - } - - return nil - }) - if err != nil { - t.Fatalf("Unexpected subscribe error: %v", err) - } - - wg.Add(1) - subs = append(subs, sub) - } - - if err := b.Publish("test", msg); err != nil { - t.Fatalf("Unexpected publish error: %v", err) - } - - wg.Wait() - - for _, sub := range subs { - sub.Unsubscribe() - } - - if err := b.Disconnect(); err != nil { - t.Fatalf("Unexpected disconnect error: %v", err) - } -} - -func TestConcurrentPubBroker(t *testing.T) { - m := newTestRegistry() - b := NewBroker(broker.Registry(m)) - - if err := b.Init(); err != nil { - t.Fatalf("Unexpected init error: %v", err) - } - - if err := b.Connect(); err != nil { - t.Fatalf("Unexpected connect error: %v", err) - } - - msg := &broker.Message{ - Header: map[string]string{ - "Content-Type": "application/json", - }, - Body: []byte(`{"message": "Hello World"}`), - } - - var wg sync.WaitGroup - - sub, err := b.Subscribe("test", func(p broker.Event) error { - defer wg.Done() - - m := p.Message() - - if string(m.Body) != string(msg.Body) { - t.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body)) - } - - return nil - }) - if err != nil { - t.Fatalf("Unexpected subscribe error: %v", err) - } - - for i := 0; i < 10; i++ { - wg.Add(1) - - if err := b.Publish("test", msg); err != nil { - t.Fatalf("Unexpected publish error: %v", err) - } - } - - wg.Wait() - - sub.Unsubscribe() - - if err := b.Disconnect(); err != nil { - t.Fatalf("Unexpected disconnect error: %v", err) - } -} - -func BenchmarkSub1(b *testing.B) { - sub(b, 1) -} -func BenchmarkSub8(b *testing.B) { - sub(b, 8) -} - -func BenchmarkSub32(b *testing.B) { - sub(b, 32) -} - -func BenchmarkSub64(b *testing.B) { - sub(b, 64) -} - -func BenchmarkSub128(b *testing.B) { - sub(b, 128) -} - -func BenchmarkPub1(b *testing.B) { - pub(b, 1) -} - -func BenchmarkPub8(b *testing.B) { - pub(b, 8) -} - -func BenchmarkPub32(b *testing.B) { - pub(b, 32) -} - -func BenchmarkPub64(b *testing.B) { - pub(b, 64) -} - -func BenchmarkPub128(b *testing.B) { - pub(b, 128) -} diff --git a/broker/http/options.go b/broker/http/options.go deleted file mode 100644 index 03240c42..00000000 --- a/broker/http/options.go +++ /dev/null @@ -1,23 +0,0 @@ -package http - -import ( - "context" - "net/http" - - "github.com/micro/go-micro/broker" -) - -// Handle registers the handler for the given pattern. -func Handle(pattern string, handler http.Handler) broker.Option { - return func(o *broker.Options) { - if o.Context == nil { - o.Context = context.Background() - } - handlers, ok := o.Context.Value("http_handlers").(map[string]http.Handler) - if !ok { - handlers = make(map[string]http.Handler) - } - handlers[pattern] = handler - o.Context = context.WithValue(o.Context, "http_handlers", handlers) - } -}