1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-08-04 21:42:57 +02:00

fix: same for broker

This commit is contained in:
Brian Ketelsen
2025-05-07 16:38:43 -04:00
parent 04bbbf051a
commit c189a81196
4 changed files with 34 additions and 27 deletions

View File

@@ -41,7 +41,7 @@ type Subscriber interface {
var (
// DefaultBroker is the default Broker.
DefaultBroker = NewBroker()
DefaultBroker = NewMemoryBroker()
)
func Init(opts ...Option) error {

View File

@@ -1,5 +1,5 @@
// Package broker provides a http based message broker
package broker
// Package http provides a http based message broker
package http
import (
"bytes"
@@ -16,6 +16,7 @@ import (
"time"
"github.com/google/uuid"
"go-micro.dev/v5/broker"
"go-micro.dev/v5/codec/json"
merr "go-micro.dev/v5/errors"
"go-micro.dev/v5/registry"
@@ -29,7 +30,7 @@ import (
// HTTP Broker is a point to point async broker.
type httpBroker struct {
opts Options
opts broker.Options
r registry.Registry
@@ -51,8 +52,8 @@ type httpBroker struct {
}
type httpSubscriber struct {
opts SubscribeOptions
fn Handler
opts broker.SubscribeOptions
fn broker.Handler
svc *registry.Service
hb *httpBroker
id string
@@ -61,7 +62,7 @@ type httpSubscriber struct {
type httpEvent struct {
err error
m *Message
m *broker.Message
t string
}
@@ -108,8 +109,8 @@ func newTransport(config *tls.Config) *http.Transport {
return t
}
func newHttpBroker(opts ...Option) Broker {
options := *NewOptions(opts...)
func newHttpBroker(opts ...broker.Option) broker.Broker {
options := *broker.NewOptions(opts...)
options.Registry = registry.DefaultRegistry
options.Codec = json.Marshaler{}
@@ -161,7 +162,7 @@ func (h *httpEvent) Error() error {
return h.err
}
func (h *httpEvent) Message() *Message {
func (h *httpEvent) Message() *broker.Message {
return h.m
}
@@ -169,7 +170,7 @@ func (h *httpEvent) Topic() string {
return h.t
}
func (h *httpSubscriber) Options() SubscribeOptions {
func (h *httpSubscriber) Options() broker.SubscribeOptions {
return h.opts
}
@@ -308,7 +309,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
var m *Message
var m *broker.Message
if err = h.opts.Codec.Unmarshal(b, &m); err != nil {
errr := merr.InternalServerError("go.micro.broker", "Error parsing request body: %v", err)
w.WriteHeader(500)
@@ -330,7 +331,7 @@ func (h *httpBroker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
id := req.Form.Get("id")
//nolint:prealloc
var subs []Handler
var subs []broker.Handler
h.RLock()
for _, subscriber := range h.subscribers[topic] {
@@ -458,7 +459,7 @@ func (h *httpBroker) Disconnect() error {
return err
}
func (h *httpBroker) Init(opts ...Option) error {
func (h *httpBroker) Init(opts ...broker.Option) error {
h.RLock()
if h.running {
h.RUnlock()
@@ -505,13 +506,13 @@ func (h *httpBroker) Init(opts ...Option) error {
return nil
}
func (h *httpBroker) Options() Options {
func (h *httpBroker) Options() broker.Options {
return h.opts
}
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error {
func (h *httpBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
// create the message first
m := &Message{
m := &broker.Message{
Header: make(map[string]string),
Body: msg.Body,
}
@@ -637,10 +638,10 @@ func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption)
return nil
}
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) {
func (h *httpBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
var err error
var host, port string
options := NewSubscribeOptions(opts...)
options := broker.NewSubscribeOptions(opts...)
// parse address for host, port
host, port, err = net.SplitHostPort(h.Address())
@@ -706,6 +707,6 @@ func (h *httpBroker) String() string {
}
// NewBroker returns a new http broker.
func NewBroker(opts ...Option) Broker {
func NewBroker(opts ...broker.Option) broker.Broker {
return newHttpBroker(opts...)
}

View File

@@ -1,4 +1,4 @@
package broker_test
package http_test
import (
"sync"
@@ -7,6 +7,7 @@ import (
"github.com/google/uuid"
"go-micro.dev/v5/broker"
"go-micro.dev/v5/broker/http"
"go-micro.dev/v5/registry"
)
@@ -60,7 +61,7 @@ func sub(b *testing.B, c int) {
b.StopTimer()
m := newTestRegistry()
brker := broker.NewBroker(broker.Registry(m))
brker := http.NewBroker(broker.Registry(m))
topic := uuid.New().String()
if err := brker.Init(); err != nil {
@@ -121,7 +122,7 @@ func sub(b *testing.B, c int) {
func pub(b *testing.B, c int) {
b.StopTimer()
m := newTestRegistry()
brk := broker.NewBroker(broker.Registry(m))
brk := http.NewBroker(broker.Registry(m))
topic := uuid.New().String()
if err := brk.Init(); err != nil {
@@ -190,7 +191,7 @@ func pub(b *testing.B, c int) {
func TestBroker(t *testing.T) {
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
b := http.NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
@@ -239,7 +240,7 @@ func TestBroker(t *testing.T) {
func TestConcurrentSubBroker(t *testing.T) {
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
b := http.NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)
@@ -298,7 +299,7 @@ func TestConcurrentSubBroker(t *testing.T) {
func TestConcurrentPubBroker(t *testing.T) {
m := newTestRegistry()
b := broker.NewBroker(broker.Registry(m))
b := http.NewBroker(broker.Registry(m))
if err := b.Init(); err != nil {
t.Fatalf("Unexpected init error: %v", err)

View File

@@ -9,6 +9,8 @@ import (
"github.com/urfave/cli/v2"
"go-micro.dev/v5/auth"
hbroker "go-micro.dev/v5/broker/http"
"go-micro.dev/v5/broker"
"go-micro.dev/v5/cache"
"go-micro.dev/v5/client"
@@ -230,7 +232,10 @@ var (
},
}
DefaultBrokers = map[string]func(...broker.Option) broker.Broker{}
DefaultBrokers = map[string]func(...broker.Option) broker.Broker{
"memory": broker.NewMemoryBroker,
"http": hbroker.NewBroker,
}
DefaultClients = map[string]func(...client.Option) client.Client{}