1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-05 10:20:53 +02:00
go-micro/events/store.go
Asim Aslam 0c2041e439
add events package (#2341)
* add events package

* update go version
2021-11-08 08:52:39 +00:00

128 lines
2.6 KiB
Go

package events
import (
"encoding/json"
"time"
"github.com/pkg/errors"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
)
// joinKey separates the segments of a store record key: Write joins the
// topic, event ID and time suffix with it, and Read uses topic+joinKey as
// the key prefix to look up.
const joinKey = "/"
// NewStore returns an initialized events store.
//
// The supplied options are applied in order to a zero-valued StoreOptions.
// If no TTL ends up set, it falls back to 24 hours. Events are held in an
// in-memory store. When a Backup is configured, a background goroutine is
// started that periodically snapshots that store.
func NewStore(opts ...StoreOption) Store {
	// apply the caller's options on top of the zero value
	var cfg StoreOptions
	for _, apply := range opts {
		apply(&cfg)
	}

	// an unset TTL falls back to one day
	if cfg.TTL == 0 {
		cfg.TTL = 24 * time.Hour
	}

	s := &evStore{opts: cfg, store: store.NewMemoryStore()}

	// only run the snapshot loop when a backup has been configured
	if cfg.Backup != nil {
		go s.backupLoop()
	}

	return s
}
// evStore is the Store implementation returned by NewStore. It persists
// events as JSON records in an underlying store.Store.
type evStore struct {
// opts holds the StoreOptions the store was created with (TTL, Backup).
opts StoreOptions
// store is the backing record store; NewStore sets it to an in-memory store.
store store.Store
}
// Read returns the events stored for a topic.
//
// The topic must be non-empty, otherwise ErrMissingTopic is returned. Records
// are looked up by the key prefix "<topic>/". By default reading starts at
// offset 0 and is limited to 250 records; both can be overridden through
// ReadOption values. Errors from the underlying store or from decoding a
// record are wrapped and returned together with a nil slice.
func (s *evStore) Read(topic string, opts ...ReadOption) ([]*Event, error) {
	// validate the topic
	if len(topic) == 0 {
		return nil, ErrMissingTopic
	}

	// parse the options, starting from the defaults
	options := ReadOptions{
		Offset: 0,
		Limit:  250,
	}
	for _, o := range opts {
		o(&options)
	}

	// execute the request; the trailing joinKey keeps topics that merely
	// share a textual prefix (e.g. "foo" and "foobar") apart
	recs, err := s.store.Read(topic+joinKey,
		store.ReadPrefix(),
		store.ReadLimit(options.Limit),
		store.ReadOffset(options.Offset),
	)
	if err != nil {
		return nil, errors.Wrap(err, "Error reading from store")
	}

	// unmarshal the result; every record value is expected to be a JSON event
	result := make([]*Event, len(recs))
	for i, r := range recs {
		var e Event
		if err := json.Unmarshal(r.Value, &e); err != nil {
			return nil, errors.Wrap(err, "Invalid event returned from store")
		}
		result[i] = &e
	}

	return result, nil
}
// Write stores an event.
//
// The event is marshaled to JSON and written under the key
// "<topic>/<id>/<YYYYMMDDHH>", where the last segment is the current time at
// hour resolution. The record expires after the store's TTL unless a
// WriteOption overrides it. A nil event is rejected with an error; marshaling
// and store errors are wrapped and returned.
func (s *evStore) Write(event *Event, opts ...WriteOption) error {
	// a nil event marshals to "null" without error and would then panic when
	// the key is built from its fields, so reject it up front
	if event == nil {
		return errors.New("Error writing nil event to the store")
	}

	// parse the options, defaulting the TTL to the store-wide setting
	options := WriteOptions{
		TTL: s.opts.TTL,
	}
	for _, o := range opts {
		o(&options)
	}

	// construct the store record
	bytes, err := json.Marshal(event)
	if err != nil {
		return errors.Wrap(err, "Error marshaling event to JSON")
	}

	// suffix event ID with hour resolution for easy retrieval in batches
	timeSuffix := time.Now().Format("2006010215")

	record := &store.Record{
		// key is such that reading by prefix indexes by topic and reading by suffix indexes by time
		Key:    event.Topic + joinKey + event.ID + joinKey + timeSuffix,
		Value:  bytes,
		Expiry: options.TTL,
	}

	// write the record to the store
	if err := s.store.Write(record); err != nil {
		return errors.Wrap(err, "Error writing to the store")
	}

	return nil
}
// backupLoop snapshots the underlying store through the configured Backup,
// sleeps for an hour, and repeats indefinitely. A failed snapshot is logged
// and does not stop the loop. The method never returns, so NewStore runs it
// in its own goroutine.
func (s *evStore) backupLoop() {
	for {
		if snapErr := s.opts.Backup.Snapshot(s.store); snapErr != nil {
			logger.Errorf("Error running backup %s", snapErr)
		}

		// wait before taking the next snapshot
		time.Sleep(time.Hour)
	}
}
// Backup is an interface for snapshotting the events store to long term storage.
type Backup interface {
// Snapshot is handed the events store's underlying store.Store. When a
// Backup is configured, the store's backup loop calls it once on start and
// then again after each one-hour sleep, logging any error it returns.
Snapshot(st store.Store) error
}