From 7e1bba2baf324c08ea93a9e353bec74b6d311d78 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 18 Jun 2025 17:12:02 +0100 Subject: [PATCH] Re-add events package (#2761) * Re-add events package * run redis as a dep * remove redis events * fix: data race on event subscriber * fix: data race in tests * fix: store errors * fix: lint issues * feat: default stream * Update file.go --------- Co-authored-by: Brian Ketelsen --- cmd/cmd.go | 29 +++- cmd/options.go | 17 +++ events/events.go | 92 ++++++++++++ events/memory.go | 246 +++++++++++++++++++++++++++++++ events/natsjs/README.md | 48 ++++++ events/natsjs/helpers_test.go | 94 ++++++++++++ events/natsjs/nats.go | 270 ++++++++++++++++++++++++++++++++++ events/natsjs/nats_test.go | 112 ++++++++++++++ events/natsjs/options.go | 116 +++++++++++++++ events/options.go | 144 ++++++++++++++++++ events/store.go | 127 ++++++++++++++++ events/store_test.go | 47 ++++++ events/stream_test.go | 241 ++++++++++++++++++++++++++++++ go.mod | 12 +- go.sum | 24 +-- profile/profile.go | 25 +++- store/file.go | 9 +- 17 files changed, 1621 insertions(+), 32 deletions(-) create mode 100644 events/events.go create mode 100644 events/memory.go create mode 100644 events/natsjs/README.md create mode 100644 events/natsjs/helpers_test.go create mode 100644 events/natsjs/nats.go create mode 100644 events/natsjs/nats_test.go create mode 100644 events/natsjs/options.go create mode 100644 events/options.go create mode 100644 events/store.go create mode 100644 events/store_test.go create mode 100644 events/stream_test.go diff --git a/cmd/cmd.go b/cmd/cmd.go index 185d509b..e10333d9 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -23,6 +23,7 @@ import ( "go-micro.dev/v5/debug/profile/http" "go-micro.dev/v5/debug/profile/pprof" "go-micro.dev/v5/debug/trace" + "go-micro.dev/v5/events" "go-micro.dev/v5/logger" mprofile "go-micro.dev/v5/profile" "go-micro.dev/v5/registry" @@ -293,6 +294,7 @@ var ( DefaultCaches = map[string]func(...cache.Option) cache.Cache{ "redis": redis.NewRedisCache, } + DefaultStreams = map[string]func(...events.Option) (events.Stream, error){} ) func init() { @@ -313,6 +315,7 @@ func newCmd(opts ...Option) Cmd { DebugProfile: &profile.DefaultProfile, Config: &config.DefaultConfig, Cache: &cache.DefaultCache, + Stream: &events.DefaultStream, Brokers: DefaultBrokers, Clients: DefaultClients, @@ -376,7 +379,10 @@ func (c *cmd) Before(ctx *cli.Context) error { if profileName != "" { switch profileName { case "local": - imported := mprofile.LocalProfile() + imported, ierr := mprofile.LocalProfile() + if ierr != nil { + return fmt.Errorf("failed to load local profile: %v", ierr) + } *c.opts.Registry = imported.Registry registry.DefaultRegistry = imported.Registry *c.opts.Broker = imported.Broker @@ -386,7 +392,10 @@ func (c *cmd) Before(ctx *cli.Context) error { *c.opts.Transport = imported.Transport transport.DefaultTransport = imported.Transport case "nats": - imported := mprofile.NatsProfile() + imported, ierr := mprofile.NatsProfile() + if ierr != nil { + return fmt.Errorf("failed to load nats profile: %v", ierr) + } // Set the registry sopts, clopts := c.setRegistry(imported.Registry) serverOpts = append(serverOpts, sopts...) @@ -407,6 +416,11 @@ func (c *cmd) Before(ctx *cli.Context) error { serverOpts = append(serverOpts, sopts...) clientOpts = append(clientOpts, clopts...) + // Set the stream + sopts, clopts = c.setStream(imported.Stream) + serverOpts = append(serverOpts, sopts...) + clientOpts = append(clientOpts, clopts...) + // Add more profiles as needed default: return fmt.Errorf("unsupported profile: %s", profileName) @@ -701,6 +715,17 @@ func (c *cmd) setRegistry(r registry.Registry) ([]server.Option, []client.Option registry.DefaultRegistry = *c.opts.Registry return serverOpts, clientOpts } +func (c *cmd) setStream(s events.Stream) ([]server.Option, []client.Option) { + var serverOpts []server.Option + var clientOpts []client.Option + *c.opts.Stream = s + // TODO: do server and client need a Stream? + // serverOpts = append(serverOpts, server.Registry(*c.opts.Registry)) + // clientOpts = append(clientOpts, client.Registry(*c.opts.Registry)) + + events.DefaultStream = *c.opts.Stream + return serverOpts, clientOpts +} func (c *cmd) setBroker(b broker.Broker) ([]server.Option, []client.Option) { var serverOpts []server.Option diff --git a/cmd/options.go b/cmd/options.go index 221ed434..28949a07 100644 --- a/cmd/options.go +++ b/cmd/options.go @@ -10,6 +10,7 @@ import ( "go-micro.dev/v5/config" "go-micro.dev/v5/debug/profile" "go-micro.dev/v5/debug/trace" + "go-micro.dev/v5/events" "go-micro.dev/v5/registry" "go-micro.dev/v5/selector" "go-micro.dev/v5/server" @@ -42,6 +43,7 @@ type Options struct { Broker *broker.Broker Auths map[string]func(...auth.Option) auth.Auth Store *store.Store + Stream *events.Stream Configs map[string]func(...config.Option) (config.Config, error) Clients map[string]func(...client.Option) client.Client Registries map[string]func(...registry.Option) registry.Registry @@ -49,6 +51,7 @@ type Options struct { Servers map[string]func(...server.Option) server.Server Transports map[string]func(...transport.Option) transport.Transport Stores map[string]func(...store.Option) store.Store + Streams map[string]func(...events.Option) events.Stream Tracers map[string]func(...trace.Option) trace.Tracer Version string @@ -141,6 +144,13 @@ func Store(s *store.Store) Option { } } +func Stream(s *events.Stream) Option { + return func(o *Options) { + o.Stream = s + events.DefaultStream = *s + } +} + func Tracer(t *trace.Tracer) Option { return func(o *Options) { o.Tracer = t @@ -169,6 +179,13 @@ func NewBroker(name string, b func(...broker.Option) broker.Broker) Option { } } +// New stream func. +func NewStream(name string, b func(...events.Option) events.Stream) Option { + return func(o *Options) { + o.Streams[name] = b + } +} + // New cache func. func NewCache(name string, c func(...cache.Option) cache.Cache) Option { return func(o *Options) { diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..2be98c64 --- /dev/null +++ b/events/events.go @@ -0,0 +1,92 @@ +// Package events is for event streaming and storage +package events + +import ( + "encoding/json" + "errors" + "time" +) + +var ( + // DefaultStream is the default events stream implementation + DefaultStream Stream + // DefaultStore is the default events store implementation + DefaultStore Store +) + +var ( + // ErrMissingTopic is returned if a blank topic was provided to publish + ErrMissingTopic = errors.New("missing topic") + // ErrEncodingMessage is returned from publish if there was an error encoding the message option + ErrEncodingMessage = errors.New("error encoding message") +) + +// Stream is an event streaming interface +type Stream interface { + Publish(topic string, msg interface{}, opts ...PublishOption) error + Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) +} + +// Store is an event store interface +type Store interface { + Read(topic string, opts ...ReadOption) ([]*Event, error) + Write(event *Event, opts ...WriteOption) error +} + +type AckFunc func() error +type NackFunc func() error + +// Event is the object returned by the broker when you subscribe to a topic +type Event struct { + // ID to uniquely identify the event + ID string + // Topic of event, e.g. "registry.service.created" + Topic string + // Timestamp of the event + Timestamp time.Time + // Metadata contains the values the event was indexed by + Metadata map[string]string + // Payload contains the encoded message + Payload []byte + + ackFunc AckFunc + nackFunc NackFunc +} + +// Unmarshal the events message into an object +func (e *Event) Unmarshal(v interface{}) error { + return json.Unmarshal(e.Payload, v) +} + +// Ack acknowledges successful processing of the event in ManualAck mode +func (e *Event) Ack() error { + return e.ackFunc() +} + +func (e *Event) SetAckFunc(f AckFunc) { + e.ackFunc = f +} + +// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode +func (e *Event) Nack() error { + return e.nackFunc() +} + +func (e *Event) SetNackFunc(f NackFunc) { + e.nackFunc = f +} + +// Publish an event to a topic +func Publish(topic string, msg interface{}, opts ...PublishOption) error { + return DefaultStream.Publish(topic, msg, opts...) +} + +// Consume to events +func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { + return DefaultStream.Consume(topic, opts...) +} + +// Read events for a topic +func Read(topic string, opts ...ReadOption) ([]*Event, error) { + return DefaultStore.Read(topic, opts...) +} diff --git a/events/memory.go b/events/memory.go new file mode 100644 index 00000000..6456fe30 --- /dev/null +++ b/events/memory.go @@ -0,0 +1,246 @@ +package events + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + "go-micro.dev/v5/logger" + "go-micro.dev/v5/store" +) + +// NewStream returns an initialized memory stream +func NewStream(opts ...Option) (Stream, error) { + // parse the options + var options Options + for _, o := range opts { + o(&options) + } + return &mem{store: store.NewMemoryStore()}, nil +} + +type subscriber struct { + Group string + Topic string + Channel chan Event + + sync.RWMutex + retryMap map[string]int + retryLimit int + autoAck bool + ackWait time.Duration +} + +type mem struct { + store store.Store + + subs []*subscriber + sync.RWMutex +} + +func (m *mem) Publish(topic string, msg interface{}, opts ...PublishOption) error { + // validate the topic + if len(topic) == 0 { + return ErrMissingTopic + } + + // parse the options + options := PublishOptions{ + Timestamp: time.Now(), + } + for _, o := range opts { + o(&options) + } + + // encode the message if it's not already encoded + var payload []byte + if p, ok := msg.([]byte); ok { + payload = p + } else { + p, err := json.Marshal(msg) + if err != nil { + return ErrEncodingMessage + } + payload = p + } + + // construct the event + event := &Event{ + ID: uuid.New().String(), + Topic: topic, + Timestamp: options.Timestamp, + Metadata: options.Metadata, + Payload: payload, + } + + // serialize the event to bytes + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error encoding event") + } + + // write to the store + key := fmt.Sprintf("%v/%v", event.Topic, event.ID) + if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil { + return errors.Wrap(err, "Error writing event to store") + } + + // send to the subscribers async + go m.handleEvent(event) + + return nil +} + +func (m *mem) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, ErrMissingTopic + } + + // parse the options + options := ConsumeOptions{ + Group: uuid.New().String(), + AutoAck: true, + } + for _, o := range opts { + o(&options) + } + // TODO RetryLimit + + // setup the subscriber + sub := &subscriber{ + Channel: make(chan Event), + Topic: topic, + Group: options.Group, + retryMap: map[string]int{}, + autoAck: true, + retryLimit: options.GetRetryLimit(), + } + + if !options.AutoAck { + if options.AckWait == 0 { + return nil, fmt.Errorf("invalid AckWait passed, should be positive integer") + } + sub.autoAck = options.AutoAck + sub.ackWait = options.AckWait + } + + // register the subscriber + m.Lock() + m.subs = append(m.subs, sub) + m.Unlock() + + // lookup previous events if the start time option was passed + if options.Offset.Unix() > 0 { + go m.lookupPreviousEvents(sub, options.Offset) + } + + // return the channel + return sub.Channel, nil +} + +// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends +// them into the subscribers channel +func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { + // lookup all events which match the topic (a blank topic will return all results) + recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix()) + if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error looking up previous events: %v", err) + return + } else if err != nil { + return + } + + // loop through the records and send it to the channel if it matches + for _, r := range recs { + var ev Event + if err := json.Unmarshal(r.Value, &ev); err != nil { + continue + } + if ev.Timestamp.Unix() < startTime.Unix() { + continue + } + sendEvent(&ev, sub) + } +} + +// handleEvents sends the event to any registered subscribers. +func (m *mem) handleEvent(ev *Event) { + m.RLock() + subs := m.subs + m.RUnlock() + + // filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message + // being sent to two subscribers with the same queue. + filteredSubs := map[string]*subscriber{} + + // filter down to subscribers who are interested in this topic + for _, sub := range subs { + if len(sub.Topic) == 0 || sub.Topic == ev.Topic { + filteredSubs[sub.Group] = sub + } + } + + // send the message to each channel async (since one channel might be blocked) + for _, sub := range filteredSubs { + sendEvent(ev, sub) + } +} + +func sendEvent(ev *Event, sub *subscriber) { + go func(s *subscriber) { + evCopy := *ev + if s.autoAck { + s.Channel <- evCopy + return + } + evCopy.SetAckFunc(ackFunc(s, evCopy)) + evCopy.SetNackFunc(nackFunc(s, evCopy)) + s.Lock() + s.retryMap[evCopy.ID] = 0 + s.Unlock() + tick := time.NewTicker(s.ackWait) + defer tick.Stop() + for range tick.C { + s.Lock() + count, ok := s.retryMap[evCopy.ID] + s.Unlock() + if !ok { + // success + break + } + + if s.retryLimit > -1 && count > s.retryLimit { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit) + } + s.Lock() + delete(s.retryMap, evCopy.ID) + s.Unlock() + return + } + s.Channel <- evCopy + s.Lock() + s.retryMap[evCopy.ID] = count + 1 + s.Unlock() + } + }(sub) +} + +func ackFunc(s *subscriber, evCopy Event) func() error { + return func() error { + s.Lock() + delete(s.retryMap, evCopy.ID) + s.Unlock() + return nil + } +} + +func nackFunc(_ *subscriber, _ Event) func() error { + return func() error { + return nil + } +} diff --git a/events/natsjs/README.md b/events/natsjs/README.md new file mode 100644 index 00000000..d340fd4c --- /dev/null +++ b/events/natsjs/README.md @@ -0,0 +1,48 @@ +# NATS JetStream + +This plugin uses NATS with JetStream to send and receive events. + +## Create a stream + +```go +ev, err := natsjs.NewStream( + natsjs.Address("nats://10.0.1.46:4222"), + natsjs.MaxAge(24*160*time.Minute), +) +``` + +## Consume a stream + +```go +ee, err := events.Consume("test", + events.WithAutoAck(false, time.Second*30), + events.WithGroup("testgroup"), +) +if err != nil { + panic(err) +} +go func() { + for { + msg := <-ee + // Process the message + logger.Info("Received message:", string(msg.Payload)) + err := msg.Ack() + if err != nil { + logger.Error("Error acknowledging message:", err) + } else { + logger.Info("Message acknowledged") + } + } +}() + +``` + +## Publish an Event to the stream + +```go +err = ev.Publish("test", []byte("hello world")) +if err != nil { + panic(err) +} +``` + diff --git a/events/natsjs/helpers_test.go b/events/natsjs/helpers_test.go new file mode 100644 index 00000000..87fe9397 --- /dev/null +++ b/events/natsjs/helpers_test.go @@ -0,0 +1,94 @@ +package natsjs_test + +import ( + "context" + "fmt" + "net" + "path/filepath" + "testing" + "time" + + nserver "github.com/nats-io/nats-server/v2/server" + "github.com/test-go/testify/require" +) + +func getFreeLocalhostAddress() string { + l, _ := net.Listen("tcp", "127.0.0.1:0") + defer l.Close() + return l.Addr().String() +} + +func natsServer(ctx context.Context, t *testing.T, opts *nserver.Options) { + t.Helper() + + server, err := nserver.NewServer( + opts, + ) + require.NoError(t, err) + if err != nil { + return + } + + server.SetLoggerV2( + NewLogWrapper(), + true, true, false, + ) + + // first start NATS + go server.Start() + ready := server.ReadyForConnections(time.Second * 10) + if !ready { + t.Fatalf("NATS server not ready") + } + jsConf := &nserver.JetStreamConfig{ + StoreDir: filepath.Join(t.TempDir(), "nats-js"), + } + + // second start JetStream + err = server.EnableJetStream(jsConf) + require.NoError(t, err) + if err != nil { + return + } + + <-ctx.Done() + + server.Shutdown() +} + +func NewLogWrapper() *LogWrapper { + return &LogWrapper{} +} + +type LogWrapper struct { +} + +// Noticef logs a notice statement. +func (l *LogWrapper) Noticef(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Warnf logs a warning statement. +func (l *LogWrapper) Warnf(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Fatalf logs a fatal statement. +func (l *LogWrapper) Fatalf(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Errorf logs an error statement. +func (l *LogWrapper) Errorf(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Debugf logs a debug statement. +func (l *LogWrapper) Debugf(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} + +// Tracef logs a trace statement. +func (l *LogWrapper) Tracef(format string, v ...interface{}) { + fmt.Printf(format+"\n", v...) +} diff --git a/events/natsjs/nats.go b/events/natsjs/nats.go new file mode 100644 index 00000000..c8234886 --- /dev/null +++ b/events/natsjs/nats.go @@ -0,0 +1,270 @@ +// Package natsjs provides a NATS Jetstream implementation of the events.Stream interface. +package natsjs + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/google/uuid" + nats "github.com/nats-io/nats.go" + "github.com/pkg/errors" + + "go-micro.dev/v5/events" + "go-micro.dev/v5/logger" +) + +const ( + defaultClusterID = "micro" +) + +// NewStream returns an initialized nats stream or an error if the connection to the nats +// server could not be established. +func NewStream(opts ...Option) (events.Stream, error) { + // parse the options + options := Options{ + ClientID: uuid.New().String(), + ClusterID: defaultClusterID, + Logger: logger.DefaultLogger, + } + for _, o := range opts { + o(&options) + } + + s := &stream{opts: options} + + natsJetStreamCtx, err := connectToNatsJetStream(options) + if err != nil { + return nil, fmt.Errorf("error connecting to nats cluster %v: %w", options.ClusterID, err) + } + + s.natsJetStreamCtx = natsJetStreamCtx + + return s, nil +} + +type stream struct { + opts Options + natsJetStreamCtx nats.JetStreamContext +} + +func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) { + nopts := nats.GetDefaultOptions() + if options.TLSConfig != nil { + nopts.Secure = true + nopts.TLSConfig = options.TLSConfig + } + + if options.NkeyConfig != "" { + nopts.Nkey = options.NkeyConfig + } + + if len(options.Address) > 0 { + nopts.Servers = strings.Split(options.Address, ",") + } + + if options.Name != "" { + nopts.Name = options.Name + } + + if options.Username != "" && options.Password != "" { + nopts.User = options.Username + nopts.Password = options.Password + } + + conn, err := nopts.Connect() + if err != nil { + tls := nopts.TLSConfig != nil + return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %w", options.Address, tls, err) + } + + js, err := conn.JetStream() + if err != nil { + return nil, fmt.Errorf("error while obtaining JetStream context: %w", err) + } + + return js, nil +} + +// Publish a message to a topic. +func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { + // validate the topic + if len(topic) == 0 { + return events.ErrMissingTopic + } + + // parse the options + options := events.PublishOptions{ + Timestamp: time.Now(), + } + for _, o := range opts { + o(&options) + } + + // encode the message if it's not already encoded + var payload []byte + if p, ok := msg.([]byte); ok { + payload = p + } else { + p, err := json.Marshal(msg) + if err != nil { + return events.ErrEncodingMessage + } + payload = p + } + + // construct the event + event := &events.Event{ + ID: uuid.New().String(), + Topic: topic, + Timestamp: options.Timestamp, + Metadata: options.Metadata, + Payload: payload, + } + + // serialize the event to bytes + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error encoding event") + } + + // publish the event to the topic's channel + // publish synchronously if configured + if s.opts.SyncPublish { + _, err := s.natsJetStreamCtx.Publish(event.Topic, bytes) + if err != nil { + err = errors.Wrap(err, "Error publishing message to topic") + } + + return err + } + + // publish asynchronously by default + if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes); err != nil { + return errors.Wrap(err, "Error publishing message to topic") + } + + return nil +} + +// Consume from a topic. +func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, events.ErrMissingTopic + } + + log := s.opts.Logger + + // parse the options + options := events.ConsumeOptions{ + Group: uuid.New().String(), + } + for _, o := range opts { + o(&options) + } + + // setup the subscriber + channel := make(chan events.Event) + handleMsg := func(msg *nats.Msg) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // decode the message + var evt events.Event + if err := json.Unmarshal(msg.Data, &evt); err != nil { + log.Logf(logger.ErrorLevel, "Error decoding message: %v", err) + // not acknowledging the message is the way to indicate an error occurred + return + } + if options.AutoAck { + // set up the ack funcs + evt.SetAckFunc(func() error { + return msg.Ack() + }) + + evt.SetNackFunc(func() error { + return msg.Nak() + }) + } else { + // set up the ack funcs + evt.SetAckFunc(func() error { + return nil + }) + evt.SetNackFunc(func() error { + return nil + }) + } + + // push onto the channel and wait for the consumer to take the event off before we acknowledge it. + channel <- evt + + if !options.AutoAck { + return + } + + if err := msg.Ack(nats.Context(ctx)); err != nil { + + log.Logf(logger.ErrorLevel, "Error acknowledging message: %v", err) + } + } + + // ensure that a stream exists for that topic + _, err := s.natsJetStreamCtx.StreamInfo(topic) + if err != nil { + cfg := &nats.StreamConfig{ + Name: topic, + } + if s.opts.RetentionPolicy != 0 { + cfg.Retention = nats.RetentionPolicy(s.opts.RetentionPolicy) + } + if s.opts.MaxAge > 0 { + cfg.MaxAge = s.opts.MaxAge + } + + _, err = s.natsJetStreamCtx.AddStream(cfg) + if err != nil { + return nil, errors.Wrap(err, "Stream did not exist and adding a stream failed") + } + } + + // setup the options + subOpts := []nats.SubOpt{} + + if options.CustomRetries { + subOpts = append(subOpts, nats.MaxDeliver(options.GetRetryLimit())) + } + + if options.AutoAck { + subOpts = append(subOpts, nats.AckAll()) + } else { + subOpts = append(subOpts, nats.AckExplicit()) + } + + if !options.Offset.IsZero() { + subOpts = append(subOpts, nats.StartTime(options.Offset)) + } else { + subOpts = append(subOpts, nats.DeliverNew()) + } + + if options.AckWait > 0 { + subOpts = append(subOpts, nats.AckWait(options.AckWait)) + } + + // connect the subscriber via a queue group only if durable streams are enabled + if !s.opts.DisableDurableStreams { + subOpts = append(subOpts, nats.Durable(options.Group)) + _, err = s.natsJetStreamCtx.QueueSubscribe(topic, options.Group, handleMsg, subOpts...) + } else { + subOpts = append(subOpts, nats.ConsumerName(options.Group)) + _, err = s.natsJetStreamCtx.Subscribe(topic, handleMsg, subOpts...) + } + + if err != nil { + return nil, errors.Wrap(err, "Error subscribing to topic") + } + + return channel, nil +} diff --git a/events/natsjs/nats_test.go b/events/natsjs/nats_test.go new file mode 100644 index 00000000..cff42adc --- /dev/null +++ b/events/natsjs/nats_test.go @@ -0,0 +1,112 @@ +package natsjs_test + +import ( + "context" + "encoding/json" + "strconv" + "strings" + "testing" + "time" + + "go-micro.dev/v5/events/natsjs" + nserver "github.com/nats-io/nats-server/v2/server" + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/require" + "go-micro.dev/v5/events" +) + +type Payload struct { + ID string `json:"id"` + Name string `json:"name"` +} + +func TestSingleEvent(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + // variables + demoPayload := Payload{ + ID: "123", + Name: "Hello World", + } + topic := "foobar" + + clusterName := "test-cluster" + + natsAddr := getFreeLocalhostAddress() + natsPort, _ := strconv.Atoi(strings.Split(natsAddr, ":")[1]) + + // start the NATS with JetStream server + go natsServer(ctx, + t, + &nserver.Options{ + Host: strings.Split(natsAddr, ":")[0], + Port: natsPort, + Cluster: nserver.ClusterOpts{ + Name: clusterName, + }, + }, + ) + + time.Sleep(1 * time.Second) + + // consumer + consumerClient, err := natsjs.NewStream( + natsjs.Address(natsAddr), + natsjs.ClusterID(clusterName), + ) + require.NoError(t, err) + if err != nil { + return + } + + consumer := func(_ context.Context, t *testing.T, client events.Stream, cancel context.CancelFunc) { + t.Helper() + defer cancel() + + foobarEvents, err := client.Consume(topic) + require.Nil(t, err) + if err != nil { + return + } + + // wait for the event + event := <-foobarEvents + + p := Payload{} + err = json.Unmarshal(event.Payload, &p) + + require.NoError(t, err) + if err != nil { + return + } + + assert.Equal(t, demoPayload.ID, p.ID) + assert.Equal(t, demoPayload.Name, p.Name) + } + + go consumer(ctx, t, consumerClient, cancel) + + // publisher + time.Sleep(1 * time.Second) + + publisherClient, err := natsjs.NewStream( + natsjs.Address(natsAddr), + natsjs.ClusterID(clusterName), + ) + require.NoError(t, err) + if err != nil { + return + } + + publisher := func(_ context.Context, t *testing.T, client events.Stream) { + t.Helper() + err := client.Publish(topic, demoPayload) + require.NoError(t, err) + } + + go publisher(ctx, t, publisherClient) + + // wait until consumer received the event + <-ctx.Done() +} diff --git a/events/natsjs/options.go b/events/natsjs/options.go new file mode 100644 index 00000000..04867368 --- /dev/null +++ b/events/natsjs/options.go @@ -0,0 +1,116 @@ +package natsjs + +import ( + "crypto/tls" + "time" + + "go-micro.dev/v5/logger" +) + +// Options which are used to configure the nats stream. +type Options struct { + ClusterID string + ClientID string + Address string + NkeyConfig string + TLSConfig *tls.Config + Logger logger.Logger + SyncPublish bool + Name string + DisableDurableStreams bool + Username string + Password string + RetentionPolicy int + MaxAge time.Duration + MaxMsgSize int +} + +// Option is a function which configures options. +type Option func(o *Options) + +// ClusterID sets the cluster id for the nats connection. +func ClusterID(id string) Option { + return func(o *Options) { + o.ClusterID = id + } +} + +// ClientID sets the client id for the nats connection. +func ClientID(id string) Option { + return func(o *Options) { + o.ClientID = id + } +} + +// Address of the nats cluster. +func Address(addr string) Option { + return func(o *Options) { + o.Address = addr + } +} + +// TLSConfig to use when connecting to the cluster. +func TLSConfig(t *tls.Config) Option { + return func(o *Options) { + o.TLSConfig = t + } +} + +// NkeyConfig string to use when connecting to the cluster. +func NkeyConfig(nkey string) Option { + return func(o *Options) { + o.NkeyConfig = nkey + } +} + +// Logger sets the underlying logger. +func Logger(log logger.Logger) Option { + return func(o *Options) { + o.Logger = log + } +} + +// SynchronousPublish allows using a synchronous publishing instead of the default asynchronous. +func SynchronousPublish(sync bool) Option { + return func(o *Options) { + o.SyncPublish = sync + } +} + +// Name allows to add a name to the natsjs connection. +func Name(name string) Option { + return func(o *Options) { + o.Name = name + } +} + +// DisableDurableStreams will disable durable streams. +func DisableDurableStreams() Option { + return func(o *Options) { + o.DisableDurableStreams = true + } +} + +// Authenticate authenticates the connection with the given username and password. +func Authenticate(username, password string) Option { + return func(o *Options) { + o.Username = username + o.Password = password + } +} +func RetentionPolicy(rp int) Option { + return func(o *Options) { + o.RetentionPolicy = rp + } +} + +func MaxMsgSize(size int) Option { + return func(o *Options) { + o.MaxMsgSize = size + } +} +func MaxAge(age time.Duration) Option { + return func(o *Options) { + o.MaxAge = age + } +} diff --git a/events/options.go b/events/options.go new file mode 100644 index 00000000..4a6fbf51 --- /dev/null +++ b/events/options.go @@ -0,0 +1,144 @@ +package events + +import "time" + +type Options struct{} + +type Option func(o *Options) + +type StoreOptions struct { + TTL time.Duration + Backup Backup +} + +type StoreOption func(o *StoreOptions) + +// PublishOptions contains all the options which can be provided when publishing an event +type PublishOptions struct { + // Metadata contains any keys which can be used to query the data, for example a customer id + Metadata map[string]string + // Timestamp to set for the event, if the timestamp is a zero value, the current time will be used + Timestamp time.Time +} + +// PublishOption sets attributes on PublishOptions +type PublishOption func(o *PublishOptions) + +// WithMetadata sets the Metadata field on PublishOptions +func WithMetadata(md map[string]string) PublishOption { + return func(o *PublishOptions) { + o.Metadata = md + } +} + +// WithTimestamp sets the timestamp field on PublishOptions +func WithTimestamp(t time.Time) PublishOption { + return func(o *PublishOptions) { + o.Timestamp = t + } +} + +// ConsumeOptions contains all the options which can be provided when subscribing to a topic +type ConsumeOptions struct { + // Group is the name of the consumer group, if two consumers have the same group the events + // are distributed between them + Group string + // Offset is the time from which the messages should be consumed from. If not provided then + // the messages will be consumed starting from the moment the Subscription starts. + Offset time.Time + // AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered. + // If false specifies that each message need ts to be manually acknowledged by the subscriber. + // If processing is successful the message should be ack'ed to remove the message from the stream. + // If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will + // remain on the stream to be processed again. + AutoAck bool + AckWait time.Duration + // RetryLimit indicates number of times a message is retried + RetryLimit int + // CustomRetries indicates whether to use RetryLimit + CustomRetries bool +} + +// ConsumeOption sets attributes on ConsumeOptions +type ConsumeOption func(o *ConsumeOptions) + +// WithGroup sets the consumer group to be part of when consuming events +func WithGroup(q string) ConsumeOption { + return func(o *ConsumeOptions) { + o.Group = q + } +} + +// WithOffset sets the offset time at which to start consuming events +func WithOffset(t time.Time) ConsumeOption { + return func(o *ConsumeOptions) { + o.Offset = t + } +} + +// WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received +// the message is requeued in case auto ack is turned off +func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption { + return func(o *ConsumeOptions) { + o.AutoAck = ack + o.AckWait = ackWait + } +} + +// WithRetryLimit sets the RetryLimit field on ConsumeOptions. +// Set to -1 for infinite retries (default) +func WithRetryLimit(retries int) ConsumeOption { + return func(o *ConsumeOptions) { + o.RetryLimit = retries + o.CustomRetries = true + } +} + +func (s ConsumeOptions) GetRetryLimit() int { + if !s.CustomRetries { + return -1 + } + return s.RetryLimit +} + +// WriteOptions contains all the options which can be provided when writing an event to a store +type WriteOptions struct { + // TTL is the duration the event should be recorded for, a zero value TTL indicates the event should + // be stored indefinately + TTL time.Duration +} + +// WriteOption sets attributes on WriteOptions +type WriteOption func(o *WriteOptions) + +// WithTTL sets the TTL attribute on WriteOptions +func WithTTL(d time.Duration) WriteOption { + return func(o *WriteOptions) { + o.TTL = d + } +} + +// ReadOptions contains all the options which can be provided when reading events from a store +type ReadOptions struct { + // Limit the number of results to return + Limit uint + // Offset the results by this number, useful for paginated queries + Offset uint +} + +// ReadOption sets attributes on ReadOptions +type ReadOption func(o *ReadOptions) + +// ReadLimit sets the limit attribute on ReadOptions +func ReadLimit(l uint) ReadOption { + return func(o *ReadOptions) { + o.Limit = 1 + } +} + +// ReadOffset sets the offset attribute on ReadOptions +func ReadOffset(l uint) ReadOption { + return func(o *ReadOptions) { + o.Offset = 1 + } +} diff --git a/events/store.go b/events/store.go new file mode 100644 index 00000000..b820bb99 --- /dev/null +++ b/events/store.go @@ -0,0 +1,127 @@ +package events + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" + "go-micro.dev/v5/logger" + "go-micro.dev/v5/store" +) + +const joinKey = "/" + +// NewStore returns an initialized events store +func NewStore(opts ...StoreOption) Store { + // parse the options + var options StoreOptions + for _, o := range opts { + o(&options) + } + if options.TTL.Seconds() == 0 { + options.TTL = time.Hour * 24 + } + + // return the store + evs := &evStore{ + opts: options, + store: store.NewMemoryStore(), + } + if options.Backup != nil { + go evs.backupLoop() + } + return evs +} + +type evStore struct { + opts StoreOptions + store store.Store +} + +// Read events for a topic +func (s *evStore) Read(topic string, opts ...ReadOption) ([]*Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, ErrMissingTopic + } + + // parse the options + options := ReadOptions{ + Offset: 0, + Limit: 250, + } + for _, o := range opts { + o(&options) + } + + // execute the request + recs, err := s.store.Read(topic+joinKey, + store.ReadPrefix(), + store.ReadLimit(options.Limit), + store.ReadOffset(options.Offset), + ) + if err != nil { + return nil, errors.Wrap(err, "Error reading from store") + } + + // unmarshal the result + result := make([]*Event, len(recs)) + for i, r := range recs { + var e Event + if err := json.Unmarshal(r.Value, &e); err != nil { + return nil, errors.Wrap(err, "Invalid event returned from stroe") + } + result[i] = &e + } + + return result, nil +} + +// Write an event to the store +func (s *evStore) Write(event *Event, opts ...WriteOption) error { + // parse the options + options := WriteOptions{ + TTL: s.opts.TTL, + } + for _, o := range opts { + o(&options) + } + + // construct the store record + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error mashaling event to JSON") + } + // suffix event ID with hour resolution for easy retrieval in batches + timeSuffix := time.Now().Format("2006010215") + + record := &store.Record{ + // key is such that reading by prefix indexes by topic and reading by suffix indexes by time + Key: event.Topic + joinKey + event.ID + joinKey + timeSuffix, + Value: bytes, + Expiry: options.TTL, + } + + // write the record to the store + if err := s.store.Write(record); err != nil { + return errors.Wrap(err, "Error writing to the store") + } + + return nil +} + +func (s *evStore) backupLoop() { + for { + err := s.opts.Backup.Snapshot(s.store) + if err != nil { + logger.Errorf("Error running backup %s", err) + } + + time.Sleep(1 * time.Hour) + } +} + +// Backup is an interface for snapshotting the events store to long term storage +type Backup interface { + Snapshot(st store.Store) error +} diff --git a/events/store_test.go b/events/store_test.go new file mode 100644 index 00000000..2c1142eb --- /dev/null +++ b/events/store_test.go @@ -0,0 +1,47 @@ +package events + +import ( + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestStore(t *testing.T) { + store := NewStore() + + testData := []Event{ + {ID: uuid.New().String(), Topic: "foo"}, + {ID: uuid.New().String(), Topic: "foo"}, + {ID: uuid.New().String(), Topic: "bar"}, + } + + // write the records to the store + t.Run("Write", func(t *testing.T) { + for _, event := range testData { + err := store.Write(&event) + assert.Nilf(t, err, "Writing an event should not return an error") + } + }) + + // should not be able to read events from a blank topic + t.Run("ReadMissingTopic", func(t *testing.T) { + evs, err := store.Read("") + assert.Equal(t, err, ErrMissingTopic, "Reading a blank topic should return an error") + assert.Nil(t, evs, "No events should be returned") + }) + + // should only get the events from the topic requested + t.Run("ReadTopic", func(t *testing.T) { + evs, err := store.Read("foo") + assert.Nilf(t, err, "No error should be returned") + assert.Len(t, evs, 2, "Only the events for this topic should be returned") + }) + + // limits should be honoured + t.Run("ReadTopicLimit", func(t *testing.T) { + evs, err := store.Read("foo", ReadLimit(1)) + assert.Nilf(t, err, "No error should be returned") + assert.Len(t, evs, 1, "The result should include no more than the read limit") + }) +} diff --git a/events/stream_test.go b/events/stream_test.go new file mode 100644 index 00000000..55b15aea --- /dev/null +++ b/events/stream_test.go @@ -0,0 +1,241 @@ +package events + +import ( + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +type testPayload struct { + Message string +} + +type testCase struct { + str Stream + name string +} + +func TestStream(t *testing.T) { + tcs := []testCase{} + + stream, err := NewStream() + assert.Nilf(t, err, "NewStream should not return an error") + assert.NotNilf(t, stream, "NewStream should return a stream object") + tcs = append(tcs, testCase{str: stream, name: "memory"}) + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + runTestStream(t, tc.str) + }) + } + +} + +func runTestStream(t *testing.T, stream Stream) { + // TestMissingTopic will test the topic validation on publish + t.Run("TestMissingTopic", func(t *testing.T) { + err := stream.Publish("", nil) + assert.Equalf(t, err, ErrMissingTopic, "Publishing to a blank topic should return an error") + }) + + // TestConsumeTopic will publish a message to the test topic. The subscriber will subscribe to the + // same test topic. + t.Run("TestConsumeTopic", func(t *testing.T) { + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the subscriber + evChan, err := stream.Consume("test") + assert.Nilf(t, err, "Consume should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err := event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish("test", payload, WithMetadata(metadata)) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(1) + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + // TestConsumeGroup will publish a message to a random topic. Two subscribers will then consume + // the message from the firehose topic with different queues. The second subscriber will be registered + // after the message is published to test durability. + t.Run("TestConsumeGroup", func(t *testing.T) { + topic := uuid.New().String() + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the first subscriber + evChan1, err := stream.Consume(topic) + assert.Nilf(t, err, "Consume should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan1: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err := event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish(topic, payload, WithMetadata(metadata)) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(2) + + // create the second subscriber + evChan2, err := stream.Consume(topic, + WithGroup("second_queue"), + WithOffset(time.Now().Add(time.Minute*-1)), + ) + assert.Nilf(t, err, "Consume should not return an error") + + go func() { + timeout := time.NewTimer(time.Second * 1) + + select { + case event, _ := <-evChan2: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err := event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + t.Run("AckingNacking", func(t *testing.T) { + ch, err := stream.Consume("foobarAck", WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"})) + assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"})) + + ev := <-ch + ev.Ack() + ev = <-ch + nacked := ev.ID + ev.Nack() + select { + case ev = <-ch: + assert.Equal(t, ev.ID, nacked, "Nacked message should have been received again") + assert.NoError(t, ev.Ack()) + case <-time.After(7 * time.Second): + t.Fatalf("Timed out waiting for message to be put back on queue") + } + + }) + + t.Run("Retries", func(t *testing.T) { + ch, err := stream.Consume("foobarRetries", WithAutoAck(false, 5*time.Second), WithRetryLimit(1)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"})) + + ev := <-ch + id := ev.ID + ev.Nack() + ev = <-ch + assert.Equal(t, id, ev.ID, "Nacked message should have been received again") + ev.Nack() + select { + case ev = <-ch: + t.Fatalf("Unexpected event received") + case <-time.After(7 * time.Second): + } + + }) + + t.Run("InfiniteRetries", func(t *testing.T) { + ch, err := stream.Consume("foobarRetriesInf", WithAutoAck(false, 2*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"})) + + count := 0 + id := "" + for { + select { + case ev := <-ch: + if id != "" { + assert.Equal(t, id, ev.ID, "Nacked message should have been received again") + } + id = ev.ID + case <-time.After(3 * time.Second): + t.Fatalf("Unexpected event received") + } + + count++ + if count == 11 { + break + } + } + + }) + + t.Run("twoSubs", func(t *testing.T) { + ch1, err := stream.Consume("foobarTwoSubs1", WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing to topic 1") + ch2, err := stream.Consume("foobarTwoSubs2", WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing to topic 2") + + assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"})) + assert.NoError(t, stream.Publish("foobarTwoSubs1", map[string]string{"foo": "message 1"})) + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + ev := <-ch1 + assert.Equal(t, "foobarTwoSubs1", ev.Topic, "Received message from unexpected topic") + wg.Done() + }() + go func() { + ev := <-ch2 + assert.Equal(t, "foobarTwoSubs2", ev.Topic, "Received message from unexpected topic") + wg.Done() + }() + wg.Wait() + }) +} diff --git a/go.mod b/go.mod index 89e32c48..2f8e2ade 100644 --- a/go.mod +++ b/go.mod @@ -30,7 +30,7 @@ require ( github.com/streadway/amqp v1.1.0 github.com/stretchr/testify v1.10.0 github.com/test-go/testify v1.1.4 - github.com/urfave/cli/v2 v2.25.7 + github.com/urfave/cli/v2 v2.27.6 go.etcd.io/bbolt v1.4.0 go.etcd.io/etcd/api/v3 v3.5.21 go.etcd.io/etcd/client/v3 v3.5.21 @@ -40,8 +40,8 @@ require ( golang.org/x/crypto v0.37.0 golang.org/x/net v0.38.0 golang.org/x/sync v0.13.0 - google.golang.org/grpc v1.72.1 - google.golang.org/grpc/examples v0.0.0-20250514161145-5c0d55244474 + google.golang.org/grpc v1.71.1 + google.golang.org/grpc/examples v0.0.0-20250515150734-f2d3e11f3057 google.golang.org/protobuf v1.36.6 ) @@ -52,7 +52,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/color v1.16.0 // indirect github.com/go-logr/logr v1.4.2 // indirect @@ -65,7 +65,7 @@ require ( github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect - github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/serf v0.10.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.14.3 // indirect @@ -88,7 +88,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect + github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.21 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel/metric v1.35.0 // indirect diff --git a/go.sum b/go.sum index f0295213..4c582d0c 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,8 @@ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzA github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= -github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= -github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= +github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -133,8 +133,8 @@ github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI= github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= -github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= +github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM= @@ -337,10 +337,10 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= -github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= -github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= -github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= +github.com/urfave/cli/v2 v2.27.6 h1:VdRdS98FNhKZ8/Az8B7MTyGQmpIr36O1EHybx/LaZ4g= +github.com/urfave/cli/v2 v2.27.6/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= +github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -496,10 +496,10 @@ google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 h1: google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463/go.mod h1:U90ffi8eUL9MwPcrJylN5+Mk2v3vuPDptd5yyNUiRR8= google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= -google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA= -google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= -google.golang.org/grpc/examples v0.0.0-20250514161145-5c0d55244474 h1:7B8e8jJRSI3lfKIzwRDNU6f0yzNFhiPXFGbOuNETfIw= -google.golang.org/grpc/examples v0.0.0-20250514161145-5c0d55244474/go.mod h1:WPWnet+nYurNGpV0rVYHI1YuOJwVHeM3t8f76m410XM= +google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI= +google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/grpc/examples v0.0.0-20250515150734-f2d3e11f3057 h1:lPv+iqlAyiKMjbL3ivJlAASixPknLv806R6zaoE4PUM= +google.golang.org/grpc/examples v0.0.0-20250515150734-f2d3e11f3057/go.mod h1:WPWnet+nYurNGpV0rVYHI1YuOJwVHeM3t8f76m410XM= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/profile/profile.go b/profile/profile.go index 1d8f62cf..f6930ec1 100644 --- a/profile/profile.go +++ b/profile/profile.go @@ -8,6 +8,8 @@ import ( natslib "github.com/nats-io/nats.go" "go-micro.dev/v5/broker" "go-micro.dev/v5/broker/nats" + "go-micro.dev/v5/events" + nevents "go-micro.dev/v5/events/natsjs" "go-micro.dev/v5/registry" nreg "go-micro.dev/v5/registry/nats" "go-micro.dev/v5/store" @@ -22,34 +24,46 @@ type Profile struct { Broker broker.Broker Store store.Store Transport transport.Transport + Stream events.Stream } // LocalProfile returns a profile with local mDNS as the registry, HTTP as the broker, file as the store, and HTTP as the transport // It is used for local development and testing -func LocalProfile() Profile { +func LocalProfile() (Profile, error) { + stream, err := events.NewStream() return Profile{ Registry: registry.NewMDNSRegistry(), Broker: broker.NewHttpBroker(), Store: store.NewFileStore(), Transport: transport.NewHTTPTransport(), - } + Stream: stream, + }, err } // NatsProfile returns a profile with NATS as the registry, broker, store, and transport // It uses the environment variable MICR_NATS_ADDRESS to set the NATS server address // If the variable is not set, it defaults to nats://0.0.0.0:4222 which will connect to a local NATS server -func NatsProfile() Profile { +func NatsProfile() (Profile, error) { addr := os.Getenv("MICRO_NATS_ADDRESS") if addr == "" { addr = "nats://0.0.0.0:4222" } // Split the address by comma, trim whitespace, and convert to a slice of strings addrs := splitNatsAdressList(addr) + reg := nreg.NewNatsRegistry(registry.Addrs(addrs...)) - brok := nats.NewNatsBroker(broker.Addrs(addrs...)) + + nopts := natslib.GetDefaultOptions() + nopts.Servers = addrs + brok := nats.NewNatsBroker(broker.Addrs(addrs...), nats.Options(nopts)) + st := nstore.NewStore(nstore.NatsOptions(natslib.Options{Servers: addrs})) tx := ntx.NewTransport(ntx.Options(natslib.Options{Servers: addrs})) + stream, err := nevents.NewStream( + nevents.Address(addr), + ) + registry.DefaultRegistry = reg broker.DefaultBroker = brok store.DefaultStore = st @@ -59,7 +73,8 @@ func NatsProfile() Profile { Broker: brok, Store: st, Transport: tx, - } + Stream: stream, + }, err } func splitNatsAdressList(addr string) []string { diff --git a/store/file.go b/store/file.go index 54396658..2fd7b1e1 100644 --- a/store/file.go +++ b/store/file.go @@ -96,13 +96,8 @@ func (m *fileStore) init(opts ...Option) error { if m.dir == "" { m.dir = DefaultDir } - - // Ignoring this as the folder might exist. - // Reads/Writes updates will return with sensible error messages - // about the dir not existing in case this cannot create the path anyway - os.MkdirAll(m.dir, 0700) - - return nil + // create the directory + return os.MkdirAll(m.dir, 0700) } func (f *fileStore) getDB(database, table string) (*fileHandle, error) {