mirror of
				https://github.com/go-micro/go-micro.git
				synced 2025-10-30 23:27:41 +02:00 
			
		
		
		
	Re-add events package (#2761)
* Re-add events package * run redis as a dep * remove redis events * fix: data race on event subscriber * fix: data race in tests * fix: store errors * fix: lint issues * feat: default stream * Update file.go --------- Co-authored-by: Brian Ketelsen <bketelsen@gmail.com>
This commit is contained in:
		
							
								
								
									
										29
									
								
								cmd/cmd.go
									
									
									
									
									
								
							
							
						
						
									
										29
									
								
								cmd/cmd.go
									
									
									
									
									
								
							| @@ -23,6 +23,7 @@ import ( | ||||
| 	"go-micro.dev/v5/debug/profile/http" | ||||
| 	"go-micro.dev/v5/debug/profile/pprof" | ||||
| 	"go-micro.dev/v5/debug/trace" | ||||
| 	"go-micro.dev/v5/events" | ||||
| 	"go-micro.dev/v5/logger" | ||||
| 	mprofile "go-micro.dev/v5/profile" | ||||
| 	"go-micro.dev/v5/registry" | ||||
| @@ -293,6 +294,7 @@ var ( | ||||
| 	DefaultCaches = map[string]func(...cache.Option) cache.Cache{ | ||||
| 		"redis": redis.NewRedisCache, | ||||
| 	} | ||||
| 	DefaultStreams = map[string]func(...events.Option) (events.Stream, error){} | ||||
| ) | ||||
|  | ||||
| func init() { | ||||
| @@ -313,6 +315,7 @@ func newCmd(opts ...Option) Cmd { | ||||
| 		DebugProfile: &profile.DefaultProfile, | ||||
| 		Config:       &config.DefaultConfig, | ||||
| 		Cache:        &cache.DefaultCache, | ||||
| 		Stream:       &events.DefaultStream, | ||||
|  | ||||
| 		Brokers:       DefaultBrokers, | ||||
| 		Clients:       DefaultClients, | ||||
| @@ -376,7 +379,10 @@ func (c *cmd) Before(ctx *cli.Context) error { | ||||
| 	if profileName != "" { | ||||
| 		switch profileName { | ||||
| 		case "local": | ||||
| 			imported := mprofile.LocalProfile() | ||||
| 			imported, ierr := mprofile.LocalProfile() | ||||
| 			if ierr != nil { | ||||
| 				return fmt.Errorf("failed to load local profile: %v", ierr) | ||||
| 			} | ||||
| 			*c.opts.Registry = imported.Registry | ||||
| 			registry.DefaultRegistry = imported.Registry | ||||
| 			*c.opts.Broker = imported.Broker | ||||
| @@ -386,7 +392,10 @@ func (c *cmd) Before(ctx *cli.Context) error { | ||||
| 			*c.opts.Transport = imported.Transport | ||||
| 			transport.DefaultTransport = imported.Transport | ||||
| 		case "nats": | ||||
| 			imported := mprofile.NatsProfile() | ||||
| 			imported, ierr := mprofile.NatsProfile() | ||||
| 			if ierr != nil { | ||||
| 				return fmt.Errorf("failed to load nats profile: %v", ierr) | ||||
| 			} | ||||
| 			// Set the registry | ||||
| 			sopts, clopts := c.setRegistry(imported.Registry) | ||||
| 			serverOpts = append(serverOpts, sopts...) | ||||
| @@ -407,6 +416,11 @@ func (c *cmd) Before(ctx *cli.Context) error { | ||||
| 			serverOpts = append(serverOpts, sopts...) | ||||
| 			clientOpts = append(clientOpts, clopts...) | ||||
|  | ||||
| 			// Set the stream | ||||
| 			sopts, clopts = c.setStream(imported.Stream) | ||||
| 			serverOpts = append(serverOpts, sopts...) | ||||
| 			clientOpts = append(clientOpts, clopts...) | ||||
|  | ||||
| 		// Add more profiles as needed | ||||
| 		default: | ||||
| 			return fmt.Errorf("unsupported profile: %s", profileName) | ||||
| @@ -701,6 +715,17 @@ func (c *cmd) setRegistry(r registry.Registry) ([]server.Option, []client.Option | ||||
| 	registry.DefaultRegistry = *c.opts.Registry | ||||
| 	return serverOpts, clientOpts | ||||
| } | ||||
| func (c *cmd) setStream(s events.Stream) ([]server.Option, []client.Option) { | ||||
| 	var serverOpts []server.Option | ||||
| 	var clientOpts []client.Option | ||||
| 	*c.opts.Stream = s | ||||
| 	// TODO: do server and client need a Stream? | ||||
| 	// serverOpts = append(serverOpts, server.Registry(*c.opts.Registry)) | ||||
| 	// clientOpts = append(clientOpts, client.Registry(*c.opts.Registry)) | ||||
|  | ||||
| 	events.DefaultStream = *c.opts.Stream | ||||
| 	return serverOpts, clientOpts | ||||
| } | ||||
|  | ||||
| func (c *cmd) setBroker(b broker.Broker) ([]server.Option, []client.Option) { | ||||
| 	var serverOpts []server.Option | ||||
|   | ||||
| @@ -10,6 +10,7 @@ import ( | ||||
| 	"go-micro.dev/v5/config" | ||||
| 	"go-micro.dev/v5/debug/profile" | ||||
| 	"go-micro.dev/v5/debug/trace" | ||||
| 	"go-micro.dev/v5/events" | ||||
| 	"go-micro.dev/v5/registry" | ||||
| 	"go-micro.dev/v5/selector" | ||||
| 	"go-micro.dev/v5/server" | ||||
| @@ -42,6 +43,7 @@ type Options struct { | ||||
| 	Broker     *broker.Broker | ||||
| 	Auths      map[string]func(...auth.Option) auth.Auth | ||||
| 	Store      *store.Store | ||||
| 	Stream     *events.Stream | ||||
| 	Configs    map[string]func(...config.Option) (config.Config, error) | ||||
| 	Clients    map[string]func(...client.Option) client.Client | ||||
| 	Registries map[string]func(...registry.Option) registry.Registry | ||||
| @@ -49,6 +51,7 @@ type Options struct { | ||||
| 	Servers    map[string]func(...server.Option) server.Server | ||||
| 	Transports map[string]func(...transport.Option) transport.Transport | ||||
| 	Stores     map[string]func(...store.Option) store.Store | ||||
| 	Streams    map[string]func(...events.Option) events.Stream | ||||
| 	Tracers    map[string]func(...trace.Option) trace.Tracer | ||||
| 	Version    string | ||||
|  | ||||
| @@ -141,6 +144,13 @@ func Store(s *store.Store) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func Stream(s *events.Stream) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Stream = s | ||||
| 		events.DefaultStream = *s | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func Tracer(t *trace.Tracer) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Tracer = t | ||||
| @@ -169,6 +179,13 @@ func NewBroker(name string, b func(...broker.Option) broker.Broker) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // New stream func. | ||||
| func NewStream(name string, b func(...events.Option) events.Stream) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Streams[name] = b | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // New cache func. | ||||
| func NewCache(name string, c func(...cache.Option) cache.Cache) Option { | ||||
| 	return func(o *Options) { | ||||
|   | ||||
							
								
								
									
										92
									
								
								events/events.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								events/events.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,92 @@ | ||||
| // Package events is for event streaming and storage | ||||
| package events | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"errors" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	// DefaultStream is the default events stream implementation | ||||
| 	DefaultStream Stream | ||||
| 	// DefaultStore is the default events store implementation | ||||
| 	DefaultStore Store | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	// ErrMissingTopic is returned if a blank topic was provided to publish | ||||
| 	ErrMissingTopic = errors.New("missing topic") | ||||
| 	// ErrEncodingMessage is returned from publish if there was an error encoding the message option | ||||
| 	ErrEncodingMessage = errors.New("error encoding message") | ||||
| ) | ||||
|  | ||||
| // Stream is an event streaming interface | ||||
| type Stream interface { | ||||
| 	Publish(topic string, msg interface{}, opts ...PublishOption) error | ||||
| 	Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) | ||||
| } | ||||
|  | ||||
| // Store is an event store interface | ||||
| type Store interface { | ||||
| 	Read(topic string, opts ...ReadOption) ([]*Event, error) | ||||
| 	Write(event *Event, opts ...WriteOption) error | ||||
| } | ||||
|  | ||||
| type AckFunc func() error | ||||
| type NackFunc func() error | ||||
|  | ||||
| // Event is the object returned by the broker when you subscribe to a topic | ||||
| type Event struct { | ||||
| 	// ID to uniquely identify the event | ||||
| 	ID string | ||||
| 	// Topic of event, e.g. "registry.service.created" | ||||
| 	Topic string | ||||
| 	// Timestamp of the event | ||||
| 	Timestamp time.Time | ||||
| 	// Metadata contains the values the event was indexed by | ||||
| 	Metadata map[string]string | ||||
| 	// Payload contains the encoded message | ||||
| 	Payload []byte | ||||
|  | ||||
| 	ackFunc  AckFunc | ||||
| 	nackFunc NackFunc | ||||
| } | ||||
|  | ||||
| // Unmarshal the events message into an object | ||||
| func (e *Event) Unmarshal(v interface{}) error { | ||||
| 	return json.Unmarshal(e.Payload, v) | ||||
| } | ||||
|  | ||||
| // Ack acknowledges successful processing of the event in ManualAck mode | ||||
| func (e *Event) Ack() error { | ||||
| 	return e.ackFunc() | ||||
| } | ||||
|  | ||||
| func (e *Event) SetAckFunc(f AckFunc) { | ||||
| 	e.ackFunc = f | ||||
| } | ||||
|  | ||||
| // Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode | ||||
| func (e *Event) Nack() error { | ||||
| 	return e.nackFunc() | ||||
| } | ||||
|  | ||||
| func (e *Event) SetNackFunc(f NackFunc) { | ||||
| 	e.nackFunc = f | ||||
| } | ||||
|  | ||||
| // Publish an event to a topic | ||||
| func Publish(topic string, msg interface{}, opts ...PublishOption) error { | ||||
| 	return DefaultStream.Publish(topic, msg, opts...) | ||||
| } | ||||
|  | ||||
| // Consume to events | ||||
| func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { | ||||
| 	return DefaultStream.Consume(topic, opts...) | ||||
| } | ||||
|  | ||||
| // Read events for a topic | ||||
| func Read(topic string, opts ...ReadOption) ([]*Event, error) { | ||||
| 	return DefaultStore.Read(topic, opts...) | ||||
| } | ||||
							
								
								
									
										246
									
								
								events/memory.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										246
									
								
								events/memory.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,246 @@ | ||||
| package events | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/pkg/errors" | ||||
| 	"go-micro.dev/v5/logger" | ||||
| 	"go-micro.dev/v5/store" | ||||
| ) | ||||
|  | ||||
| // NewStream returns an initialized memory stream | ||||
| func NewStream(opts ...Option) (Stream, error) { | ||||
| 	// parse the options | ||||
| 	var options Options | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
| 	return &mem{store: store.NewMemoryStore()}, nil | ||||
| } | ||||
|  | ||||
| type subscriber struct { | ||||
| 	Group   string | ||||
| 	Topic   string | ||||
| 	Channel chan Event | ||||
|  | ||||
| 	sync.RWMutex | ||||
| 	retryMap   map[string]int | ||||
| 	retryLimit int | ||||
| 	autoAck    bool | ||||
| 	ackWait    time.Duration | ||||
| } | ||||
|  | ||||
| type mem struct { | ||||
| 	store store.Store | ||||
|  | ||||
| 	subs []*subscriber | ||||
| 	sync.RWMutex | ||||
| } | ||||
|  | ||||
| func (m *mem) Publish(topic string, msg interface{}, opts ...PublishOption) error { | ||||
| 	// validate the topic | ||||
| 	if len(topic) == 0 { | ||||
| 		return ErrMissingTopic | ||||
| 	} | ||||
|  | ||||
| 	// parse the options | ||||
| 	options := PublishOptions{ | ||||
| 		Timestamp: time.Now(), | ||||
| 	} | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// encode the message if it's not already encoded | ||||
| 	var payload []byte | ||||
| 	if p, ok := msg.([]byte); ok { | ||||
| 		payload = p | ||||
| 	} else { | ||||
| 		p, err := json.Marshal(msg) | ||||
| 		if err != nil { | ||||
| 			return ErrEncodingMessage | ||||
| 		} | ||||
| 		payload = p | ||||
| 	} | ||||
|  | ||||
| 	// construct the event | ||||
| 	event := &Event{ | ||||
| 		ID:        uuid.New().String(), | ||||
| 		Topic:     topic, | ||||
| 		Timestamp: options.Timestamp, | ||||
| 		Metadata:  options.Metadata, | ||||
| 		Payload:   payload, | ||||
| 	} | ||||
|  | ||||
| 	// serialize the event to bytes | ||||
| 	bytes, err := json.Marshal(event) | ||||
| 	if err != nil { | ||||
| 		return errors.Wrap(err, "Error encoding event") | ||||
| 	} | ||||
|  | ||||
| 	// write to the store | ||||
| 	key := fmt.Sprintf("%v/%v", event.Topic, event.ID) | ||||
| 	if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil { | ||||
| 		return errors.Wrap(err, "Error writing event to store") | ||||
| 	} | ||||
|  | ||||
| 	// send to the subscribers async | ||||
| 	go m.handleEvent(event) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (m *mem) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { | ||||
| 	// validate the topic | ||||
| 	if len(topic) == 0 { | ||||
| 		return nil, ErrMissingTopic | ||||
| 	} | ||||
|  | ||||
| 	// parse the options | ||||
| 	options := ConsumeOptions{ | ||||
| 		Group:   uuid.New().String(), | ||||
| 		AutoAck: true, | ||||
| 	} | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
| 	// TODO RetryLimit | ||||
|  | ||||
| 	// setup the subscriber | ||||
| 	sub := &subscriber{ | ||||
| 		Channel:    make(chan Event), | ||||
| 		Topic:      topic, | ||||
| 		Group:      options.Group, | ||||
| 		retryMap:   map[string]int{}, | ||||
| 		autoAck:    true, | ||||
| 		retryLimit: options.GetRetryLimit(), | ||||
| 	} | ||||
|  | ||||
| 	if !options.AutoAck { | ||||
| 		if options.AckWait == 0 { | ||||
| 			return nil, fmt.Errorf("invalid AckWait passed, should be positive integer") | ||||
| 		} | ||||
| 		sub.autoAck = options.AutoAck | ||||
| 		sub.ackWait = options.AckWait | ||||
| 	} | ||||
|  | ||||
| 	// register the subscriber | ||||
| 	m.Lock() | ||||
| 	m.subs = append(m.subs, sub) | ||||
| 	m.Unlock() | ||||
|  | ||||
| 	// lookup previous events if the start time option was passed | ||||
| 	if options.Offset.Unix() > 0 { | ||||
| 		go m.lookupPreviousEvents(sub, options.Offset) | ||||
| 	} | ||||
|  | ||||
| 	// return the channel | ||||
| 	return sub.Channel, nil | ||||
| } | ||||
|  | ||||
| // lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends | ||||
| // them into the subscribers channel | ||||
| func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { | ||||
| 	// lookup all events which match the topic (a blank topic will return all results) | ||||
| 	recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix()) | ||||
| 	if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { | ||||
| 		logger.Errorf("Error looking up previous events: %v", err) | ||||
| 		return | ||||
| 	} else if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// loop through the records and send it to the channel if it matches | ||||
| 	for _, r := range recs { | ||||
| 		var ev Event | ||||
| 		if err := json.Unmarshal(r.Value, &ev); err != nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		if ev.Timestamp.Unix() < startTime.Unix() { | ||||
| 			continue | ||||
| 		} | ||||
| 		sendEvent(&ev, sub) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // handleEvents sends the event to any registered subscribers. | ||||
| func (m *mem) handleEvent(ev *Event) { | ||||
| 	m.RLock() | ||||
| 	subs := m.subs | ||||
| 	m.RUnlock() | ||||
|  | ||||
| 	// filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message | ||||
| 	// being sent to two subscribers with the same queue. | ||||
| 	filteredSubs := map[string]*subscriber{} | ||||
|  | ||||
| 	// filter down to subscribers who are interested in this topic | ||||
| 	for _, sub := range subs { | ||||
| 		if len(sub.Topic) == 0 || sub.Topic == ev.Topic { | ||||
| 			filteredSubs[sub.Group] = sub | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// send the message to each channel async (since one channel might be blocked) | ||||
| 	for _, sub := range filteredSubs { | ||||
| 		sendEvent(ev, sub) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func sendEvent(ev *Event, sub *subscriber) { | ||||
| 	go func(s *subscriber) { | ||||
| 		evCopy := *ev | ||||
| 		if s.autoAck { | ||||
| 			s.Channel <- evCopy | ||||
| 			return | ||||
| 		} | ||||
| 		evCopy.SetAckFunc(ackFunc(s, evCopy)) | ||||
| 		evCopy.SetNackFunc(nackFunc(s, evCopy)) | ||||
| 		s.Lock() | ||||
| 		s.retryMap[evCopy.ID] = 0 | ||||
| 		s.Unlock() | ||||
| 		tick := time.NewTicker(s.ackWait) | ||||
| 		defer tick.Stop() | ||||
| 		for range tick.C { | ||||
| 			s.Lock() | ||||
| 			count, ok := s.retryMap[evCopy.ID] | ||||
| 			s.Unlock() | ||||
| 			if !ok { | ||||
| 				// success | ||||
| 				break | ||||
| 			} | ||||
|  | ||||
| 			if s.retryLimit > -1 && count > s.retryLimit { | ||||
| 				if logger.V(logger.ErrorLevel, logger.DefaultLogger) { | ||||
| 					logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit) | ||||
| 				} | ||||
| 				s.Lock() | ||||
| 				delete(s.retryMap, evCopy.ID) | ||||
| 				s.Unlock() | ||||
| 				return | ||||
| 			} | ||||
| 			s.Channel <- evCopy | ||||
| 			s.Lock() | ||||
| 			s.retryMap[evCopy.ID] = count + 1 | ||||
| 			s.Unlock() | ||||
| 		} | ||||
| 	}(sub) | ||||
| } | ||||
|  | ||||
| func ackFunc(s *subscriber, evCopy Event) func() error { | ||||
| 	return func() error { | ||||
| 		s.Lock() | ||||
| 		delete(s.retryMap, evCopy.ID) | ||||
| 		s.Unlock() | ||||
| 		return nil | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func nackFunc(_ *subscriber, _ Event) func() error { | ||||
| 	return func() error { | ||||
| 		return nil | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										48
									
								
								events/natsjs/README.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								events/natsjs/README.md
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,48 @@ | ||||
| # NATS JetStream | ||||
|  | ||||
| This plugin uses NATS with JetStream to send and receive events. | ||||
|  | ||||
| ## Create a stream | ||||
|  | ||||
| ```go | ||||
| ev, err := natsjs.NewStream( | ||||
|   natsjs.Address("nats://10.0.1.46:4222"), | ||||
|   natsjs.MaxAge(24*160*time.Minute), | ||||
| ) | ||||
| ``` | ||||
|  | ||||
| ## Consume a stream | ||||
|  | ||||
| ```go | ||||
| ee, err := events.Consume("test", | ||||
|   events.WithAutoAck(false, time.Second*30), | ||||
|   events.WithGroup("testgroup"), | ||||
| ) | ||||
| if err != nil { | ||||
|   panic(err) | ||||
| } | ||||
| go func() { | ||||
|   for { | ||||
|     msg := <-ee | ||||
|     // Process the message | ||||
|     logger.Info("Received message:", string(msg.Payload)) | ||||
|     err := msg.Ack() | ||||
|     if err != nil { | ||||
|       logger.Error("Error acknowledging message:", err) | ||||
|     } else { | ||||
|       logger.Info("Message acknowledged") | ||||
|     } | ||||
|   } | ||||
| }() | ||||
|  | ||||
| ``` | ||||
|  | ||||
| ## Publish an Event to the stream | ||||
|  | ||||
| ```go | ||||
| err = ev.Publish("test", []byte("hello world")) | ||||
| if err != nil { | ||||
|   panic(err) | ||||
| } | ||||
| ``` | ||||
|  | ||||
							
								
								
									
										94
									
								
								events/natsjs/helpers_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										94
									
								
								events/natsjs/helpers_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,94 @@ | ||||
| package natsjs_test | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"net" | ||||
| 	"path/filepath" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	nserver "github.com/nats-io/nats-server/v2/server" | ||||
| 	"github.com/test-go/testify/require" | ||||
| ) | ||||
|  | ||||
| func getFreeLocalhostAddress() string { | ||||
| 	l, _ := net.Listen("tcp", "127.0.0.1:0") | ||||
| 	defer l.Close() | ||||
| 	return l.Addr().String() | ||||
| } | ||||
|  | ||||
| func natsServer(ctx context.Context, t *testing.T, opts *nserver.Options) { | ||||
| 	t.Helper() | ||||
|  | ||||
| 	server, err := nserver.NewServer( | ||||
| 		opts, | ||||
| 	) | ||||
| 	require.NoError(t, err) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	server.SetLoggerV2( | ||||
| 		NewLogWrapper(), | ||||
| 		true, true, false, | ||||
| 	) | ||||
|  | ||||
| 	// first start NATS | ||||
| 	go server.Start() | ||||
| 	ready := server.ReadyForConnections(time.Second * 10) | ||||
| 	if !ready { | ||||
| 		t.Fatalf("NATS server not ready") | ||||
| 	} | ||||
| 	jsConf := &nserver.JetStreamConfig{ | ||||
| 		StoreDir: filepath.Join(t.TempDir(), "nats-js"), | ||||
| 	} | ||||
|  | ||||
| 	// second start JetStream | ||||
| 	err = server.EnableJetStream(jsConf) | ||||
| 	require.NoError(t, err) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	<-ctx.Done() | ||||
|  | ||||
| 	server.Shutdown() | ||||
| } | ||||
|  | ||||
| func NewLogWrapper() *LogWrapper { | ||||
| 	return &LogWrapper{} | ||||
| } | ||||
|  | ||||
| type LogWrapper struct { | ||||
| } | ||||
|  | ||||
| // Noticef logs a notice statement. | ||||
| func (l *LogWrapper) Noticef(format string, v ...interface{}) { | ||||
| 	fmt.Printf(format+"\n", v...) | ||||
| } | ||||
|  | ||||
| // Warnf logs a warning statement. | ||||
| func (l *LogWrapper) Warnf(format string, v ...interface{}) { | ||||
| 	fmt.Printf(format+"\n", v...) | ||||
| } | ||||
|  | ||||
| // Fatalf logs a fatal statement. | ||||
| func (l *LogWrapper) Fatalf(format string, v ...interface{}) { | ||||
| 	fmt.Printf(format+"\n", v...) | ||||
| } | ||||
|  | ||||
| // Errorf logs an error statement. | ||||
| func (l *LogWrapper) Errorf(format string, v ...interface{}) { | ||||
| 	fmt.Printf(format+"\n", v...) | ||||
| } | ||||
|  | ||||
| // Debugf logs a debug statement. | ||||
| func (l *LogWrapper) Debugf(format string, v ...interface{}) { | ||||
| 	fmt.Printf(format+"\n", v...) | ||||
| } | ||||
|  | ||||
| // Tracef logs a trace statement. | ||||
| func (l *LogWrapper) Tracef(format string, v ...interface{}) { | ||||
| 	fmt.Printf(format+"\n", v...) | ||||
| } | ||||
							
								
								
									
										270
									
								
								events/natsjs/nats.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										270
									
								
								events/natsjs/nats.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,270 @@ | ||||
| // Package natsjs provides a NATS Jetstream implementation of the events.Stream interface. | ||||
| package natsjs | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/google/uuid" | ||||
| 	nats "github.com/nats-io/nats.go" | ||||
| 	"github.com/pkg/errors" | ||||
|  | ||||
| 	"go-micro.dev/v5/events" | ||||
| 	"go-micro.dev/v5/logger" | ||||
| ) | ||||
|  | ||||
| const ( | ||||
| 	defaultClusterID = "micro" | ||||
| ) | ||||
|  | ||||
| // NewStream returns an initialized nats stream or an error if the connection to the nats | ||||
| // server could not be established. | ||||
| func NewStream(opts ...Option) (events.Stream, error) { | ||||
| 	// parse the options | ||||
| 	options := Options{ | ||||
| 		ClientID:  uuid.New().String(), | ||||
| 		ClusterID: defaultClusterID, | ||||
| 		Logger:    logger.DefaultLogger, | ||||
| 	} | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	s := &stream{opts: options} | ||||
|  | ||||
| 	natsJetStreamCtx, err := connectToNatsJetStream(options) | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error connecting to nats cluster %v: %w", options.ClusterID, err) | ||||
| 	} | ||||
|  | ||||
| 	s.natsJetStreamCtx = natsJetStreamCtx | ||||
|  | ||||
| 	return s, nil | ||||
| } | ||||
|  | ||||
| type stream struct { | ||||
| 	opts             Options | ||||
| 	natsJetStreamCtx nats.JetStreamContext | ||||
| } | ||||
|  | ||||
| func connectToNatsJetStream(options Options) (nats.JetStreamContext, error) { | ||||
| 	nopts := nats.GetDefaultOptions() | ||||
| 	if options.TLSConfig != nil { | ||||
| 		nopts.Secure = true | ||||
| 		nopts.TLSConfig = options.TLSConfig | ||||
| 	} | ||||
|  | ||||
| 	if options.NkeyConfig != "" { | ||||
| 		nopts.Nkey = options.NkeyConfig | ||||
| 	} | ||||
|  | ||||
| 	if len(options.Address) > 0 { | ||||
| 		nopts.Servers = strings.Split(options.Address, ",") | ||||
| 	} | ||||
|  | ||||
| 	if options.Name != "" { | ||||
| 		nopts.Name = options.Name | ||||
| 	} | ||||
|  | ||||
| 	if options.Username != "" && options.Password != "" { | ||||
| 		nopts.User = options.Username | ||||
| 		nopts.Password = options.Password | ||||
| 	} | ||||
|  | ||||
| 	conn, err := nopts.Connect() | ||||
| 	if err != nil { | ||||
| 		tls := nopts.TLSConfig != nil | ||||
| 		return nil, fmt.Errorf("error connecting to nats at %v with tls enabled (%v): %w", options.Address, tls, err) | ||||
| 	} | ||||
|  | ||||
| 	js, err := conn.JetStream() | ||||
| 	if err != nil { | ||||
| 		return nil, fmt.Errorf("error while obtaining JetStream context: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	return js, nil | ||||
| } | ||||
|  | ||||
| // Publish a message to a topic. | ||||
| func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error { | ||||
| 	// validate the topic | ||||
| 	if len(topic) == 0 { | ||||
| 		return events.ErrMissingTopic | ||||
| 	} | ||||
|  | ||||
| 	// parse the options | ||||
| 	options := events.PublishOptions{ | ||||
| 		Timestamp: time.Now(), | ||||
| 	} | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// encode the message if it's not already encoded | ||||
| 	var payload []byte | ||||
| 	if p, ok := msg.([]byte); ok { | ||||
| 		payload = p | ||||
| 	} else { | ||||
| 		p, err := json.Marshal(msg) | ||||
| 		if err != nil { | ||||
| 			return events.ErrEncodingMessage | ||||
| 		} | ||||
| 		payload = p | ||||
| 	} | ||||
|  | ||||
| 	// construct the event | ||||
| 	event := &events.Event{ | ||||
| 		ID:        uuid.New().String(), | ||||
| 		Topic:     topic, | ||||
| 		Timestamp: options.Timestamp, | ||||
| 		Metadata:  options.Metadata, | ||||
| 		Payload:   payload, | ||||
| 	} | ||||
|  | ||||
| 	// serialize the event to bytes | ||||
| 	bytes, err := json.Marshal(event) | ||||
| 	if err != nil { | ||||
| 		return errors.Wrap(err, "Error encoding event") | ||||
| 	} | ||||
|  | ||||
| 	// publish the event to the topic's channel | ||||
| 	// publish synchronously if configured | ||||
| 	if s.opts.SyncPublish { | ||||
| 		_, err := s.natsJetStreamCtx.Publish(event.Topic, bytes) | ||||
| 		if err != nil { | ||||
| 			err = errors.Wrap(err, "Error publishing message to topic") | ||||
| 		} | ||||
|  | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	// publish asynchronously by default | ||||
| 	if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes); err != nil { | ||||
| 		return errors.Wrap(err, "Error publishing message to topic") | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Consume from a topic. | ||||
| func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) { | ||||
| 	// validate the topic | ||||
| 	if len(topic) == 0 { | ||||
| 		return nil, events.ErrMissingTopic | ||||
| 	} | ||||
|  | ||||
| 	log := s.opts.Logger | ||||
|  | ||||
| 	// parse the options | ||||
| 	options := events.ConsumeOptions{ | ||||
| 		Group: uuid.New().String(), | ||||
| 	} | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// setup the subscriber | ||||
| 	channel := make(chan events.Event) | ||||
| 	handleMsg := func(msg *nats.Msg) { | ||||
| 		ctx, cancel := context.WithCancel(context.TODO()) | ||||
| 		defer cancel() | ||||
|  | ||||
| 		// decode the message | ||||
| 		var evt events.Event | ||||
| 		if err := json.Unmarshal(msg.Data, &evt); err != nil { | ||||
| 			log.Logf(logger.ErrorLevel, "Error decoding message: %v", err) | ||||
| 			// not acknowledging the message is the way to indicate an error occurred | ||||
| 			return | ||||
| 		} | ||||
| 		if options.AutoAck { | ||||
| 			// set up the ack funcs | ||||
| 			evt.SetAckFunc(func() error { | ||||
| 				return msg.Ack() | ||||
| 			}) | ||||
|  | ||||
| 			evt.SetNackFunc(func() error { | ||||
| 				return msg.Nak() | ||||
| 			}) | ||||
| 		} else { | ||||
| 			// set up the ack funcs | ||||
| 			evt.SetAckFunc(func() error { | ||||
| 				return nil | ||||
| 			}) | ||||
| 			evt.SetNackFunc(func() error { | ||||
| 				return nil | ||||
| 			}) | ||||
| 		} | ||||
|  | ||||
| 		// push onto the channel and wait for the consumer to take the event off before we acknowledge it. | ||||
| 		channel <- evt | ||||
|  | ||||
| 		if !options.AutoAck { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		if err := msg.Ack(nats.Context(ctx)); err != nil { | ||||
|  | ||||
| 			log.Logf(logger.ErrorLevel, "Error acknowledging message: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// ensure that a stream exists for that topic | ||||
| 	_, err := s.natsJetStreamCtx.StreamInfo(topic) | ||||
| 	if err != nil { | ||||
| 		cfg := &nats.StreamConfig{ | ||||
| 			Name: topic, | ||||
| 		} | ||||
| 		if s.opts.RetentionPolicy != 0 { | ||||
| 			cfg.Retention = nats.RetentionPolicy(s.opts.RetentionPolicy) | ||||
| 		} | ||||
| 		if s.opts.MaxAge > 0 { | ||||
| 			cfg.MaxAge = s.opts.MaxAge | ||||
| 		} | ||||
|  | ||||
| 		_, err = s.natsJetStreamCtx.AddStream(cfg) | ||||
| 		if err != nil { | ||||
| 			return nil, errors.Wrap(err, "Stream did not exist and adding a stream failed") | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// setup the options | ||||
| 	subOpts := []nats.SubOpt{} | ||||
|  | ||||
| 	if options.CustomRetries { | ||||
| 		subOpts = append(subOpts, nats.MaxDeliver(options.GetRetryLimit())) | ||||
| 	} | ||||
|  | ||||
| 	if options.AutoAck { | ||||
| 		subOpts = append(subOpts, nats.AckAll()) | ||||
| 	} else { | ||||
| 		subOpts = append(subOpts, nats.AckExplicit()) | ||||
| 	} | ||||
|  | ||||
| 	if !options.Offset.IsZero() { | ||||
| 		subOpts = append(subOpts, nats.StartTime(options.Offset)) | ||||
| 	} else { | ||||
| 		subOpts = append(subOpts, nats.DeliverNew()) | ||||
| 	} | ||||
|  | ||||
| 	if options.AckWait > 0 { | ||||
| 		subOpts = append(subOpts, nats.AckWait(options.AckWait)) | ||||
| 	} | ||||
|  | ||||
| 	// connect the subscriber via a queue group only if durable streams are enabled | ||||
| 	if !s.opts.DisableDurableStreams { | ||||
| 		subOpts = append(subOpts, nats.Durable(options.Group)) | ||||
| 		_, err = s.natsJetStreamCtx.QueueSubscribe(topic, options.Group, handleMsg, subOpts...) | ||||
| 	} else { | ||||
| 		subOpts = append(subOpts, nats.ConsumerName(options.Group)) | ||||
| 		_, err = s.natsJetStreamCtx.Subscribe(topic, handleMsg, subOpts...) | ||||
| 	} | ||||
|  | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrap(err, "Error subscribing to topic") | ||||
| 	} | ||||
|  | ||||
| 	return channel, nil | ||||
| } | ||||
							
								
								
									
										112
									
								
								events/natsjs/nats_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										112
									
								
								events/natsjs/nats_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,112 @@ | ||||
| package natsjs_test | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"encoding/json" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"go-micro.dev/v5/events/natsjs" | ||||
| 	nserver "github.com/nats-io/nats-server/v2/server" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| 	"github.com/test-go/testify/require" | ||||
| 	"go-micro.dev/v5/events" | ||||
| ) | ||||
|  | ||||
| type Payload struct { | ||||
| 	ID   string `json:"id"` | ||||
| 	Name string `json:"name"` | ||||
| } | ||||
|  | ||||
| func TestSingleEvent(t *testing.T) { | ||||
| 	ctx, cancel := context.WithCancel(context.TODO()) | ||||
| 	defer cancel() | ||||
|  | ||||
| 	// variables | ||||
| 	demoPayload := Payload{ | ||||
| 		ID:   "123", | ||||
| 		Name: "Hello World", | ||||
| 	} | ||||
| 	topic := "foobar" | ||||
|  | ||||
| 	clusterName := "test-cluster" | ||||
|  | ||||
| 	natsAddr := getFreeLocalhostAddress() | ||||
| 	natsPort, _ := strconv.Atoi(strings.Split(natsAddr, ":")[1]) | ||||
|  | ||||
| 	// start the NATS with JetStream server | ||||
| 	go natsServer(ctx, | ||||
| 		t, | ||||
| 		&nserver.Options{ | ||||
| 			Host: strings.Split(natsAddr, ":")[0], | ||||
| 			Port: natsPort, | ||||
| 			Cluster: nserver.ClusterOpts{ | ||||
| 				Name: clusterName, | ||||
| 			}, | ||||
| 		}, | ||||
| 	) | ||||
|  | ||||
| 	time.Sleep(1 * time.Second) | ||||
|  | ||||
| 	// consumer | ||||
| 	consumerClient, err := natsjs.NewStream( | ||||
| 		natsjs.Address(natsAddr), | ||||
| 		natsjs.ClusterID(clusterName), | ||||
| 	) | ||||
| 	require.NoError(t, err) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	consumer := func(_ context.Context, t *testing.T, client events.Stream, cancel context.CancelFunc) { | ||||
| 		t.Helper() | ||||
| 		defer cancel() | ||||
|  | ||||
| 		foobarEvents, err := client.Consume(topic) | ||||
| 		require.Nil(t, err) | ||||
| 		if err != nil { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		// wait for the event | ||||
| 		event := <-foobarEvents | ||||
|  | ||||
| 		p := Payload{} | ||||
| 		err = json.Unmarshal(event.Payload, &p) | ||||
|  | ||||
| 		require.NoError(t, err) | ||||
| 		if err != nil { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		assert.Equal(t, demoPayload.ID, p.ID) | ||||
| 		assert.Equal(t, demoPayload.Name, p.Name) | ||||
| 	} | ||||
|  | ||||
| 	go consumer(ctx, t, consumerClient, cancel) | ||||
|  | ||||
| 	// publisher | ||||
| 	time.Sleep(1 * time.Second) | ||||
|  | ||||
| 	publisherClient, err := natsjs.NewStream( | ||||
| 		natsjs.Address(natsAddr), | ||||
| 		natsjs.ClusterID(clusterName), | ||||
| 	) | ||||
| 	require.NoError(t, err) | ||||
| 	if err != nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	publisher := func(_ context.Context, t *testing.T, client events.Stream) { | ||||
| 		t.Helper() | ||||
| 		err := client.Publish(topic, demoPayload) | ||||
| 		require.NoError(t, err) | ||||
| 	} | ||||
|  | ||||
| 	go publisher(ctx, t, publisherClient) | ||||
|  | ||||
| 	// wait until consumer received the event | ||||
| 	<-ctx.Done() | ||||
| } | ||||
							
								
								
									
										116
									
								
								events/natsjs/options.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										116
									
								
								events/natsjs/options.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,116 @@ | ||||
| package natsjs | ||||
|  | ||||
| import ( | ||||
| 	"crypto/tls" | ||||
| 	"time" | ||||
|  | ||||
| 	"go-micro.dev/v5/logger" | ||||
| ) | ||||
|  | ||||
| // Options which are used to configure the nats stream. | ||||
| type Options struct { | ||||
| 	ClusterID             string | ||||
| 	ClientID              string | ||||
| 	Address               string | ||||
| 	NkeyConfig            string | ||||
| 	TLSConfig             *tls.Config | ||||
| 	Logger                logger.Logger | ||||
| 	SyncPublish           bool | ||||
| 	Name                  string | ||||
| 	DisableDurableStreams bool | ||||
| 	Username              string | ||||
| 	Password              string | ||||
| 	RetentionPolicy       int | ||||
| 	MaxAge                time.Duration | ||||
| 	MaxMsgSize            int | ||||
| } | ||||
|  | ||||
| // Option is a function which configures options. | ||||
| type Option func(o *Options) | ||||
|  | ||||
| // ClusterID sets the cluster id for the nats connection. | ||||
| func ClusterID(id string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.ClusterID = id | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ClientID sets the client id for the nats connection. | ||||
| func ClientID(id string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.ClientID = id | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Address of the nats cluster. | ||||
| func Address(addr string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Address = addr | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TLSConfig to use when connecting to the cluster. | ||||
| func TLSConfig(t *tls.Config) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.TLSConfig = t | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // NkeyConfig string to use when connecting to the cluster. | ||||
| func NkeyConfig(nkey string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.NkeyConfig = nkey | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Logger sets the underlying logger. | ||||
| func Logger(log logger.Logger) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Logger = log | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // SynchronousPublish allows using a synchronous publishing instead of the default asynchronous. | ||||
| func SynchronousPublish(sync bool) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.SyncPublish = sync | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Name allows to add a name to the natsjs connection. | ||||
| func Name(name string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Name = name | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // DisableDurableStreams will disable durable streams. | ||||
| func DisableDurableStreams() Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.DisableDurableStreams = true | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Authenticate authenticates the connection with the given username and password. | ||||
| func Authenticate(username, password string) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.Username = username | ||||
| 		o.Password = password | ||||
| 	} | ||||
| } | ||||
| func RetentionPolicy(rp int) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.RetentionPolicy = rp | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func MaxMsgSize(size int) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.MaxMsgSize = size | ||||
| 	} | ||||
| } | ||||
| func MaxAge(age time.Duration) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.MaxAge = age | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										144
									
								
								events/options.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										144
									
								
								events/options.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,144 @@ | ||||
| package events | ||||
|  | ||||
| import "time" | ||||
|  | ||||
| type Options struct{} | ||||
|  | ||||
| type Option func(o *Options) | ||||
|  | ||||
| type StoreOptions struct { | ||||
| 	TTL    time.Duration | ||||
| 	Backup Backup | ||||
| } | ||||
|  | ||||
| type StoreOption func(o *StoreOptions) | ||||
|  | ||||
| // PublishOptions contains all the options which can be provided when publishing an event | ||||
| type PublishOptions struct { | ||||
| 	// Metadata contains any keys which can be used to query the data, for example a customer id | ||||
| 	Metadata map[string]string | ||||
| 	// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used | ||||
| 	Timestamp time.Time | ||||
| } | ||||
|  | ||||
| // PublishOption sets attributes on PublishOptions | ||||
| type PublishOption func(o *PublishOptions) | ||||
|  | ||||
| // WithMetadata sets the Metadata field on PublishOptions | ||||
| func WithMetadata(md map[string]string) PublishOption { | ||||
| 	return func(o *PublishOptions) { | ||||
| 		o.Metadata = md | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithTimestamp sets the timestamp field on PublishOptions | ||||
| func WithTimestamp(t time.Time) PublishOption { | ||||
| 	return func(o *PublishOptions) { | ||||
| 		o.Timestamp = t | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ConsumeOptions contains all the options which can be provided when subscribing to a topic | ||||
| type ConsumeOptions struct { | ||||
| 	// Group is the name of the consumer group, if two consumers have the same group the events | ||||
| 	// are distributed between them | ||||
| 	Group string | ||||
| 	// Offset is the time from which the messages should be consumed from. If not provided then | ||||
| 	// the messages will be consumed starting from the moment the Subscription starts. | ||||
| 	Offset time.Time | ||||
| 	// AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered. | ||||
| 	// If false specifies that each message need ts to be manually acknowledged by the subscriber. | ||||
| 	// If processing is successful the message should be ack'ed to remove the message from the stream. | ||||
| 	// If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will | ||||
| 	// remain on the stream to be processed again. | ||||
| 	AutoAck bool | ||||
| 	AckWait time.Duration | ||||
| 	// RetryLimit indicates number of times a message is retried | ||||
| 	RetryLimit int | ||||
| 	// CustomRetries indicates whether to use RetryLimit | ||||
| 	CustomRetries bool | ||||
| } | ||||
|  | ||||
| // ConsumeOption sets attributes on ConsumeOptions | ||||
| type ConsumeOption func(o *ConsumeOptions) | ||||
|  | ||||
| // WithGroup sets the consumer group to be part of when consuming events | ||||
| func WithGroup(q string) ConsumeOption { | ||||
| 	return func(o *ConsumeOptions) { | ||||
| 		o.Group = q | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithOffset sets the offset time at which to start consuming events | ||||
| func WithOffset(t time.Time) ConsumeOption { | ||||
| 	return func(o *ConsumeOptions) { | ||||
| 		o.Offset = t | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received | ||||
| // the message is requeued in case auto ack is turned off | ||||
| func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption { | ||||
| 	return func(o *ConsumeOptions) { | ||||
| 		o.AutoAck = ack | ||||
| 		o.AckWait = ackWait | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithRetryLimit sets the RetryLimit field on ConsumeOptions. | ||||
| // Set to -1 for infinite retries (default) | ||||
| func WithRetryLimit(retries int) ConsumeOption { | ||||
| 	return func(o *ConsumeOptions) { | ||||
| 		o.RetryLimit = retries | ||||
| 		o.CustomRetries = true | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s ConsumeOptions) GetRetryLimit() int { | ||||
| 	if !s.CustomRetries { | ||||
| 		return -1 | ||||
| 	} | ||||
| 	return s.RetryLimit | ||||
| } | ||||
|  | ||||
| // WriteOptions contains all the options which can be provided when writing an event to a store | ||||
| type WriteOptions struct { | ||||
| 	// TTL is the duration the event should be recorded for, a zero value TTL indicates the event should | ||||
| 	// be stored indefinately | ||||
| 	TTL time.Duration | ||||
| } | ||||
|  | ||||
| // WriteOption sets attributes on WriteOptions | ||||
| type WriteOption func(o *WriteOptions) | ||||
|  | ||||
| // WithTTL sets the TTL attribute on WriteOptions | ||||
| func WithTTL(d time.Duration) WriteOption { | ||||
| 	return func(o *WriteOptions) { | ||||
| 		o.TTL = d | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ReadOptions contains all the options which can be provided when reading events from a store | ||||
| type ReadOptions struct { | ||||
| 	// Limit the number of results to return | ||||
| 	Limit uint | ||||
| 	// Offset the results by this number, useful for paginated queries | ||||
| 	Offset uint | ||||
| } | ||||
|  | ||||
| // ReadOption sets attributes on ReadOptions | ||||
| type ReadOption func(o *ReadOptions) | ||||
|  | ||||
| // ReadLimit sets the limit attribute on ReadOptions | ||||
| func ReadLimit(l uint) ReadOption { | ||||
| 	return func(o *ReadOptions) { | ||||
| 		o.Limit = 1 | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // ReadOffset sets the offset attribute on ReadOptions | ||||
| func ReadOffset(l uint) ReadOption { | ||||
| 	return func(o *ReadOptions) { | ||||
| 		o.Offset = 1 | ||||
| 	} | ||||
| } | ||||
							
								
								
									
										127
									
								
								events/store.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										127
									
								
								events/store.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,127 @@ | ||||
| package events | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/pkg/errors" | ||||
| 	"go-micro.dev/v5/logger" | ||||
| 	"go-micro.dev/v5/store" | ||||
| ) | ||||
|  | ||||
| const joinKey = "/" | ||||
|  | ||||
| // NewStore returns an initialized events store | ||||
| func NewStore(opts ...StoreOption) Store { | ||||
| 	// parse the options | ||||
| 	var options StoreOptions | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
| 	if options.TTL.Seconds() == 0 { | ||||
| 		options.TTL = time.Hour * 24 | ||||
| 	} | ||||
|  | ||||
| 	// return the store | ||||
| 	evs := &evStore{ | ||||
| 		opts:  options, | ||||
| 		store: store.NewMemoryStore(), | ||||
| 	} | ||||
| 	if options.Backup != nil { | ||||
| 		go evs.backupLoop() | ||||
| 	} | ||||
| 	return evs | ||||
| } | ||||
|  | ||||
| type evStore struct { | ||||
| 	opts  StoreOptions | ||||
| 	store store.Store | ||||
| } | ||||
|  | ||||
| // Read events for a topic | ||||
| func (s *evStore) Read(topic string, opts ...ReadOption) ([]*Event, error) { | ||||
| 	// validate the topic | ||||
| 	if len(topic) == 0 { | ||||
| 		return nil, ErrMissingTopic | ||||
| 	} | ||||
|  | ||||
| 	// parse the options | ||||
| 	options := ReadOptions{ | ||||
| 		Offset: 0, | ||||
| 		Limit:  250, | ||||
| 	} | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// execute the request | ||||
| 	recs, err := s.store.Read(topic+joinKey, | ||||
| 		store.ReadPrefix(), | ||||
| 		store.ReadLimit(options.Limit), | ||||
| 		store.ReadOffset(options.Offset), | ||||
| 	) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.Wrap(err, "Error reading from store") | ||||
| 	} | ||||
|  | ||||
| 	// unmarshal the result | ||||
| 	result := make([]*Event, len(recs)) | ||||
| 	for i, r := range recs { | ||||
| 		var e Event | ||||
| 		if err := json.Unmarshal(r.Value, &e); err != nil { | ||||
| 			return nil, errors.Wrap(err, "Invalid event returned from stroe") | ||||
| 		} | ||||
| 		result[i] = &e | ||||
| 	} | ||||
|  | ||||
| 	return result, nil | ||||
| } | ||||
|  | ||||
| // Write an event to the store | ||||
| func (s *evStore) Write(event *Event, opts ...WriteOption) error { | ||||
| 	// parse the options | ||||
| 	options := WriteOptions{ | ||||
| 		TTL: s.opts.TTL, | ||||
| 	} | ||||
| 	for _, o := range opts { | ||||
| 		o(&options) | ||||
| 	} | ||||
|  | ||||
| 	// construct the store record | ||||
| 	bytes, err := json.Marshal(event) | ||||
| 	if err != nil { | ||||
| 		return errors.Wrap(err, "Error mashaling event to JSON") | ||||
| 	} | ||||
| 	// suffix event ID with hour resolution for easy retrieval in batches | ||||
| 	timeSuffix := time.Now().Format("2006010215") | ||||
|  | ||||
| 	record := &store.Record{ | ||||
| 		// key is such that reading by prefix indexes by topic and reading by suffix indexes by time | ||||
| 		Key:    event.Topic + joinKey + event.ID + joinKey + timeSuffix, | ||||
| 		Value:  bytes, | ||||
| 		Expiry: options.TTL, | ||||
| 	} | ||||
|  | ||||
| 	// write the record to the store | ||||
| 	if err := s.store.Write(record); err != nil { | ||||
| 		return errors.Wrap(err, "Error writing to the store") | ||||
| 	} | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *evStore) backupLoop() { | ||||
| 	for { | ||||
| 		err := s.opts.Backup.Snapshot(s.store) | ||||
| 		if err != nil { | ||||
| 			logger.Errorf("Error running backup %s", err) | ||||
| 		} | ||||
|  | ||||
| 		time.Sleep(1 * time.Hour) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Backup is an interface for snapshotting the events store to long term storage | ||||
| type Backup interface { | ||||
| 	Snapshot(st store.Store) error | ||||
| } | ||||
							
								
								
									
										47
									
								
								events/store_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								events/store_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | ||||
| package events | ||||
|  | ||||
| import ( | ||||
| 	"testing" | ||||
|  | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
|  | ||||
| func TestStore(t *testing.T) { | ||||
| 	store := NewStore() | ||||
|  | ||||
| 	testData := []Event{ | ||||
| 		{ID: uuid.New().String(), Topic: "foo"}, | ||||
| 		{ID: uuid.New().String(), Topic: "foo"}, | ||||
| 		{ID: uuid.New().String(), Topic: "bar"}, | ||||
| 	} | ||||
|  | ||||
| 	// write the records to the store | ||||
| 	t.Run("Write", func(t *testing.T) { | ||||
| 		for _, event := range testData { | ||||
| 			err := store.Write(&event) | ||||
| 			assert.Nilf(t, err, "Writing an event should not return an error") | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	// should not be able to read events from a blank topic | ||||
| 	t.Run("ReadMissingTopic", func(t *testing.T) { | ||||
| 		evs, err := store.Read("") | ||||
| 		assert.Equal(t, err, ErrMissingTopic, "Reading a blank topic should return an error") | ||||
| 		assert.Nil(t, evs, "No events should be returned") | ||||
| 	}) | ||||
|  | ||||
| 	// should only get the events from the topic requested | ||||
| 	t.Run("ReadTopic", func(t *testing.T) { | ||||
| 		evs, err := store.Read("foo") | ||||
| 		assert.Nilf(t, err, "No error should be returned") | ||||
| 		assert.Len(t, evs, 2, "Only the events for this topic should be returned") | ||||
| 	}) | ||||
|  | ||||
| 	// limits should be honoured | ||||
| 	t.Run("ReadTopicLimit", func(t *testing.T) { | ||||
| 		evs, err := store.Read("foo", ReadLimit(1)) | ||||
| 		assert.Nilf(t, err, "No error should be returned") | ||||
| 		assert.Len(t, evs, 1, "The result should include no more than the read limit") | ||||
| 	}) | ||||
| } | ||||
							
								
								
									
										241
									
								
								events/stream_test.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										241
									
								
								events/stream_test.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,241 @@ | ||||
| package events | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/google/uuid" | ||||
| 	"github.com/stretchr/testify/assert" | ||||
| ) | ||||
|  | ||||
| type testPayload struct { | ||||
| 	Message string | ||||
| } | ||||
|  | ||||
| type testCase struct { | ||||
| 	str  Stream | ||||
| 	name string | ||||
| } | ||||
|  | ||||
| func TestStream(t *testing.T) { | ||||
| 	tcs := []testCase{} | ||||
|  | ||||
| 	stream, err := NewStream() | ||||
| 	assert.Nilf(t, err, "NewStream should not return an error") | ||||
| 	assert.NotNilf(t, stream, "NewStream should return a stream object") | ||||
| 	tcs = append(tcs, testCase{str: stream, name: "memory"}) | ||||
|  | ||||
| 	for _, tc := range tcs { | ||||
| 		t.Run(tc.name, func(t *testing.T) { | ||||
| 			runTestStream(t, tc.str) | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func runTestStream(t *testing.T, stream Stream) { | ||||
| 	// TestMissingTopic will test the topic validation on publish | ||||
| 	t.Run("TestMissingTopic", func(t *testing.T) { | ||||
| 		err := stream.Publish("", nil) | ||||
| 		assert.Equalf(t, err, ErrMissingTopic, "Publishing to a blank topic should return an error") | ||||
| 	}) | ||||
|  | ||||
| 	// TestConsumeTopic will publish a message to the test topic. The subscriber will subscribe to the | ||||
| 	// same test topic. | ||||
| 	t.Run("TestConsumeTopic", func(t *testing.T) { | ||||
| 		payload := &testPayload{Message: "HelloWorld"} | ||||
| 		metadata := map[string]string{"foo": "bar"} | ||||
|  | ||||
| 		// create the subscriber | ||||
| 		evChan, err := stream.Consume("test") | ||||
| 		assert.Nilf(t, err, "Consume should not return an error") | ||||
|  | ||||
| 		// setup the subscriber async | ||||
| 		var wg sync.WaitGroup | ||||
|  | ||||
| 		go func() { | ||||
| 			timeout := time.NewTimer(time.Millisecond * 250) | ||||
|  | ||||
| 			select { | ||||
| 			case event, _ := <-evChan: | ||||
| 				assert.NotNilf(t, event, "The message was nil") | ||||
| 				assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") | ||||
|  | ||||
| 				var result testPayload | ||||
| 				err := event.Unmarshal(&result) | ||||
| 				assert.Nil(t, err, "Error decoding result") | ||||
| 				assert.Equal(t, result, *payload, "Payload didn't match") | ||||
|  | ||||
| 				wg.Done() | ||||
| 			case <-timeout.C: | ||||
| 				t.Fatalf("Event was not recieved") | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		err = stream.Publish("test", payload, WithMetadata(metadata)) | ||||
| 		assert.Nil(t, err, "Publishing a valid message should not return an error") | ||||
| 		wg.Add(1) | ||||
|  | ||||
| 		// wait for the subscriber to recieve the message or timeout | ||||
| 		wg.Wait() | ||||
| 	}) | ||||
|  | ||||
| 	// TestConsumeGroup will publish a message to a random topic. Two subscribers will then consume | ||||
| 	// the message from the firehose topic with different queues. The second subscriber will be registered | ||||
| 	// after the message is published to test durability. | ||||
| 	t.Run("TestConsumeGroup", func(t *testing.T) { | ||||
| 		topic := uuid.New().String() | ||||
| 		payload := &testPayload{Message: "HelloWorld"} | ||||
| 		metadata := map[string]string{"foo": "bar"} | ||||
|  | ||||
| 		// create the first subscriber | ||||
| 		evChan1, err := stream.Consume(topic) | ||||
| 		assert.Nilf(t, err, "Consume should not return an error") | ||||
|  | ||||
| 		// setup the subscriber async | ||||
| 		var wg sync.WaitGroup | ||||
|  | ||||
| 		go func() { | ||||
| 			timeout := time.NewTimer(time.Millisecond * 250) | ||||
|  | ||||
| 			select { | ||||
| 			case event, _ := <-evChan1: | ||||
| 				assert.NotNilf(t, event, "The message was nil") | ||||
| 				assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") | ||||
|  | ||||
| 				var result testPayload | ||||
| 				err := event.Unmarshal(&result) | ||||
| 				assert.Nil(t, err, "Error decoding result") | ||||
| 				assert.Equal(t, result, *payload, "Payload didn't match") | ||||
|  | ||||
| 				wg.Done() | ||||
| 			case <-timeout.C: | ||||
| 				t.Fatalf("Event was not recieved") | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		err = stream.Publish(topic, payload, WithMetadata(metadata)) | ||||
| 		assert.Nil(t, err, "Publishing a valid message should not return an error") | ||||
| 		wg.Add(2) | ||||
|  | ||||
| 		// create the second subscriber | ||||
| 		evChan2, err := stream.Consume(topic, | ||||
| 			WithGroup("second_queue"), | ||||
| 			WithOffset(time.Now().Add(time.Minute*-1)), | ||||
| 		) | ||||
| 		assert.Nilf(t, err, "Consume should not return an error") | ||||
|  | ||||
| 		go func() { | ||||
| 			timeout := time.NewTimer(time.Second * 1) | ||||
|  | ||||
| 			select { | ||||
| 			case event, _ := <-evChan2: | ||||
| 				assert.NotNilf(t, event, "The message was nil") | ||||
| 				assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") | ||||
|  | ||||
| 				var result testPayload | ||||
| 				err := event.Unmarshal(&result) | ||||
| 				assert.Nil(t, err, "Error decoding result") | ||||
| 				assert.Equal(t, result, *payload, "Payload didn't match") | ||||
|  | ||||
| 				wg.Done() | ||||
| 			case <-timeout.C: | ||||
| 				t.Fatalf("Event was not recieved") | ||||
| 			} | ||||
| 		}() | ||||
|  | ||||
| 		// wait for the subscriber to recieve the message or timeout | ||||
| 		wg.Wait() | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("AckingNacking", func(t *testing.T) { | ||||
| 		ch, err := stream.Consume("foobarAck", WithAutoAck(false, 5*time.Second)) | ||||
| 		assert.NoError(t, err, "Unexpected error subscribing") | ||||
| 		assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"})) | ||||
| 		assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"})) | ||||
|  | ||||
| 		ev := <-ch | ||||
| 		ev.Ack() | ||||
| 		ev = <-ch | ||||
| 		nacked := ev.ID | ||||
| 		ev.Nack() | ||||
| 		select { | ||||
| 		case ev = <-ch: | ||||
| 			assert.Equal(t, ev.ID, nacked, "Nacked message should have been received again") | ||||
| 			assert.NoError(t, ev.Ack()) | ||||
| 		case <-time.After(7 * time.Second): | ||||
| 			t.Fatalf("Timed out waiting for message to be put back on queue") | ||||
| 		} | ||||
|  | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("Retries", func(t *testing.T) { | ||||
| 		ch, err := stream.Consume("foobarRetries", WithAutoAck(false, 5*time.Second), WithRetryLimit(1)) | ||||
| 		assert.NoError(t, err, "Unexpected error subscribing") | ||||
| 		assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"})) | ||||
|  | ||||
| 		ev := <-ch | ||||
| 		id := ev.ID | ||||
| 		ev.Nack() | ||||
| 		ev = <-ch | ||||
| 		assert.Equal(t, id, ev.ID, "Nacked message should have been received again") | ||||
| 		ev.Nack() | ||||
| 		select { | ||||
| 		case ev = <-ch: | ||||
| 			t.Fatalf("Unexpected event received") | ||||
| 		case <-time.After(7 * time.Second): | ||||
| 		} | ||||
|  | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("InfiniteRetries", func(t *testing.T) { | ||||
| 		ch, err := stream.Consume("foobarRetriesInf", WithAutoAck(false, 2*time.Second)) | ||||
| 		assert.NoError(t, err, "Unexpected error subscribing") | ||||
| 		assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"})) | ||||
|  | ||||
| 		count := 0 | ||||
| 		id := "" | ||||
| 		for { | ||||
| 			select { | ||||
| 			case ev := <-ch: | ||||
| 				if id != "" { | ||||
| 					assert.Equal(t, id, ev.ID, "Nacked message should have been received again") | ||||
| 				} | ||||
| 				id = ev.ID | ||||
| 			case <-time.After(3 * time.Second): | ||||
| 				t.Fatalf("Unexpected event received") | ||||
| 			} | ||||
|  | ||||
| 			count++ | ||||
| 			if count == 11 { | ||||
| 				break | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 	}) | ||||
|  | ||||
| 	t.Run("twoSubs", func(t *testing.T) { | ||||
| 		ch1, err := stream.Consume("foobarTwoSubs1", WithAutoAck(false, 5*time.Second)) | ||||
| 		assert.NoError(t, err, "Unexpected error subscribing to topic 1") | ||||
| 		ch2, err := stream.Consume("foobarTwoSubs2", WithAutoAck(false, 5*time.Second)) | ||||
| 		assert.NoError(t, err, "Unexpected error subscribing to topic 2") | ||||
|  | ||||
| 		assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"})) | ||||
| 		assert.NoError(t, stream.Publish("foobarTwoSubs1", map[string]string{"foo": "message 1"})) | ||||
|  | ||||
| 		wg := sync.WaitGroup{} | ||||
| 		wg.Add(2) | ||||
| 		go func() { | ||||
| 			ev := <-ch1 | ||||
| 			assert.Equal(t, "foobarTwoSubs1", ev.Topic, "Received message from unexpected topic") | ||||
| 			wg.Done() | ||||
| 		}() | ||||
| 		go func() { | ||||
| 			ev := <-ch2 | ||||
| 			assert.Equal(t, "foobarTwoSubs2", ev.Topic, "Received message from unexpected topic") | ||||
| 			wg.Done() | ||||
| 		}() | ||||
| 		wg.Wait() | ||||
| 	}) | ||||
| } | ||||
							
								
								
									
										12
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								go.mod
									
									
									
									
									
								
							| @@ -30,7 +30,7 @@ require ( | ||||
| 	github.com/streadway/amqp v1.1.0 | ||||
| 	github.com/stretchr/testify v1.10.0 | ||||
| 	github.com/test-go/testify v1.1.4 | ||||
| 	github.com/urfave/cli/v2 v2.25.7 | ||||
| 	github.com/urfave/cli/v2 v2.27.6 | ||||
| 	go.etcd.io/bbolt v1.4.0 | ||||
| 	go.etcd.io/etcd/api/v3 v3.5.21 | ||||
| 	go.etcd.io/etcd/client/v3 v3.5.21 | ||||
| @@ -40,8 +40,8 @@ require ( | ||||
| 	golang.org/x/crypto v0.37.0 | ||||
| 	golang.org/x/net v0.38.0 | ||||
| 	golang.org/x/sync v0.13.0 | ||||
| 	google.golang.org/grpc v1.72.1 | ||||
| 	google.golang.org/grpc/examples v0.0.0-20250514161145-5c0d55244474 | ||||
| 	google.golang.org/grpc v1.71.1 | ||||
| 	google.golang.org/grpc/examples v0.0.0-20250515150734-f2d3e11f3057 | ||||
| 	google.golang.org/protobuf v1.36.6 | ||||
| ) | ||||
|  | ||||
| @@ -52,7 +52,7 @@ require ( | ||||
| 	github.com/cespare/xxhash/v2 v2.3.0 // indirect | ||||
| 	github.com/coreos/go-semver v0.3.0 // indirect | ||||
| 	github.com/coreos/go-systemd/v22 v22.3.2 // indirect | ||||
| 	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect | ||||
| 	github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect | ||||
| 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect | ||||
| 	github.com/fatih/color v1.16.0 // indirect | ||||
| 	github.com/go-logr/logr v1.4.2 // indirect | ||||
| @@ -65,7 +65,7 @@ require ( | ||||
| 	github.com/hashicorp/go-immutable-radix v1.3.1 // indirect | ||||
| 	github.com/hashicorp/go-multierror v1.1.1 // indirect | ||||
| 	github.com/hashicorp/go-rootcerts v1.0.2 // indirect | ||||
| 	github.com/hashicorp/golang-lru v0.5.4 // indirect | ||||
| 	github.com/hashicorp/golang-lru v1.0.2 // indirect | ||||
| 	github.com/hashicorp/serf v0.10.1 // indirect | ||||
| 	github.com/jackc/chunkreader/v2 v2.0.1 // indirect | ||||
| 	github.com/jackc/pgconn v1.14.3 // indirect | ||||
| @@ -88,7 +88,7 @@ require ( | ||||
| 	github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect | ||||
| 	github.com/rogpeppe/go-internal v1.13.1 // indirect | ||||
| 	github.com/russross/blackfriday/v2 v2.1.0 // indirect | ||||
| 	github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect | ||||
| 	github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect | ||||
| 	go.etcd.io/etcd/client/pkg/v3 v3.5.21 // indirect | ||||
| 	go.opentelemetry.io/auto/sdk v1.1.0 // indirect | ||||
| 	go.opentelemetry.io/otel/metric v1.35.0 // indirect | ||||
|   | ||||
							
								
								
									
										24
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								go.sum
									
									
									
									
									
								
							| @@ -38,8 +38,8 @@ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzA | ||||
| github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= | ||||
| github.com/cornelk/hashmap v1.0.8 h1:nv0AWgw02n+iDcawr5It4CjQIAcdMMKRrs10HOJYlrc= | ||||
| github.com/cornelk/hashmap v1.0.8/go.mod h1:RfZb7JO3RviW/rT6emczVuC/oxpdz4UsSB2LJSclR1k= | ||||
| github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= | ||||
| github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= | ||||
| github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= | ||||
| github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= | ||||
| github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= | ||||
| github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= | ||||
| github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||||
| @@ -133,8 +133,8 @@ github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b | ||||
| github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI= | ||||
| github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= | ||||
| github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= | ||||
| github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= | ||||
| github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= | ||||
| github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= | ||||
| github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= | ||||
| github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= | ||||
| github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= | ||||
| github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM= | ||||
| @@ -337,10 +337,10 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf | ||||
| github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= | ||||
| github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= | ||||
| github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= | ||||
| github.com/urfave/cli/v2 v2.25.7 h1:VAzn5oq403l5pHjc4OhD54+XGO9cdKVL/7lDjF+iKUs= | ||||
| github.com/urfave/cli/v2 v2.25.7/go.mod h1:8qnjx1vcq5s2/wpsqoZFndg2CE5tNFyrTvS6SinrnYQ= | ||||
| github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 h1:bAn7/zixMGCfxrRTfdpNzjtPYqr8smhKouy9mxVdGPU= | ||||
| github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673/go.mod h1:N3UwUGtsrSj3ccvlPHLoLsHnpR27oXr4ZE984MbSER8= | ||||
| github.com/urfave/cli/v2 v2.27.6 h1:VdRdS98FNhKZ8/Az8B7MTyGQmpIr36O1EHybx/LaZ4g= | ||||
| github.com/urfave/cli/v2 v2.27.6/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= | ||||
| github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= | ||||
| github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= | ||||
| github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | ||||
| github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= | ||||
| github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= | ||||
| @@ -496,10 +496,10 @@ google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 h1: | ||||
| google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463/go.mod h1:U90ffi8eUL9MwPcrJylN5+Mk2v3vuPDptd5yyNUiRR8= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 h1:e0AIkUUhxyBKh6ssZNrAMeqhA7RKUj42346d1y02i2g= | ||||
| google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= | ||||
| google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA= | ||||
| google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= | ||||
| google.golang.org/grpc/examples v0.0.0-20250514161145-5c0d55244474 h1:7B8e8jJRSI3lfKIzwRDNU6f0yzNFhiPXFGbOuNETfIw= | ||||
| google.golang.org/grpc/examples v0.0.0-20250514161145-5c0d55244474/go.mod h1:WPWnet+nYurNGpV0rVYHI1YuOJwVHeM3t8f76m410XM= | ||||
| google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI= | ||||
| google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= | ||||
| google.golang.org/grpc/examples v0.0.0-20250515150734-f2d3e11f3057 h1:lPv+iqlAyiKMjbL3ivJlAASixPknLv806R6zaoE4PUM= | ||||
| google.golang.org/grpc/examples v0.0.0-20250515150734-f2d3e11f3057/go.mod h1:WPWnet+nYurNGpV0rVYHI1YuOJwVHeM3t8f76m410XM= | ||||
| google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= | ||||
| google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= | ||||
| gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= | ||||
|   | ||||
| @@ -8,6 +8,8 @@ import ( | ||||
| 	natslib "github.com/nats-io/nats.go" | ||||
| 	"go-micro.dev/v5/broker" | ||||
| 	"go-micro.dev/v5/broker/nats" | ||||
| 	"go-micro.dev/v5/events" | ||||
| 	nevents "go-micro.dev/v5/events/natsjs" | ||||
| 	"go-micro.dev/v5/registry" | ||||
| 	nreg "go-micro.dev/v5/registry/nats" | ||||
| 	"go-micro.dev/v5/store" | ||||
| @@ -22,34 +24,46 @@ type Profile struct { | ||||
| 	Broker    broker.Broker | ||||
| 	Store     store.Store | ||||
| 	Transport transport.Transport | ||||
| 	Stream    events.Stream | ||||
| } | ||||
|  | ||||
| // LocalProfile returns a profile with local mDNS as the registry, HTTP as the broker, file as the store, and HTTP as the transport | ||||
| // It is used for local development and testing | ||||
| func LocalProfile() Profile { | ||||
| func LocalProfile() (Profile, error) { | ||||
| 	stream, err := events.NewStream() | ||||
| 	return Profile{ | ||||
| 		Registry:  registry.NewMDNSRegistry(), | ||||
| 		Broker:    broker.NewHttpBroker(), | ||||
| 		Store:     store.NewFileStore(), | ||||
| 		Transport: transport.NewHTTPTransport(), | ||||
| 	} | ||||
| 		Stream:    stream, | ||||
| 	}, err | ||||
| } | ||||
|  | ||||
| // NatsProfile returns a profile with NATS as the registry, broker, store, and transport | ||||
| // It uses the environment variable MICR_NATS_ADDRESS to set the NATS server address | ||||
| // If the variable is not set, it defaults to nats://0.0.0.0:4222 which will connect to a local NATS server | ||||
| func NatsProfile() Profile { | ||||
| func NatsProfile() (Profile, error) { | ||||
| 	addr := os.Getenv("MICRO_NATS_ADDRESS") | ||||
| 	if addr == "" { | ||||
| 		addr = "nats://0.0.0.0:4222" | ||||
| 	} | ||||
| 	// Split the address by comma, trim whitespace, and convert to a slice of strings | ||||
| 	addrs := splitNatsAdressList(addr) | ||||
|  | ||||
| 	reg := nreg.NewNatsRegistry(registry.Addrs(addrs...)) | ||||
| 	brok := nats.NewNatsBroker(broker.Addrs(addrs...)) | ||||
|  | ||||
| 	nopts := natslib.GetDefaultOptions() | ||||
| 	nopts.Servers = addrs | ||||
| 	brok := nats.NewNatsBroker(broker.Addrs(addrs...), nats.Options(nopts)) | ||||
|  | ||||
| 	st := nstore.NewStore(nstore.NatsOptions(natslib.Options{Servers: addrs})) | ||||
| 	tx := ntx.NewTransport(ntx.Options(natslib.Options{Servers: addrs})) | ||||
|  | ||||
| 	stream, err := nevents.NewStream( | ||||
| 		nevents.Address(addr), | ||||
| 	) | ||||
|  | ||||
| 	registry.DefaultRegistry = reg | ||||
| 	broker.DefaultBroker = brok | ||||
| 	store.DefaultStore = st | ||||
| @@ -59,7 +73,8 @@ func NatsProfile() Profile { | ||||
| 		Broker:    brok, | ||||
| 		Store:     st, | ||||
| 		Transport: tx, | ||||
| 	} | ||||
| 		Stream:    stream, | ||||
| 	}, err | ||||
| } | ||||
|  | ||||
| func splitNatsAdressList(addr string) []string { | ||||
|   | ||||
| @@ -96,13 +96,8 @@ func (m *fileStore) init(opts ...Option) error { | ||||
| 	if m.dir == "" { | ||||
| 		m.dir = DefaultDir | ||||
| 	} | ||||
|  | ||||
| 	// Ignoring this as the folder might exist. | ||||
| 	// Reads/Writes updates will return with sensible error messages | ||||
| 	// about the dir not existing in case this cannot create the path anyway | ||||
| 	os.MkdirAll(m.dir, 0700) | ||||
|  | ||||
| 	return nil | ||||
| 	// create the directory | ||||
| 	return os.MkdirAll(m.dir, 0700) | ||||
| } | ||||
|  | ||||
| func (f *fileStore) getDB(database, table string) (*fileHandle, error) { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user