1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-18 08:26:38 +02:00

add events package (#2341)

* add events package

* update go version
This commit is contained in:
Asim Aslam 2021-11-08 08:52:39 +00:00 committed by GitHub
parent c5be9f560c
commit 0c2041e439
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 982 additions and 72 deletions

View File

@ -8,10 +8,10 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Set up Go 1.16 - name: Set up Go 1.17
uses: actions/setup-go@v1 uses: actions/setup-go@v1
with: with:
go-version: 1.16 go-version: 1.17
id: go id: go
- name: Check out code into the Go module directory - name: Check out code into the Go module directory

View File

@ -8,10 +8,10 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Set up Go 1.16 - name: Set up Go 1.17
uses: actions/setup-go@v1 uses: actions/setup-go@v1
with: with:
go-version: 1.16 go-version: 1.17
id: go id: go
- name: Check out code into the Go module directory - name: Check out code into the Go module directory

View File

@ -7,10 +7,10 @@ import (
"net/http" "net/http"
"strings" "strings"
"github.com/oxtoacart/bpool"
api "go-micro.dev/v4/api/proto" api "go-micro.dev/v4/api/proto"
"go-micro.dev/v4/registry" "go-micro.dev/v4/registry"
"go-micro.dev/v4/selector" "go-micro.dev/v4/selector"
"github.com/oxtoacart/bpool"
) )
var ( var (

View File

@ -10,11 +10,11 @@ import (
"strings" "strings"
"time" "time"
"github.com/google/uuid"
"github.com/oxtoacart/bpool"
"go-micro.dev/v4/api/handler" "go-micro.dev/v4/api/handler"
proto "go-micro.dev/v4/api/proto" proto "go-micro.dev/v4/api/proto"
"go-micro.dev/v4/util/ctx" "go-micro.dev/v4/util/ctx"
"github.com/google/uuid"
"github.com/oxtoacart/bpool"
) )
var ( var (

View File

@ -9,6 +9,8 @@ import (
"strconv" "strconv"
"strings" "strings"
jsonpatch "github.com/evanphx/json-patch/v5"
"github.com/oxtoacart/bpool"
"go-micro.dev/v4/api" "go-micro.dev/v4/api"
"go-micro.dev/v4/api/handler" "go-micro.dev/v4/api/handler"
"go-micro.dev/v4/api/internal/proto" "go-micro.dev/v4/api/internal/proto"
@ -23,8 +25,6 @@ import (
"go-micro.dev/v4/selector" "go-micro.dev/v4/selector"
"go-micro.dev/v4/util/ctx" "go-micro.dev/v4/util/ctx"
"go-micro.dev/v4/util/qson" "go-micro.dev/v4/util/qson"
jsonpatch "github.com/evanphx/json-patch/v5"
"github.com/oxtoacart/bpool"
) )
const ( const (

View File

@ -7,8 +7,8 @@ import (
"reflect" "reflect"
"testing" "testing"
go_api "go-micro.dev/v4/api/proto"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
go_api "go-micro.dev/v4/api/proto"
) )
func TestRequestPayloadFromRequest(t *testing.T) { func TestRequestPayloadFromRequest(t *testing.T) {

View File

@ -9,14 +9,14 @@ import (
"strings" "strings"
"time" "time"
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"go-micro.dev/v4/api" "go-micro.dev/v4/api"
"go-micro.dev/v4/client" "go-micro.dev/v4/client"
raw "go-micro.dev/v4/codec/bytes" raw "go-micro.dev/v4/codec/bytes"
"go-micro.dev/v4/logger" "go-micro.dev/v4/logger"
"go-micro.dev/v4/selector" "go-micro.dev/v4/selector"
"github.com/gobwas/httphead"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
) )
// serveWebsocket will stream rpc back over websockets assuming json // serveWebsocket will stream rpc back over websockets assuming json

View File

@ -3,8 +3,8 @@ package registry
import ( import (
"testing" "testing"
"go-micro.dev/v4/registry"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go-micro.dev/v4/registry"
) )
func TestStoreRegex(t *testing.T) { func TestStoreRegex(t *testing.T) {

View File

@ -8,10 +8,10 @@ import (
"os" "os"
"sync" "sync"
"github.com/gorilla/handlers"
"go-micro.dev/v4/api/server" "go-micro.dev/v4/api/server"
"go-micro.dev/v4/api/server/cors" "go-micro.dev/v4/api/server/cors"
"go-micro.dev/v4/logger" "go-micro.dev/v4/logger"
"github.com/gorilla/handlers"
) )
type httpServer struct { type httpServer struct {

View File

@ -16,6 +16,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
"go-micro.dev/v4/codec/json" "go-micro.dev/v4/codec/json"
merr "go-micro.dev/v4/errors" merr "go-micro.dev/v4/errors"
"go-micro.dev/v4/registry" "go-micro.dev/v4/registry"
@ -23,7 +24,6 @@ import (
maddr "go-micro.dev/v4/util/addr" maddr "go-micro.dev/v4/util/addr"
mnet "go-micro.dev/v4/util/net" mnet "go-micro.dev/v4/util/net"
mls "go-micro.dev/v4/util/tls" mls "go-micro.dev/v4/util/tls"
"github.com/google/uuid"
"golang.org/x/net/http2" "golang.org/x/net/http2"
) )

View File

@ -5,9 +5,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/uuid"
"go-micro.dev/v4/broker" "go-micro.dev/v4/broker"
"go-micro.dev/v4/registry" "go-micro.dev/v4/registry"
"github.com/google/uuid"
) )
var ( var (

View File

@ -7,8 +7,8 @@ import (
"hash/fnv" "hash/fnv"
"time" "time"
"go-micro.dev/v4/metadata"
cache "github.com/patrickmn/go-cache" cache "github.com/patrickmn/go-cache"
"go-micro.dev/v4/metadata"
) )
// NewCache returns an initialised cache. // NewCache returns an initialised cache.

View File

@ -7,6 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/urfave/cli/v2"
"go-micro.dev/v4/auth" "go-micro.dev/v4/auth"
"go-micro.dev/v4/broker" "go-micro.dev/v4/broker"
"go-micro.dev/v4/cache" "go-micro.dev/v4/cache"
@ -23,7 +24,6 @@ import (
"go-micro.dev/v4/server" "go-micro.dev/v4/server"
"go-micro.dev/v4/store" "go-micro.dev/v4/store"
"go-micro.dev/v4/transport" "go-micro.dev/v4/transport"
"github.com/urfave/cli/v2"
) )
type Cmd interface { type Cmd interface {

View File

@ -8,8 +8,8 @@ import (
"io" "io"
"strings" "strings"
"go-micro.dev/v4/codec"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"go-micro.dev/v4/codec"
) )
type Codec struct { type Codec struct {

View File

@ -5,9 +5,9 @@ import (
"encoding/json" "encoding/json"
"io" "io"
"go-micro.dev/v4/codec"
"github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"go-micro.dev/v4/codec"
) )
type Codec struct { type Codec struct {

View File

@ -3,9 +3,9 @@ package proto
import ( import (
"bytes" "bytes"
"go-micro.dev/v4/codec"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/oxtoacart/bpool" "github.com/oxtoacart/bpool"
"go-micro.dev/v4/codec"
) )
// create buffer pool with 16 instances each preallocated with 256 bytes // create buffer pool with 16 instances each preallocated with 256 bytes

View File

@ -4,8 +4,8 @@ package proto
import ( import (
"io" "io"
"go-micro.dev/v4/codec"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"go-micro.dev/v4/codec"
) )
type Codec struct { type Codec struct {

View File

@ -8,8 +8,8 @@ import (
"strconv" "strconv"
"sync" "sync"
"go-micro.dev/v4/codec"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"go-micro.dev/v4/codec"
) )
type flusher interface { type flusher interface {

View File

@ -4,11 +4,11 @@ import (
"errors" "errors"
"time" "time"
"github.com/imdario/mergo"
"go-micro.dev/v4/config/encoder" "go-micro.dev/v4/config/encoder"
"go-micro.dev/v4/config/encoder/json" "go-micro.dev/v4/config/encoder/json"
"go-micro.dev/v4/config/reader" "go-micro.dev/v4/config/reader"
"go-micro.dev/v4/config/source" "go-micro.dev/v4/config/source"
"github.com/imdario/mergo"
) )
type jsonReader struct { type jsonReader struct {

View File

@ -2,8 +2,8 @@
package box package box
import ( import (
"go-micro.dev/v4/config/secrets"
"github.com/pkg/errors" "github.com/pkg/errors"
"go-micro.dev/v4/config/secrets"
naclbox "golang.org/x/crypto/nacl/box" naclbox "golang.org/x/crypto/nacl/box"
"crypto/rand" "crypto/rand"

View File

@ -3,8 +3,8 @@
package secretbox package secretbox
import ( import (
"go-micro.dev/v4/config/secrets"
"github.com/pkg/errors" "github.com/pkg/errors"
"go-micro.dev/v4/config/secrets"
"golang.org/x/crypto/nacl/secretbox" "golang.org/x/crypto/nacl/secretbox"
"crypto/rand" "crypto/rand"

View File

@ -7,10 +7,10 @@ import (
"strings" "strings"
"time" "time"
"go-micro.dev/v4/cmd"
"go-micro.dev/v4/config/source"
"github.com/imdario/mergo" "github.com/imdario/mergo"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go-micro.dev/v4/cmd"
"go-micro.dev/v4/config/source"
) )
type cliSource struct { type cliSource struct {

View File

@ -5,11 +5,11 @@ import (
"os" "os"
"testing" "testing"
"github.com/urfave/cli/v2"
"go-micro.dev/v4" "go-micro.dev/v4"
"go-micro.dev/v4/cmd" "go-micro.dev/v4/cmd"
"go-micro.dev/v4/config" "go-micro.dev/v4/config"
"go-micro.dev/v4/config/source" "go-micro.dev/v4/config/source"
"github.com/urfave/cli/v2"
) )
func TestCliSourceDefault(t *testing.T) { func TestCliSourceDefault(t *testing.T) {

View File

@ -3,8 +3,8 @@ package cli
import ( import (
"context" "context"
"go-micro.dev/v4/config/source"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"go-micro.dev/v4/config/source"
) )
type contextKey struct{} type contextKey struct{}

View File

@ -6,8 +6,8 @@ import (
"strings" "strings"
"time" "time"
"go-micro.dev/v4/config/source"
"github.com/imdario/mergo" "github.com/imdario/mergo"
"go-micro.dev/v4/config/source"
) )
var ( var (

View File

@ -1,12 +1,13 @@
//+build !linux //go:build !linux
// +build !linux
package file package file
import ( import (
"os" "os"
"go-micro.dev/v4/config/source"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"go-micro.dev/v4/config/source"
) )
type watcher struct { type watcher struct {

View File

@ -1,12 +1,13 @@
//+build linux //go:build linux
// +build linux
package file package file
import ( import (
"os" "os"
"go-micro.dev/v4/config/source"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"go-micro.dev/v4/config/source"
) )
type watcher struct { type watcher struct {

View File

@ -3,8 +3,8 @@ package flag
import ( import (
"errors" "errors"
"flag" "flag"
"go-micro.dev/v4/config/source"
"github.com/imdario/mergo" "github.com/imdario/mergo"
"go-micro.dev/v4/config/source"
"strings" "strings"
"time" "time"
) )

View File

@ -5,8 +5,8 @@ import (
"sync" "sync"
"time" "time"
"go-micro.dev/v4/config/source"
"github.com/google/uuid" "github.com/google/uuid"
"go-micro.dev/v4/config/source"
) )
type memory struct { type memory struct {

View File

@ -3,8 +3,8 @@ package log
import ( import (
"sync" "sync"
"go-micro.dev/v4/util/ring"
"github.com/google/uuid" "github.com/google/uuid"
"go-micro.dev/v4/util/ring"
) )
// Should stream from OS // Should stream from OS

View File

@ -4,8 +4,8 @@ import (
"context" "context"
"time" "time"
"go-micro.dev/v4/util/ring"
"github.com/google/uuid" "github.com/google/uuid"
"go-micro.dev/v4/util/ring"
) )
type memTracer struct { type memTracer struct {

View File

@ -109,9 +109,9 @@ func TestAppend(t *testing.T) {
Status: http.StatusText(500), Status: http.StatusText(500),
}, },
{ {
Id: "test2", Id: "test2",
Code: 400, Code: 400,
Detail: "Bad Request", Detail: "Bad Request",
Status: http.StatusText(400), Status: http.StatusText(400),
}, },
{ {
@ -124,8 +124,8 @@ func TestAppend(t *testing.T) {
for _, e := range testData { for _, e := range testData {
mError.Append(&Error{ mError.Append(&Error{
Id: e.Id, Id: e.Id,
Code: e.Code, Code: e.Code,
Detail: e.Detail, Detail: e.Detail,
Status: e.Status, Status: e.Status,
}) })
@ -146,9 +146,9 @@ func TestHasErrors(t *testing.T) {
Status: http.StatusText(500), Status: http.StatusText(500),
}, },
{ {
Id: "test2", Id: "test2",
Code: 400, Code: 400,
Detail: "Bad Request", Detail: "Bad Request",
Status: http.StatusText(400), Status: http.StatusText(400),
}, },
{ {
@ -165,8 +165,8 @@ func TestHasErrors(t *testing.T) {
for _, e := range testData { for _, e := range testData {
mError.Errors = append(mError.Errors, &Error{ mError.Errors = append(mError.Errors, &Error{
Id: e.Id, Id: e.Id,
Code: e.Code, Code: e.Code,
Detail: e.Detail, Detail: e.Detail,
Status: e.Status, Status: e.Status,
}) })

92
events/events.go Normal file
View File

@ -0,0 +1,92 @@
// Package events is for event streaming and storage
package events
import (
"encoding/json"
"errors"
"time"
)
var (
// DefaultStream is the default events stream implementation
DefaultStream Stream
// DefaultStore is the default events store implementation
DefaultStore Store
)
var (
// ErrMissingTopic is returned if a blank topic was provided to publish
ErrMissingTopic = errors.New("Missing topic")
// ErrEncodingMessage is returned from publish if there was an error encoding the message option
ErrEncodingMessage = errors.New("Error encoding message")
)
// Stream is an event streaming interface
type Stream interface {
Publish(topic string, msg interface{}, opts ...PublishOption) error
Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}
// Store is an event store interface
type Store interface {
Read(topic string, opts ...ReadOption) ([]*Event, error)
Write(event *Event, opts ...WriteOption) error
}
type AckFunc func() error
type NackFunc func() error
// Event is the object returned by the broker when you subscribe to a topic
type Event struct {
// ID to uniquely identify the event
ID string
// Topic of event, e.g. "registry.service.created"
Topic string
// Timestamp of the event
Timestamp time.Time
// Metadata contains the values the event was indexed by
Metadata map[string]string
// Payload contains the encoded message
Payload []byte
ackFunc AckFunc
nackFunc NackFunc
}
// Unmarshal the events message into an object
func (e *Event) Unmarshal(v interface{}) error {
return json.Unmarshal(e.Payload, v)
}
// Ack acknowledges successful processing of the event in ManualAck mode
func (e *Event) Ack() error {
return e.ackFunc()
}
func (e *Event) SetAckFunc(f AckFunc) {
e.ackFunc = f
}
// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode
func (e *Event) Nack() error {
return e.nackFunc()
}
func (e *Event) SetNackFunc(f NackFunc) {
e.nackFunc = f
}
// Publish an event to a topic
func Publish(topic string, msg interface{}, opts ...PublishOption) error {
return DefaultStream.Publish(topic, msg, opts...)
}
// Consume to events
func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
return DefaultStream.Consume(topic, opts...)
}
// Read events for a topic
func Read(topic string, opts ...ReadOption) ([]*Event, error) {
return DefaultStore.Read(topic, opts...)
}

244
events/memory.go Normal file
View File

@ -0,0 +1,244 @@
package events
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/pkg/errors"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
)
// NewStream returns an initialized memory stream
func NewStream(opts ...Option) (Stream, error) {
// parse the options
var options Options
for _, o := range opts {
o(&options)
}
return &mem{store: store.NewMemoryStore()}, nil
}
type subscriber struct {
Group string
Topic string
Channel chan Event
sync.RWMutex
retryMap map[string]int
retryLimit int
autoAck bool
ackWait time.Duration
}
type mem struct {
store store.Store
subs []*subscriber
sync.RWMutex
}
func (m *mem) Publish(topic string, msg interface{}, opts ...PublishOption) error {
// validate the topic
if len(topic) == 0 {
return ErrMissingTopic
}
// parse the options
options := PublishOptions{
Timestamp: time.Now(),
}
for _, o := range opts {
o(&options)
}
// encode the message if it's not already encoded
var payload []byte
if p, ok := msg.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(msg)
if err != nil {
return ErrEncodingMessage
}
payload = p
}
// construct the event
event := &Event{
ID: uuid.New().String(),
Topic: topic,
Timestamp: options.Timestamp,
Metadata: options.Metadata,
Payload: payload,
}
// serialize the event to bytes
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error encoding event")
}
// write to the store
key := fmt.Sprintf("%v/%v", event.Topic, event.ID)
if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil {
return errors.Wrap(err, "Error writing event to store")
}
// send to the subscribers async
go m.handleEvent(event)
return nil
}
func (m *mem) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, ErrMissingTopic
}
// parse the options
options := ConsumeOptions{
Group: uuid.New().String(),
AutoAck: true,
}
for _, o := range opts {
o(&options)
}
// TODO RetryLimit
// setup the subscriber
sub := &subscriber{
Channel: make(chan Event),
Topic: topic,
Group: options.Group,
retryMap: map[string]int{},
autoAck: true,
retryLimit: options.GetRetryLimit(),
}
if !options.AutoAck {
if options.AckWait == 0 {
return nil, fmt.Errorf("invalid AckWait passed, should be positive integer")
}
sub.autoAck = options.AutoAck
sub.ackWait = options.AckWait
}
// register the subscriber
m.Lock()
m.subs = append(m.subs, sub)
m.Unlock()
// lookup previous events if the start time option was passed
if options.Offset.Unix() > 0 {
go m.lookupPreviousEvents(sub, options.Offset)
}
// return the channel
return sub.Channel, nil
}
// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends
// them into the subscribers channel
func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) {
// lookup all events which match the topic (a blank topic will return all results)
recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix())
if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Error looking up previous events: %v", err)
return
} else if err != nil {
return
}
// loop through the records and send it to the channel if it matches
for _, r := range recs {
var ev Event
if err := json.Unmarshal(r.Value, &ev); err != nil {
continue
}
if ev.Timestamp.Unix() < startTime.Unix() {
continue
}
sendEvent(&ev, sub)
}
}
// handleEvents sends the event to any registered subscribers.
func (m *mem) handleEvent(ev *Event) {
m.RLock()
subs := m.subs
m.RUnlock()
// filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message
// being sent to two subscribers with the same queue.
filteredSubs := map[string]*subscriber{}
// filter down to subscribers who are interested in this topic
for _, sub := range subs {
if len(sub.Topic) == 0 || sub.Topic == ev.Topic {
filteredSubs[sub.Group] = sub
}
}
// send the message to each channel async (since one channel might be blocked)
for _, sub := range filteredSubs {
sendEvent(ev, sub)
}
}
func sendEvent(ev *Event, sub *subscriber) {
go func(s *subscriber) {
evCopy := *ev
if s.autoAck {
s.Channel <- evCopy
return
}
evCopy.SetAckFunc(ackFunc(s, evCopy))
evCopy.SetNackFunc(nackFunc(s, evCopy))
s.retryMap[evCopy.ID] = 0
tick := time.NewTicker(s.ackWait)
defer tick.Stop()
for range tick.C {
s.Lock()
count, ok := s.retryMap[evCopy.ID]
s.Unlock()
if !ok {
// success
break
}
if s.retryLimit > -1 && count > s.retryLimit {
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit)
}
s.Lock()
delete(s.retryMap, evCopy.ID)
s.Unlock()
return
}
s.Channel <- evCopy
s.Lock()
s.retryMap[evCopy.ID] = count + 1
s.Unlock()
}
}(sub)
}
func ackFunc(s *subscriber, evCopy Event) func() error {
return func() error {
s.Lock()
delete(s.retryMap, evCopy.ID)
s.Unlock()
return nil
}
}
func nackFunc(s *subscriber, evCopy Event) func() error {
return func() error {
return nil
}
}

144
events/options.go Normal file
View File

@ -0,0 +1,144 @@
package events
import "time"
type Options struct{}
type Option func(o *Options)
type StoreOptions struct {
TTL time.Duration
Backup Backup
}
type StoreOption func(o *StoreOptions)
// PublishOptions contains all the options which can be provided when publishing an event
type PublishOptions struct {
// Metadata contains any keys which can be used to query the data, for example a customer id
Metadata map[string]string
// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used
Timestamp time.Time
}
// PublishOption sets attributes on PublishOptions
type PublishOption func(o *PublishOptions)
// WithMetadata sets the Metadata field on PublishOptions
func WithMetadata(md map[string]string) PublishOption {
return func(o *PublishOptions) {
o.Metadata = md
}
}
// WithTimestamp sets the timestamp field on PublishOptions
func WithTimestamp(t time.Time) PublishOption {
return func(o *PublishOptions) {
o.Timestamp = t
}
}
// ConsumeOptions contains all the options which can be provided when subscribing to a topic
type ConsumeOptions struct {
// Group is the name of the consumer group, if two consumers have the same group the events
// are distributed between them
Group string
// Offset is the time from which the messages should be consumed from. If not provided then
// the messages will be consumed starting from the moment the Subscription starts.
Offset time.Time
// AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered.
// If false specifies that each message need ts to be manually acknowledged by the subscriber.
// If processing is successful the message should be ack'ed to remove the message from the stream.
// If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will
// remain on the stream to be processed again.
AutoAck bool
AckWait time.Duration
// RetryLimit indicates number of times a message is retried
RetryLimit int
// CustomRetries indicates whether to use RetryLimit
CustomRetries bool
}
// ConsumeOption sets attributes on ConsumeOptions
type ConsumeOption func(o *ConsumeOptions)
// WithGroup sets the consumer group to be part of when consuming events
func WithGroup(q string) ConsumeOption {
return func(o *ConsumeOptions) {
o.Group = q
}
}
// WithOffset sets the offset time at which to start consuming events
func WithOffset(t time.Time) ConsumeOption {
return func(o *ConsumeOptions) {
o.Offset = t
}
}
// WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received
// the message is requeued in case auto ack is turned off
func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption {
return func(o *ConsumeOptions) {
o.AutoAck = ack
o.AckWait = ackWait
}
}
// WithRetryLimit sets the RetryLimit field on ConsumeOptions.
// Set to -1 for infinite retries (default)
func WithRetryLimit(retries int) ConsumeOption {
return func(o *ConsumeOptions) {
o.RetryLimit = retries
o.CustomRetries = true
}
}
func (s ConsumeOptions) GetRetryLimit() int {
if !s.CustomRetries {
return -1
}
return s.RetryLimit
}
// WriteOptions contains all the options which can be provided when writing an event to a store
type WriteOptions struct {
// TTL is the duration the event should be recorded for, a zero value TTL indicates the event should
// be stored indefinately
TTL time.Duration
}
// WriteOption sets attributes on WriteOptions
type WriteOption func(o *WriteOptions)
// WithTTL sets the TTL attribute on WriteOptions
func WithTTL(d time.Duration) WriteOption {
return func(o *WriteOptions) {
o.TTL = d
}
}
// ReadOptions contains all the options which can be provided when reading events from a store
type ReadOptions struct {
// Limit the number of results to return
Limit uint
// Offset the results by this number, useful for paginated queries
Offset uint
}
// ReadOption sets attributes on ReadOptions
type ReadOption func(o *ReadOptions)
// ReadLimit sets the limit attribute on ReadOptions
func ReadLimit(l uint) ReadOption {
return func(o *ReadOptions) {
o.Limit = 1
}
}
// ReadOffset sets the offset attribute on ReadOptions
func ReadOffset(l uint) ReadOption {
return func(o *ReadOptions) {
o.Offset = 1
}
}

127
events/store.go Normal file
View File

@ -0,0 +1,127 @@
package events
import (
"encoding/json"
"time"
"github.com/pkg/errors"
"go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
)
const joinKey = "/"
// NewStore returns an initialized events store
func NewStore(opts ...StoreOption) Store {
// parse the options
var options StoreOptions
for _, o := range opts {
o(&options)
}
if options.TTL.Seconds() == 0 {
options.TTL = time.Hour * 24
}
// return the store
evs := &evStore{
opts: options,
store: store.NewMemoryStore(),
}
if options.Backup != nil {
go evs.backupLoop()
}
return evs
}
type evStore struct {
opts StoreOptions
store store.Store
}
// Read events for a topic
func (s *evStore) Read(topic string, opts ...ReadOption) ([]*Event, error) {
// validate the topic
if len(topic) == 0 {
return nil, ErrMissingTopic
}
// parse the options
options := ReadOptions{
Offset: 0,
Limit: 250,
}
for _, o := range opts {
o(&options)
}
// execute the request
recs, err := s.store.Read(topic+joinKey,
store.ReadPrefix(),
store.ReadLimit(options.Limit),
store.ReadOffset(options.Offset),
)
if err != nil {
return nil, errors.Wrap(err, "Error reading from store")
}
// unmarshal the result
result := make([]*Event, len(recs))
for i, r := range recs {
var e Event
if err := json.Unmarshal(r.Value, &e); err != nil {
return nil, errors.Wrap(err, "Invalid event returned from stroe")
}
result[i] = &e
}
return result, nil
}
// Write an event to the store
func (s *evStore) Write(event *Event, opts ...WriteOption) error {
// parse the options
options := WriteOptions{
TTL: s.opts.TTL,
}
for _, o := range opts {
o(&options)
}
// construct the store record
bytes, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "Error mashaling event to JSON")
}
// suffix event ID with hour resolution for easy retrieval in batches
timeSuffix := time.Now().Format("2006010215")
record := &store.Record{
// key is such that reading by prefix indexes by topic and reading by suffix indexes by time
Key: event.Topic + joinKey + event.ID + joinKey + timeSuffix,
Value: bytes,
Expiry: options.TTL,
}
// write the record to the store
if err := s.store.Write(record); err != nil {
return errors.Wrap(err, "Error writing to the store")
}
return nil
}
func (s *evStore) backupLoop() {
for {
err := s.opts.Backup.Snapshot(s.store)
if err != nil {
logger.Errorf("Error running backup %s", err)
}
time.Sleep(1 * time.Hour)
}
}
// Backup is an interface for snapshotting the events store to long term storage
type Backup interface {
Snapshot(st store.Store) error
}

47
events/store_test.go Normal file
View File

@ -0,0 +1,47 @@
package events
import (
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
func TestStore(t *testing.T) {
store := NewStore()
testData := []Event{
{ID: uuid.New().String(), Topic: "foo"},
{ID: uuid.New().String(), Topic: "foo"},
{ID: uuid.New().String(), Topic: "bar"},
}
// write the records to the store
t.Run("Write", func(t *testing.T) {
for _, event := range testData {
err := store.Write(&event)
assert.Nilf(t, err, "Writing an event should not return an error")
}
})
// should not be able to read events from a blank topic
t.Run("ReadMissingTopic", func(t *testing.T) {
evs, err := store.Read("")
assert.Equal(t, err, ErrMissingTopic, "Reading a blank topic should return an error")
assert.Nil(t, evs, "No events should be returned")
})
// should only get the events from the topic requested
t.Run("ReadTopic", func(t *testing.T) {
evs, err := store.Read("foo")
assert.Nilf(t, err, "No error should be returned")
assert.Len(t, evs, 2, "Only the events for this topic should be returned")
})
// limits should be honoured
t.Run("ReadTopicLimit", func(t *testing.T) {
evs, err := store.Read("foo", ReadLimit(1))
assert.Nilf(t, err, "No error should be returned")
assert.Len(t, evs, 1, "The result should include no more than the read limit")
})
}

241
events/stream_test.go Normal file
View File

@ -0,0 +1,241 @@
package events
import (
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
type testPayload struct {
Message string
}
type testCase struct {
str Stream
name string
}
func TestStream(t *testing.T) {
tcs := []testCase{}
stream, err := NewStream()
assert.Nilf(t, err, "NewStream should not return an error")
assert.NotNilf(t, stream, "NewStream should return a stream object")
tcs = append(tcs, testCase{str: stream, name: "memory"})
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
runTestStream(t, tc.str)
})
}
}
func runTestStream(t *testing.T, stream Stream) {
// TestMissingTopic will test the topic validation on publish
t.Run("TestMissingTopic", func(t *testing.T) {
err := stream.Publish("", nil)
assert.Equalf(t, err, ErrMissingTopic, "Publishing to a blank topic should return an error")
})
// TestConsumeTopic will publish a message to the test topic. The subscriber will subscribe to the
// same test topic.
t.Run("TestConsumeTopic", func(t *testing.T) {
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the subscriber
evChan, err := stream.Consume("test")
assert.Nilf(t, err, "Consume should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
err = stream.Publish("test", payload, WithMetadata(metadata))
assert.Nil(t, err, "Publishing a valid message should not return an error")
wg.Add(1)
// wait for the subscriber to recieve the message or timeout
wg.Wait()
})
// TestConsumeGroup will publish a message to a random topic. Two subscribers will then consume
// the message from the firehose topic with different queues. The second subscriber will be registered
// after the message is published to test durability.
t.Run("TestConsumeGroup", func(t *testing.T) {
topic := uuid.New().String()
payload := &testPayload{Message: "HelloWorld"}
metadata := map[string]string{"foo": "bar"}
// create the first subscriber
evChan1, err := stream.Consume(topic)
assert.Nilf(t, err, "Consume should not return an error")
// setup the subscriber async
var wg sync.WaitGroup
go func() {
timeout := time.NewTimer(time.Millisecond * 250)
select {
case event, _ := <-evChan1:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
err = stream.Publish(topic, payload, WithMetadata(metadata))
assert.Nil(t, err, "Publishing a valid message should not return an error")
wg.Add(2)
// create the second subscriber
evChan2, err := stream.Consume(topic,
WithGroup("second_queue"),
WithOffset(time.Now().Add(time.Minute*-1)),
)
assert.Nilf(t, err, "Consume should not return an error")
go func() {
timeout := time.NewTimer(time.Second * 1)
select {
case event, _ := <-evChan2:
assert.NotNilf(t, event, "The message was nil")
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
var result testPayload
err = event.Unmarshal(&result)
assert.Nil(t, err, "Error decoding result")
assert.Equal(t, result, *payload, "Payload didn't match")
wg.Done()
case <-timeout.C:
t.Fatalf("Event was not recieved")
}
}()
// wait for the subscriber to recieve the message or timeout
wg.Wait()
})
t.Run("AckingNacking", func(t *testing.T) {
ch, err := stream.Consume("foobarAck", WithAutoAck(false, 5*time.Second))
assert.NoError(t, err, "Unexpected error subscribing")
assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"}))
assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"}))
ev := <-ch
ev.Ack()
ev = <-ch
nacked := ev.ID
ev.Nack()
select {
case ev = <-ch:
assert.Equal(t, ev.ID, nacked, "Nacked message should have been received again")
assert.NoError(t, ev.Ack())
case <-time.After(7 * time.Second):
t.Fatalf("Timed out waiting for message to be put back on queue")
}
})
t.Run("Retries", func(t *testing.T) {
ch, err := stream.Consume("foobarRetries", WithAutoAck(false, 5*time.Second), WithRetryLimit(1))
assert.NoError(t, err, "Unexpected error subscribing")
assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"}))
ev := <-ch
id := ev.ID
ev.Nack()
ev = <-ch
assert.Equal(t, id, ev.ID, "Nacked message should have been received again")
ev.Nack()
select {
case ev = <-ch:
t.Fatalf("Unexpected event received")
case <-time.After(7 * time.Second):
}
})
t.Run("InfiniteRetries", func(t *testing.T) {
ch, err := stream.Consume("foobarRetriesInf", WithAutoAck(false, 2*time.Second))
assert.NoError(t, err, "Unexpected error subscribing")
assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"}))
count := 0
id := ""
for {
select {
case ev := <-ch:
if id != "" {
assert.Equal(t, id, ev.ID, "Nacked message should have been received again")
}
id = ev.ID
case <-time.After(3 * time.Second):
t.Fatalf("Unexpected event received")
}
count++
if count == 11 {
break
}
}
})
t.Run("twoSubs", func(t *testing.T) {
ch1, err := stream.Consume("foobarTwoSubs1", WithAutoAck(false, 5*time.Second))
assert.NoError(t, err, "Unexpected error subscribing to topic 1")
ch2, err := stream.Consume("foobarTwoSubs2", WithAutoAck(false, 5*time.Second))
assert.NoError(t, err, "Unexpected error subscribing to topic 2")
assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"}))
assert.NoError(t, stream.Publish("foobarTwoSubs1", map[string]string{"foo": "message 1"}))
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
ev := <-ch1
assert.Equal(t, "foobarTwoSubs1", ev.Topic, "Received message from unexpected topic")
wg.Done()
}()
go func() {
ev := <-ch2
assert.Equal(t, "foobarTwoSubs2", ev.Topic, "Received message from unexpected topic")
wg.Done()
}()
wg.Wait()
})
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"time" "time"
"github.com/urfave/cli/v2"
"go-micro.dev/v4/auth" "go-micro.dev/v4/auth"
"go-micro.dev/v4/broker" "go-micro.dev/v4/broker"
"go-micro.dev/v4/cache" "go-micro.dev/v4/cache"
@ -18,7 +19,6 @@ import (
"go-micro.dev/v4/server" "go-micro.dev/v4/server"
"go-micro.dev/v4/store" "go-micro.dev/v4/store"
"go-micro.dev/v4/transport" "go-micro.dev/v4/transport"
"github.com/urfave/cli/v2"
) )
// Options for micro service // Options for micro service

View File

@ -5,8 +5,8 @@ import (
"sync" "sync"
"time" "time"
"go-micro.dev/v4/logger"
"github.com/google/uuid" "github.com/google/uuid"
"go-micro.dev/v4/logger"
) )
var ( var (

View File

@ -13,9 +13,9 @@ import (
"sync" "sync"
"time" "time"
"github.com/nxadm/tail"
"go-micro.dev/v4/logger" "go-micro.dev/v4/logger"
"go-micro.dev/v4/runtime/local/git" "go-micro.dev/v4/runtime/local/git"
"github.com/nxadm/tail"
) )
// defaultNamespace to use if not provided as an option // defaultNamespace to use if not provided as an option

View File

@ -8,9 +8,9 @@ import (
"os" "os"
"path/filepath" "path/filepath"
docker "github.com/fsouza/go-dockerclient"
"go-micro.dev/v4/logger" "go-micro.dev/v4/logger"
"go-micro.dev/v4/runtime/local/build" "go-micro.dev/v4/runtime/local/build"
docker "github.com/fsouza/go-dockerclient"
) )
type Builder struct { type Builder struct {

View File

@ -1,3 +1,4 @@
//go:build !windows
// +build !windows // +build !windows
// Package os runs processes locally // Package os runs processes locally

View File

@ -6,8 +6,8 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"go-micro.dev/v4/runtime/local/source"
"github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5"
"go-micro.dev/v4/runtime/local/source"
) )
// Source retrieves source code // Source retrieves source code

View File

@ -4,8 +4,8 @@ import (
"errors" "errors"
"sync" "sync"
"go-micro.dev/v4/server"
"github.com/google/uuid" "github.com/google/uuid"
"go-micro.dev/v4/server"
) )
type MockServer struct { type MockServer struct {

View File

@ -4,6 +4,8 @@ import (
"bytes" "bytes"
"sync" "sync"
"github.com/oxtoacart/bpool"
"github.com/pkg/errors"
"go-micro.dev/v4/codec" "go-micro.dev/v4/codec"
raw "go-micro.dev/v4/codec/bytes" raw "go-micro.dev/v4/codec/bytes"
"go-micro.dev/v4/codec/grpc" "go-micro.dev/v4/codec/grpc"
@ -12,8 +14,6 @@ import (
"go-micro.dev/v4/codec/proto" "go-micro.dev/v4/codec/proto"
"go-micro.dev/v4/codec/protorpc" "go-micro.dev/v4/codec/protorpc"
"go-micro.dev/v4/transport" "go-micro.dev/v4/transport"
"github.com/oxtoacart/bpool"
"github.com/pkg/errors"
) )
type rpcCodec struct { type rpcCodec struct {

View File

@ -9,9 +9,9 @@ import (
"testing" "testing"
"time" "time"
"github.com/golang/protobuf/proto"
"go-micro.dev/v4/codec/json" "go-micro.dev/v4/codec/json"
protoCodec "go-micro.dev/v4/codec/proto" protoCodec "go-micro.dev/v4/codec/proto"
"github.com/golang/protobuf/proto"
) )
// protoStruct implements proto.Message // protoStruct implements proto.Message

View File

@ -7,11 +7,11 @@ import (
"os/signal" "os/signal"
"time" "time"
"github.com/google/uuid"
"go-micro.dev/v4/codec" "go-micro.dev/v4/codec"
"go-micro.dev/v4/logger" "go-micro.dev/v4/logger"
"go-micro.dev/v4/registry" "go-micro.dev/v4/registry"
signalutil "go-micro.dev/v4/util/signal" signalutil "go-micro.dev/v4/util/signal"
"github.com/google/uuid"
) )
// Server is a simple micro server abstraction // Server is a simple micro server abstraction

View File

@ -174,9 +174,21 @@ func (m *memoryStore) Read(key string, opts ...ReadOption) ([]*Record, error) {
// Handle Prefix / suffix // Handle Prefix / suffix
if readOpts.Prefix || readOpts.Suffix { if readOpts.Prefix || readOpts.Suffix {
k := m.list(prefix, readOpts.Limit, readOpts.Offset) k := m.list(prefix, 0, 0)
limit := int(readOpts.Limit)
offset := int(readOpts.Offset)
if limit > len(k) {
limit = len(k)
}
if offset > len(k) {
offset = len(k)
}
for i := offset; i < limit; i++ {
kk := k[i]
for _, kk := range k {
if readOpts.Prefix && !strings.HasPrefix(kk, key) { if readOpts.Prefix && !strings.HasPrefix(kk, key) {
continue continue
} }

View File

@ -8,8 +8,8 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
log "go-micro.dev/v4/logger"
"github.com/miekg/dns" "github.com/miekg/dns"
log "go-micro.dev/v4/logger"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
"golang.org/x/net/ipv6" "golang.org/x/net/ipv6"
) )

View File

@ -4,8 +4,8 @@ import (
"sync" "sync"
"time" "time"
"go-micro.dev/v4/transport"
"github.com/google/uuid" "github.com/google/uuid"
"go-micro.dev/v4/transport"
) )
type pool struct { type pool struct {

View File

@ -3,8 +3,8 @@ package sync
import ( import (
"time" "time"
"go-micro.dev/v4/store"
"github.com/pkg/errors" "github.com/pkg/errors"
"go-micro.dev/v4/store"
) )
type operation struct { type operation struct {

View File

@ -6,9 +6,9 @@ import (
"sync" "sync"
"time" "time"
"go-micro.dev/v4/store"
"github.com/ef-ds/deque" "github.com/ef-ds/deque"
"github.com/pkg/errors" "github.com/pkg/errors"
"go-micro.dev/v4/store"
) )
// Sync implements a sync in for stores // Sync implements a sync in for stores

View File

@ -6,9 +6,9 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/urfave/cli/v2"
"go-micro.dev/v4" "go-micro.dev/v4"
"go-micro.dev/v4/registry" "go-micro.dev/v4/registry"
"github.com/urfave/cli/v2"
) )
//Options for web //Options for web

View File

@ -7,10 +7,10 @@ import (
"testing" "testing"
"time" "time"
"github.com/urfave/cli/v2"
"go-micro.dev/v4" "go-micro.dev/v4"
"go-micro.dev/v4/logger" "go-micro.dev/v4/logger"
"go-micro.dev/v4/web" "go-micro.dev/v4/web"
"github.com/urfave/cli/v2"
) )
func TestWeb(t *testing.T) { func TestWeb(t *testing.T) {