From 0c2041e43908b24f7cb1c788af4f787a84fefffe Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Mon, 8 Nov 2021 08:52:39 +0000 Subject: [PATCH] add events package (#2341) * add events package * update go version --- .github/workflows/pr.yml | 4 +- .github/workflows/tests.yml | 4 +- api/handler/api/util.go | 2 +- api/handler/event/event.go | 4 +- api/handler/rpc/rpc.go | 4 +- api/handler/rpc/rpc_test.go | 2 +- api/handler/rpc/stream.go | 6 +- api/router/registry/registry_test.go | 2 +- api/server/http/http.go | 2 +- broker/http.go | 2 +- broker/http_test.go | 2 +- client/cache.go | 2 +- cmd/cmd.go | 2 +- codec/grpc/grpc.go | 2 +- codec/json/json.go | 2 +- codec/proto/marshaler.go | 2 +- codec/proto/proto.go | 2 +- codec/protorpc/protorpc.go | 2 +- config/reader/json/json.go | 2 +- config/secrets/box/box.go | 2 +- config/secrets/secretbox/secretbox.go | 2 +- config/source/cli/cli.go | 4 +- config/source/cli/cli_test.go | 2 +- config/source/cli/options.go | 2 +- config/source/env/env.go | 2 +- config/source/file/watcher.go | 5 +- config/source/file/watcher_linux.go | 5 +- config/source/flag/flag.go | 2 +- config/source/memory/memory.go | 2 +- debug/log/os.go | 2 +- debug/trace/default.go | 2 +- errors/errors.go | 2 +- errors/errors_test.go | 26 +-- events/events.go | 92 ++++++++++ events/memory.go | 244 ++++++++++++++++++++++++++ events/options.go | 144 +++++++++++++++ events/store.go | 127 ++++++++++++++ events/store_test.go | 47 +++++ events/stream_test.go | 241 +++++++++++++++++++++++++ options.go | 2 +- registry/memory.go | 2 +- runtime/default.go | 2 +- runtime/local/build/docker/docker.go | 2 +- runtime/local/process/os/os.go | 1 + runtime/local/source/git/git.go | 2 +- server/mock/mock.go | 2 +- server/rpc_codec.go | 4 +- server/rpc_stream_test.go | 2 +- server/server.go | 2 +- store/memory.go | 16 +- util/mdns/server.go | 2 +- util/pool/default.go | 2 +- util/sync/manager.go | 2 +- util/sync/sync.go | 2 +- web/options.go | 2 +- web/web_test.go | 2 +- 56 files changed, 982 insertions(+), 72 deletions(-) create mode 100644 events/events.go create mode 100644 events/memory.go create mode 100644 events/options.go create mode 100644 events/store.go create mode 100644 events/store_test.go create mode 100644 events/stream_test.go diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 61205eb5..b5d6037b 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -8,10 +8,10 @@ jobs: runs-on: ubuntu-latest steps: - - name: Set up Go 1.16 + - name: Set up Go 1.17 uses: actions/setup-go@v1 with: - go-version: 1.16 + go-version: 1.17 id: go - name: Check out code into the Go module directory diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 58e83e35..b5345415 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -8,10 +8,10 @@ jobs: runs-on: ubuntu-latest steps: - - name: Set up Go 1.16 + - name: Set up Go 1.17 uses: actions/setup-go@v1 with: - go-version: 1.16 + go-version: 1.17 id: go - name: Check out code into the Go module directory diff --git a/api/handler/api/util.go b/api/handler/api/util.go index 586b7609..72824805 100644 --- a/api/handler/api/util.go +++ b/api/handler/api/util.go @@ -7,10 +7,10 @@ import ( "net/http" "strings" + "github.com/oxtoacart/bpool" api "go-micro.dev/v4/api/proto" "go-micro.dev/v4/registry" "go-micro.dev/v4/selector" - "github.com/oxtoacart/bpool" ) var ( diff --git a/api/handler/event/event.go b/api/handler/event/event.go index 5846fdb3..54ce76b7 100644 --- a/api/handler/event/event.go +++ b/api/handler/event/event.go @@ -10,11 +10,11 @@ import ( "strings" "time" + "github.com/google/uuid" + "github.com/oxtoacart/bpool" "go-micro.dev/v4/api/handler" proto "go-micro.dev/v4/api/proto" "go-micro.dev/v4/util/ctx" - "github.com/google/uuid" - "github.com/oxtoacart/bpool" ) var ( diff --git a/api/handler/rpc/rpc.go b/api/handler/rpc/rpc.go index a24c5a67..97fef58f 100644 --- a/api/handler/rpc/rpc.go +++ b/api/handler/rpc/rpc.go @@ -9,6 +9,8 @@ import ( "strconv" "strings" + jsonpatch "github.com/evanphx/json-patch/v5" + "github.com/oxtoacart/bpool" "go-micro.dev/v4/api" "go-micro.dev/v4/api/handler" "go-micro.dev/v4/api/internal/proto" @@ -23,8 +25,6 @@ import ( "go-micro.dev/v4/selector" "go-micro.dev/v4/util/ctx" "go-micro.dev/v4/util/qson" - jsonpatch "github.com/evanphx/json-patch/v5" - "github.com/oxtoacart/bpool" ) const ( diff --git a/api/handler/rpc/rpc_test.go b/api/handler/rpc/rpc_test.go index e49345b2..d93f5a4d 100644 --- a/api/handler/rpc/rpc_test.go +++ b/api/handler/rpc/rpc_test.go @@ -7,8 +7,8 @@ import ( "reflect" "testing" - go_api "go-micro.dev/v4/api/proto" "github.com/golang/protobuf/proto" + go_api "go-micro.dev/v4/api/proto" ) func TestRequestPayloadFromRequest(t *testing.T) { diff --git a/api/handler/rpc/stream.go b/api/handler/rpc/stream.go index 36c1a9eb..049084c3 100644 --- a/api/handler/rpc/stream.go +++ b/api/handler/rpc/stream.go @@ -9,14 +9,14 @@ import ( "strings" "time" + "github.com/gobwas/httphead" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" "go-micro.dev/v4/api" "go-micro.dev/v4/client" raw "go-micro.dev/v4/codec/bytes" "go-micro.dev/v4/logger" "go-micro.dev/v4/selector" - "github.com/gobwas/httphead" - "github.com/gobwas/ws" - "github.com/gobwas/ws/wsutil" ) // serveWebsocket will stream rpc back over websockets assuming json diff --git a/api/router/registry/registry_test.go b/api/router/registry/registry_test.go index f49f483a..2df6b57c 100644 --- a/api/router/registry/registry_test.go +++ b/api/router/registry/registry_test.go @@ -3,8 +3,8 @@ package registry import ( "testing" - "go-micro.dev/v4/registry" "github.com/stretchr/testify/assert" + "go-micro.dev/v4/registry" ) func TestStoreRegex(t *testing.T) { diff --git a/api/server/http/http.go b/api/server/http/http.go index c025b19f..40c11619 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -8,10 +8,10 @@ import ( "os" "sync" + "github.com/gorilla/handlers" "go-micro.dev/v4/api/server" "go-micro.dev/v4/api/server/cors" "go-micro.dev/v4/logger" - "github.com/gorilla/handlers" ) type httpServer struct { diff --git a/broker/http.go b/broker/http.go index 84d6c0cf..64388f24 100644 --- a/broker/http.go +++ b/broker/http.go @@ -16,6 +16,7 @@ import ( "sync" "time" + "github.com/google/uuid" "go-micro.dev/v4/codec/json" merr "go-micro.dev/v4/errors" "go-micro.dev/v4/registry" @@ -23,7 +24,6 @@ import ( maddr "go-micro.dev/v4/util/addr" mnet "go-micro.dev/v4/util/net" mls "go-micro.dev/v4/util/tls" - "github.com/google/uuid" "golang.org/x/net/http2" ) diff --git a/broker/http_test.go b/broker/http_test.go index b39c8d41..3a0d87a8 100644 --- a/broker/http_test.go +++ b/broker/http_test.go @@ -5,9 +5,9 @@ import ( "testing" "time" + "github.com/google/uuid" "go-micro.dev/v4/broker" "go-micro.dev/v4/registry" - "github.com/google/uuid" ) var ( diff --git a/client/cache.go b/client/cache.go index 69c3da5b..8667c6d0 100644 --- a/client/cache.go +++ b/client/cache.go @@ -7,8 +7,8 @@ import ( "hash/fnv" "time" - "go-micro.dev/v4/metadata" cache "github.com/patrickmn/go-cache" + "go-micro.dev/v4/metadata" ) // NewCache returns an initialised cache. diff --git a/cmd/cmd.go b/cmd/cmd.go index 6373ec57..96f3aea3 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/urfave/cli/v2" "go-micro.dev/v4/auth" "go-micro.dev/v4/broker" "go-micro.dev/v4/cache" @@ -23,7 +24,6 @@ import ( "go-micro.dev/v4/server" "go-micro.dev/v4/store" "go-micro.dev/v4/transport" - "github.com/urfave/cli/v2" ) type Cmd interface { diff --git a/codec/grpc/grpc.go b/codec/grpc/grpc.go index 038819f3..0e364257 100644 --- a/codec/grpc/grpc.go +++ b/codec/grpc/grpc.go @@ -8,8 +8,8 @@ import ( "io" "strings" - "go-micro.dev/v4/codec" "github.com/golang/protobuf/proto" + "go-micro.dev/v4/codec" ) type Codec struct { diff --git a/codec/json/json.go b/codec/json/json.go index 45b127c1..8ed8c3e4 100644 --- a/codec/json/json.go +++ b/codec/json/json.go @@ -5,9 +5,9 @@ import ( "encoding/json" "io" - "go-micro.dev/v4/codec" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" + "go-micro.dev/v4/codec" ) type Codec struct { diff --git a/codec/proto/marshaler.go b/codec/proto/marshaler.go index b7a042e3..bb1eaea7 100644 --- a/codec/proto/marshaler.go +++ b/codec/proto/marshaler.go @@ -3,9 +3,9 @@ package proto import ( "bytes" - "go-micro.dev/v4/codec" "github.com/golang/protobuf/proto" "github.com/oxtoacart/bpool" + "go-micro.dev/v4/codec" ) // create buffer pool with 16 instances each preallocated with 256 bytes diff --git a/codec/proto/proto.go b/codec/proto/proto.go index 0aa7c04f..e3d3b361 100644 --- a/codec/proto/proto.go +++ b/codec/proto/proto.go @@ -4,8 +4,8 @@ package proto import ( "io" - "go-micro.dev/v4/codec" "github.com/golang/protobuf/proto" + "go-micro.dev/v4/codec" ) type Codec struct { diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go index 23749d21..165e0855 100644 --- a/codec/protorpc/protorpc.go +++ b/codec/protorpc/protorpc.go @@ -8,8 +8,8 @@ import ( "strconv" "sync" - "go-micro.dev/v4/codec" "github.com/golang/protobuf/proto" + "go-micro.dev/v4/codec" ) type flusher interface { diff --git a/config/reader/json/json.go b/config/reader/json/json.go index ebcec372..aa891ff6 100644 --- a/config/reader/json/json.go +++ b/config/reader/json/json.go @@ -4,11 +4,11 @@ import ( "errors" "time" + "github.com/imdario/mergo" "go-micro.dev/v4/config/encoder" "go-micro.dev/v4/config/encoder/json" "go-micro.dev/v4/config/reader" "go-micro.dev/v4/config/source" - "github.com/imdario/mergo" ) type jsonReader struct { diff --git a/config/secrets/box/box.go b/config/secrets/box/box.go index 96cbfee0..97140f4b 100644 --- a/config/secrets/box/box.go +++ b/config/secrets/box/box.go @@ -2,8 +2,8 @@ package box import ( - "go-micro.dev/v4/config/secrets" "github.com/pkg/errors" + "go-micro.dev/v4/config/secrets" naclbox "golang.org/x/crypto/nacl/box" "crypto/rand" diff --git a/config/secrets/secretbox/secretbox.go b/config/secrets/secretbox/secretbox.go index df957a5b..9e49a028 100644 --- a/config/secrets/secretbox/secretbox.go +++ b/config/secrets/secretbox/secretbox.go @@ -3,8 +3,8 @@ package secretbox import ( - "go-micro.dev/v4/config/secrets" "github.com/pkg/errors" + "go-micro.dev/v4/config/secrets" "golang.org/x/crypto/nacl/secretbox" "crypto/rand" diff --git a/config/source/cli/cli.go b/config/source/cli/cli.go index 58b3ea8c..5b333424 100644 --- a/config/source/cli/cli.go +++ b/config/source/cli/cli.go @@ -7,10 +7,10 @@ import ( "strings" "time" - "go-micro.dev/v4/cmd" - "go-micro.dev/v4/config/source" "github.com/imdario/mergo" "github.com/urfave/cli/v2" + "go-micro.dev/v4/cmd" + "go-micro.dev/v4/config/source" ) type cliSource struct { diff --git a/config/source/cli/cli_test.go b/config/source/cli/cli_test.go index 3258459b..2866c03c 100644 --- a/config/source/cli/cli_test.go +++ b/config/source/cli/cli_test.go @@ -5,11 +5,11 @@ import ( "os" "testing" + "github.com/urfave/cli/v2" "go-micro.dev/v4" "go-micro.dev/v4/cmd" "go-micro.dev/v4/config" "go-micro.dev/v4/config/source" - "github.com/urfave/cli/v2" ) func TestCliSourceDefault(t *testing.T) { diff --git a/config/source/cli/options.go b/config/source/cli/options.go index 329cdb30..9f75b0a2 100644 --- a/config/source/cli/options.go +++ b/config/source/cli/options.go @@ -3,8 +3,8 @@ package cli import ( "context" - "go-micro.dev/v4/config/source" "github.com/urfave/cli/v2" + "go-micro.dev/v4/config/source" ) type contextKey struct{} diff --git a/config/source/env/env.go b/config/source/env/env.go index 2a524007..45aa3848 100644 --- a/config/source/env/env.go +++ b/config/source/env/env.go @@ -6,8 +6,8 @@ import ( "strings" "time" - "go-micro.dev/v4/config/source" "github.com/imdario/mergo" + "go-micro.dev/v4/config/source" ) var ( diff --git a/config/source/file/watcher.go b/config/source/file/watcher.go index 15274542..4ae16736 100644 --- a/config/source/file/watcher.go +++ b/config/source/file/watcher.go @@ -1,12 +1,13 @@ -//+build !linux +//go:build !linux +// +build !linux package file import ( "os" - "go-micro.dev/v4/config/source" "github.com/fsnotify/fsnotify" + "go-micro.dev/v4/config/source" ) type watcher struct { diff --git a/config/source/file/watcher_linux.go b/config/source/file/watcher_linux.go index c006d35f..36b0ed39 100644 --- a/config/source/file/watcher_linux.go +++ b/config/source/file/watcher_linux.go @@ -1,12 +1,13 @@ -//+build linux +//go:build linux +// +build linux package file import ( "os" - "go-micro.dev/v4/config/source" "github.com/fsnotify/fsnotify" + "go-micro.dev/v4/config/source" ) type watcher struct { diff --git a/config/source/flag/flag.go b/config/source/flag/flag.go index c6903a6f..6c1b97ff 100644 --- a/config/source/flag/flag.go +++ b/config/source/flag/flag.go @@ -3,8 +3,8 @@ package flag import ( "errors" "flag" - "go-micro.dev/v4/config/source" "github.com/imdario/mergo" + "go-micro.dev/v4/config/source" "strings" "time" ) diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go index e6c67f8c..8aa0776b 100644 --- a/config/source/memory/memory.go +++ b/config/source/memory/memory.go @@ -5,8 +5,8 @@ import ( "sync" "time" - "go-micro.dev/v4/config/source" "github.com/google/uuid" + "go-micro.dev/v4/config/source" ) type memory struct { diff --git a/debug/log/os.go b/debug/log/os.go index b0d79cd8..3a6d3252 100644 --- a/debug/log/os.go +++ b/debug/log/os.go @@ -3,8 +3,8 @@ package log import ( "sync" - "go-micro.dev/v4/util/ring" "github.com/google/uuid" + "go-micro.dev/v4/util/ring" ) // Should stream from OS diff --git a/debug/trace/default.go b/debug/trace/default.go index d8443b4f..18c81e25 100644 --- a/debug/trace/default.go +++ b/debug/trace/default.go @@ -4,8 +4,8 @@ import ( "context" "time" - "go-micro.dev/v4/util/ring" "github.com/google/uuid" + "go-micro.dev/v4/util/ring" ) type memTracer struct { diff --git a/errors/errors.go b/errors/errors.go index ac6df6c9..01563208 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -178,4 +178,4 @@ func (e *MultiError) HasErrors() bool { func (e *MultiError) Error() string { b, _ := json.Marshal(e) return string(b) -} \ No newline at end of file +} diff --git a/errors/errors_test.go b/errors/errors_test.go index 3a533df5..d6122c31 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -109,23 +109,23 @@ func TestAppend(t *testing.T) { Status: http.StatusText(500), }, { - Id: "test2", - Code: 400, - Detail: "Bad Request", + Id: "test2", + Code: 400, + Detail: "Bad Request", Status: http.StatusText(400), }, { Id: "test3", Code: 404, Detail: "Not Found", - Status: http.StatusText(404), + Status: http.StatusText(404), }, } for _, e := range testData { mError.Append(&Error{ - Id: e.Id, - Code: e.Code, + Id: e.Id, + Code: e.Code, Detail: e.Detail, Status: e.Status, }) @@ -146,16 +146,16 @@ func TestHasErrors(t *testing.T) { Status: http.StatusText(500), }, { - Id: "test2", - Code: 400, - Detail: "Bad Request", + Id: "test2", + Code: 400, + Detail: "Bad Request", Status: http.StatusText(400), }, { Id: "test3", Code: 404, Detail: "Not Found", - Status: http.StatusText(404), + Status: http.StatusText(404), }, } @@ -165,8 +165,8 @@ func TestHasErrors(t *testing.T) { for _, e := range testData { mError.Errors = append(mError.Errors, &Error{ - Id: e.Id, - Code: e.Code, + Id: e.Id, + Code: e.Code, Detail: e.Detail, Status: e.Status, }) @@ -175,4 +175,4 @@ func TestHasErrors(t *testing.T) { if !mError.HasErrors() { t.Fatal("Expected errors") } -} \ No newline at end of file +} diff --git a/events/events.go b/events/events.go new file mode 100644 index 00000000..77bb3df7 --- /dev/null +++ b/events/events.go @@ -0,0 +1,92 @@ +// Package events is for event streaming and storage +package events + +import ( + "encoding/json" + "errors" + "time" +) + +var ( + // DefaultStream is the default events stream implementation + DefaultStream Stream + // DefaultStore is the default events store implementation + DefaultStore Store +) + +var ( + // ErrMissingTopic is returned if a blank topic was provided to publish + ErrMissingTopic = errors.New("Missing topic") + // ErrEncodingMessage is returned from publish if there was an error encoding the message option + ErrEncodingMessage = errors.New("Error encoding message") +) + +// Stream is an event streaming interface +type Stream interface { + Publish(topic string, msg interface{}, opts ...PublishOption) error + Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) +} + +// Store is an event store interface +type Store interface { + Read(topic string, opts ...ReadOption) ([]*Event, error) + Write(event *Event, opts ...WriteOption) error +} + +type AckFunc func() error +type NackFunc func() error + +// Event is the object returned by the broker when you subscribe to a topic +type Event struct { + // ID to uniquely identify the event + ID string + // Topic of event, e.g. "registry.service.created" + Topic string + // Timestamp of the event + Timestamp time.Time + // Metadata contains the values the event was indexed by + Metadata map[string]string + // Payload contains the encoded message + Payload []byte + + ackFunc AckFunc + nackFunc NackFunc +} + +// Unmarshal the events message into an object +func (e *Event) Unmarshal(v interface{}) error { + return json.Unmarshal(e.Payload, v) +} + +// Ack acknowledges successful processing of the event in ManualAck mode +func (e *Event) Ack() error { + return e.ackFunc() +} + +func (e *Event) SetAckFunc(f AckFunc) { + e.ackFunc = f +} + +// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode +func (e *Event) Nack() error { + return e.nackFunc() +} + +func (e *Event) SetNackFunc(f NackFunc) { + e.nackFunc = f +} + +// Publish an event to a topic +func Publish(topic string, msg interface{}, opts ...PublishOption) error { + return DefaultStream.Publish(topic, msg, opts...) +} + +// Consume to events +func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { + return DefaultStream.Consume(topic, opts...) +} + +// Read events for a topic +func Read(topic string, opts ...ReadOption) ([]*Event, error) { + return DefaultStore.Read(topic, opts...) +} diff --git a/events/memory.go b/events/memory.go new file mode 100644 index 00000000..dc62e660 --- /dev/null +++ b/events/memory.go @@ -0,0 +1,244 @@ +package events + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/google/uuid" + "github.com/pkg/errors" + "go-micro.dev/v4/logger" + "go-micro.dev/v4/store" +) + +// NewStream returns an initialized memory stream +func NewStream(opts ...Option) (Stream, error) { + // parse the options + var options Options + for _, o := range opts { + o(&options) + } + return &mem{store: store.NewMemoryStore()}, nil +} + +type subscriber struct { + Group string + Topic string + Channel chan Event + + sync.RWMutex + retryMap map[string]int + retryLimit int + autoAck bool + ackWait time.Duration +} + +type mem struct { + store store.Store + + subs []*subscriber + sync.RWMutex +} + +func (m *mem) Publish(topic string, msg interface{}, opts ...PublishOption) error { + // validate the topic + if len(topic) == 0 { + return ErrMissingTopic + } + + // parse the options + options := PublishOptions{ + Timestamp: time.Now(), + } + for _, o := range opts { + o(&options) + } + + // encode the message if it's not already encoded + var payload []byte + if p, ok := msg.([]byte); ok { + payload = p + } else { + p, err := json.Marshal(msg) + if err != nil { + return ErrEncodingMessage + } + payload = p + } + + // construct the event + event := &Event{ + ID: uuid.New().String(), + Topic: topic, + Timestamp: options.Timestamp, + Metadata: options.Metadata, + Payload: payload, + } + + // serialize the event to bytes + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error encoding event") + } + + // write to the store + key := fmt.Sprintf("%v/%v", event.Topic, event.ID) + if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil { + return errors.Wrap(err, "Error writing event to store") + } + + // send to the subscribers async + go m.handleEvent(event) + + return nil +} + +func (m *mem) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, ErrMissingTopic + } + + // parse the options + options := ConsumeOptions{ + Group: uuid.New().String(), + AutoAck: true, + } + for _, o := range opts { + o(&options) + } + // TODO RetryLimit + + // setup the subscriber + sub := &subscriber{ + Channel: make(chan Event), + Topic: topic, + Group: options.Group, + retryMap: map[string]int{}, + autoAck: true, + retryLimit: options.GetRetryLimit(), + } + + if !options.AutoAck { + if options.AckWait == 0 { + return nil, fmt.Errorf("invalid AckWait passed, should be positive integer") + } + sub.autoAck = options.AutoAck + sub.ackWait = options.AckWait + } + + // register the subscriber + m.Lock() + m.subs = append(m.subs, sub) + m.Unlock() + + // lookup previous events if the start time option was passed + if options.Offset.Unix() > 0 { + go m.lookupPreviousEvents(sub, options.Offset) + } + + // return the channel + return sub.Channel, nil +} + +// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends +// them into the subscribers channel +func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) { + // lookup all events which match the topic (a blank topic will return all results) + recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix()) + if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Error looking up previous events: %v", err) + return + } else if err != nil { + return + } + + // loop through the records and send it to the channel if it matches + for _, r := range recs { + var ev Event + if err := json.Unmarshal(r.Value, &ev); err != nil { + continue + } + if ev.Timestamp.Unix() < startTime.Unix() { + continue + } + sendEvent(&ev, sub) + } +} + +// handleEvents sends the event to any registered subscribers. +func (m *mem) handleEvent(ev *Event) { + m.RLock() + subs := m.subs + m.RUnlock() + + // filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message + // being sent to two subscribers with the same queue. + filteredSubs := map[string]*subscriber{} + + // filter down to subscribers who are interested in this topic + for _, sub := range subs { + if len(sub.Topic) == 0 || sub.Topic == ev.Topic { + filteredSubs[sub.Group] = sub + } + } + + // send the message to each channel async (since one channel might be blocked) + for _, sub := range filteredSubs { + sendEvent(ev, sub) + } +} + +func sendEvent(ev *Event, sub *subscriber) { + go func(s *subscriber) { + evCopy := *ev + if s.autoAck { + s.Channel <- evCopy + return + } + evCopy.SetAckFunc(ackFunc(s, evCopy)) + evCopy.SetNackFunc(nackFunc(s, evCopy)) + s.retryMap[evCopy.ID] = 0 + tick := time.NewTicker(s.ackWait) + defer tick.Stop() + for range tick.C { + s.Lock() + count, ok := s.retryMap[evCopy.ID] + s.Unlock() + if !ok { + // success + break + } + + if s.retryLimit > -1 && count > s.retryLimit { + if logger.V(logger.ErrorLevel, logger.DefaultLogger) { + logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit) + } + s.Lock() + delete(s.retryMap, evCopy.ID) + s.Unlock() + return + } + s.Channel <- evCopy + s.Lock() + s.retryMap[evCopy.ID] = count + 1 + s.Unlock() + } + }(sub) +} + +func ackFunc(s *subscriber, evCopy Event) func() error { + return func() error { + s.Lock() + delete(s.retryMap, evCopy.ID) + s.Unlock() + return nil + } +} + +func nackFunc(s *subscriber, evCopy Event) func() error { + return func() error { + return nil + } +} diff --git a/events/options.go b/events/options.go new file mode 100644 index 00000000..4a6fbf51 --- /dev/null +++ b/events/options.go @@ -0,0 +1,144 @@ +package events + +import "time" + +type Options struct{} + +type Option func(o *Options) + +type StoreOptions struct { + TTL time.Duration + Backup Backup +} + +type StoreOption func(o *StoreOptions) + +// PublishOptions contains all the options which can be provided when publishing an event +type PublishOptions struct { + // Metadata contains any keys which can be used to query the data, for example a customer id + Metadata map[string]string + // Timestamp to set for the event, if the timestamp is a zero value, the current time will be used + Timestamp time.Time +} + +// PublishOption sets attributes on PublishOptions +type PublishOption func(o *PublishOptions) + +// WithMetadata sets the Metadata field on PublishOptions +func WithMetadata(md map[string]string) PublishOption { + return func(o *PublishOptions) { + o.Metadata = md + } +} + +// WithTimestamp sets the timestamp field on PublishOptions +func WithTimestamp(t time.Time) PublishOption { + return func(o *PublishOptions) { + o.Timestamp = t + } +} + +// ConsumeOptions contains all the options which can be provided when subscribing to a topic +type ConsumeOptions struct { + // Group is the name of the consumer group, if two consumers have the same group the events + // are distributed between them + Group string + // Offset is the time from which the messages should be consumed from. If not provided then + // the messages will be consumed starting from the moment the Subscription starts. + Offset time.Time + // AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered. + // If false specifies that each message need ts to be manually acknowledged by the subscriber. + // If processing is successful the message should be ack'ed to remove the message from the stream. + // If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will + // remain on the stream to be processed again. + AutoAck bool + AckWait time.Duration + // RetryLimit indicates number of times a message is retried + RetryLimit int + // CustomRetries indicates whether to use RetryLimit + CustomRetries bool +} + +// ConsumeOption sets attributes on ConsumeOptions +type ConsumeOption func(o *ConsumeOptions) + +// WithGroup sets the consumer group to be part of when consuming events +func WithGroup(q string) ConsumeOption { + return func(o *ConsumeOptions) { + o.Group = q + } +} + +// WithOffset sets the offset time at which to start consuming events +func WithOffset(t time.Time) ConsumeOption { + return func(o *ConsumeOptions) { + o.Offset = t + } +} + +// WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received +// the message is requeued in case auto ack is turned off +func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption { + return func(o *ConsumeOptions) { + o.AutoAck = ack + o.AckWait = ackWait + } +} + +// WithRetryLimit sets the RetryLimit field on ConsumeOptions. +// Set to -1 for infinite retries (default) +func WithRetryLimit(retries int) ConsumeOption { + return func(o *ConsumeOptions) { + o.RetryLimit = retries + o.CustomRetries = true + } +} + +func (s ConsumeOptions) GetRetryLimit() int { + if !s.CustomRetries { + return -1 + } + return s.RetryLimit +} + +// WriteOptions contains all the options which can be provided when writing an event to a store +type WriteOptions struct { + // TTL is the duration the event should be recorded for, a zero value TTL indicates the event should + // be stored indefinately + TTL time.Duration +} + +// WriteOption sets attributes on WriteOptions +type WriteOption func(o *WriteOptions) + +// WithTTL sets the TTL attribute on WriteOptions +func WithTTL(d time.Duration) WriteOption { + return func(o *WriteOptions) { + o.TTL = d + } +} + +// ReadOptions contains all the options which can be provided when reading events from a store +type ReadOptions struct { + // Limit the number of results to return + Limit uint + // Offset the results by this number, useful for paginated queries + Offset uint +} + +// ReadOption sets attributes on ReadOptions +type ReadOption func(o *ReadOptions) + +// ReadLimit sets the limit attribute on ReadOptions +func ReadLimit(l uint) ReadOption { + return func(o *ReadOptions) { + o.Limit = 1 + } +} + +// ReadOffset sets the offset attribute on ReadOptions +func ReadOffset(l uint) ReadOption { + return func(o *ReadOptions) { + o.Offset = 1 + } +} diff --git a/events/store.go b/events/store.go new file mode 100644 index 00000000..ac7cca22 --- /dev/null +++ b/events/store.go @@ -0,0 +1,127 @@ +package events + +import ( + "encoding/json" + "time" + + "github.com/pkg/errors" + "go-micro.dev/v4/logger" + "go-micro.dev/v4/store" +) + +const joinKey = "/" + +// NewStore returns an initialized events store +func NewStore(opts ...StoreOption) Store { + // parse the options + var options StoreOptions + for _, o := range opts { + o(&options) + } + if options.TTL.Seconds() == 0 { + options.TTL = time.Hour * 24 + } + + // return the store + evs := &evStore{ + opts: options, + store: store.NewMemoryStore(), + } + if options.Backup != nil { + go evs.backupLoop() + } + return evs +} + +type evStore struct { + opts StoreOptions + store store.Store +} + +// Read events for a topic +func (s *evStore) Read(topic string, opts ...ReadOption) ([]*Event, error) { + // validate the topic + if len(topic) == 0 { + return nil, ErrMissingTopic + } + + // parse the options + options := ReadOptions{ + Offset: 0, + Limit: 250, + } + for _, o := range opts { + o(&options) + } + + // execute the request + recs, err := s.store.Read(topic+joinKey, + store.ReadPrefix(), + store.ReadLimit(options.Limit), + store.ReadOffset(options.Offset), + ) + if err != nil { + return nil, errors.Wrap(err, "Error reading from store") + } + + // unmarshal the result + result := make([]*Event, len(recs)) + for i, r := range recs { + var e Event + if err := json.Unmarshal(r.Value, &e); err != nil { + return nil, errors.Wrap(err, "Invalid event returned from stroe") + } + result[i] = &e + } + + return result, nil +} + +// Write an event to the store +func (s *evStore) Write(event *Event, opts ...WriteOption) error { + // parse the options + options := WriteOptions{ + TTL: s.opts.TTL, + } + for _, o := range opts { + o(&options) + } + + // construct the store record + bytes, err := json.Marshal(event) + if err != nil { + return errors.Wrap(err, "Error mashaling event to JSON") + } + // suffix event ID with hour resolution for easy retrieval in batches + timeSuffix := time.Now().Format("2006010215") + + record := &store.Record{ + // key is such that reading by prefix indexes by topic and reading by suffix indexes by time + Key: event.Topic + joinKey + event.ID + joinKey + timeSuffix, + Value: bytes, + Expiry: options.TTL, + } + + // write the record to the store + if err := s.store.Write(record); err != nil { + return errors.Wrap(err, "Error writing to the store") + } + + return nil +} + +func (s *evStore) backupLoop() { + for { + err := s.opts.Backup.Snapshot(s.store) + if err != nil { + logger.Errorf("Error running backup %s", err) + } + + time.Sleep(1 * time.Hour) + } +} + +// Backup is an interface for snapshotting the events store to long term storage +type Backup interface { + Snapshot(st store.Store) error +} diff --git a/events/store_test.go b/events/store_test.go new file mode 100644 index 00000000..2c1142eb --- /dev/null +++ b/events/store_test.go @@ -0,0 +1,47 @@ +package events + +import ( + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +func TestStore(t *testing.T) { + store := NewStore() + + testData := []Event{ + {ID: uuid.New().String(), Topic: "foo"}, + {ID: uuid.New().String(), Topic: "foo"}, + {ID: uuid.New().String(), Topic: "bar"}, + } + + // write the records to the store + t.Run("Write", func(t *testing.T) { + for _, event := range testData { + err := store.Write(&event) + assert.Nilf(t, err, "Writing an event should not return an error") + } + }) + + // should not be able to read events from a blank topic + t.Run("ReadMissingTopic", func(t *testing.T) { + evs, err := store.Read("") + assert.Equal(t, err, ErrMissingTopic, "Reading a blank topic should return an error") + assert.Nil(t, evs, "No events should be returned") + }) + + // should only get the events from the topic requested + t.Run("ReadTopic", func(t *testing.T) { + evs, err := store.Read("foo") + assert.Nilf(t, err, "No error should be returned") + assert.Len(t, evs, 2, "Only the events for this topic should be returned") + }) + + // limits should be honoured + t.Run("ReadTopicLimit", func(t *testing.T) { + evs, err := store.Read("foo", ReadLimit(1)) + assert.Nilf(t, err, "No error should be returned") + assert.Len(t, evs, 1, "The result should include no more than the read limit") + }) +} diff --git a/events/stream_test.go b/events/stream_test.go new file mode 100644 index 00000000..462b13af --- /dev/null +++ b/events/stream_test.go @@ -0,0 +1,241 @@ +package events + +import ( + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" +) + +type testPayload struct { + Message string +} + +type testCase struct { + str Stream + name string +} + +func TestStream(t *testing.T) { + tcs := []testCase{} + + stream, err := NewStream() + assert.Nilf(t, err, "NewStream should not return an error") + assert.NotNilf(t, stream, "NewStream should return a stream object") + tcs = append(tcs, testCase{str: stream, name: "memory"}) + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + runTestStream(t, tc.str) + }) + } + +} + +func runTestStream(t *testing.T, stream Stream) { + // TestMissingTopic will test the topic validation on publish + t.Run("TestMissingTopic", func(t *testing.T) { + err := stream.Publish("", nil) + assert.Equalf(t, err, ErrMissingTopic, "Publishing to a blank topic should return an error") + }) + + // TestConsumeTopic will publish a message to the test topic. The subscriber will subscribe to the + // same test topic. + t.Run("TestConsumeTopic", func(t *testing.T) { + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the subscriber + evChan, err := stream.Consume("test") + assert.Nilf(t, err, "Consume should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish("test", payload, WithMetadata(metadata)) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(1) + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + // TestConsumeGroup will publish a message to a random topic. Two subscribers will then consume + // the message from the firehose topic with different queues. The second subscriber will be registered + // after the message is published to test durability. + t.Run("TestConsumeGroup", func(t *testing.T) { + topic := uuid.New().String() + payload := &testPayload{Message: "HelloWorld"} + metadata := map[string]string{"foo": "bar"} + + // create the first subscriber + evChan1, err := stream.Consume(topic) + assert.Nilf(t, err, "Consume should not return an error") + + // setup the subscriber async + var wg sync.WaitGroup + + go func() { + timeout := time.NewTimer(time.Millisecond * 250) + + select { + case event, _ := <-evChan1: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + err = stream.Publish(topic, payload, WithMetadata(metadata)) + assert.Nil(t, err, "Publishing a valid message should not return an error") + wg.Add(2) + + // create the second subscriber + evChan2, err := stream.Consume(topic, + WithGroup("second_queue"), + WithOffset(time.Now().Add(time.Minute*-1)), + ) + assert.Nilf(t, err, "Consume should not return an error") + + go func() { + timeout := time.NewTimer(time.Second * 1) + + select { + case event, _ := <-evChan2: + assert.NotNilf(t, event, "The message was nil") + assert.Equal(t, event.Metadata, metadata, "Metadata didn't match") + + var result testPayload + err = event.Unmarshal(&result) + assert.Nil(t, err, "Error decoding result") + assert.Equal(t, result, *payload, "Payload didn't match") + + wg.Done() + case <-timeout.C: + t.Fatalf("Event was not recieved") + } + }() + + // wait for the subscriber to recieve the message or timeout + wg.Wait() + }) + + t.Run("AckingNacking", func(t *testing.T) { + ch, err := stream.Consume("foobarAck", WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"})) + assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"})) + + ev := <-ch + ev.Ack() + ev = <-ch + nacked := ev.ID + ev.Nack() + select { + case ev = <-ch: + assert.Equal(t, ev.ID, nacked, "Nacked message should have been received again") + assert.NoError(t, ev.Ack()) + case <-time.After(7 * time.Second): + t.Fatalf("Timed out waiting for message to be put back on queue") + } + + }) + + t.Run("Retries", func(t *testing.T) { + ch, err := stream.Consume("foobarRetries", WithAutoAck(false, 5*time.Second), WithRetryLimit(1)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"})) + + ev := <-ch + id := ev.ID + ev.Nack() + ev = <-ch + assert.Equal(t, id, ev.ID, "Nacked message should have been received again") + ev.Nack() + select { + case ev = <-ch: + t.Fatalf("Unexpected event received") + case <-time.After(7 * time.Second): + } + + }) + + t.Run("InfiniteRetries", func(t *testing.T) { + ch, err := stream.Consume("foobarRetriesInf", WithAutoAck(false, 2*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing") + assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"})) + + count := 0 + id := "" + for { + select { + case ev := <-ch: + if id != "" { + assert.Equal(t, id, ev.ID, "Nacked message should have been received again") + } + id = ev.ID + case <-time.After(3 * time.Second): + t.Fatalf("Unexpected event received") + } + + count++ + if count == 11 { + break + } + } + + }) + + t.Run("twoSubs", func(t *testing.T) { + ch1, err := stream.Consume("foobarTwoSubs1", WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing to topic 1") + ch2, err := stream.Consume("foobarTwoSubs2", WithAutoAck(false, 5*time.Second)) + assert.NoError(t, err, "Unexpected error subscribing to topic 2") + + assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"})) + assert.NoError(t, stream.Publish("foobarTwoSubs1", map[string]string{"foo": "message 1"})) + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + ev := <-ch1 + assert.Equal(t, "foobarTwoSubs1", ev.Topic, "Received message from unexpected topic") + wg.Done() + }() + go func() { + ev := <-ch2 + assert.Equal(t, "foobarTwoSubs2", ev.Topic, "Received message from unexpected topic") + wg.Done() + }() + wg.Wait() + }) +} diff --git a/options.go b/options.go index d6270626..74c69385 100644 --- a/options.go +++ b/options.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/urfave/cli/v2" "go-micro.dev/v4/auth" "go-micro.dev/v4/broker" "go-micro.dev/v4/cache" @@ -18,7 +19,6 @@ import ( "go-micro.dev/v4/server" "go-micro.dev/v4/store" "go-micro.dev/v4/transport" - "github.com/urfave/cli/v2" ) // Options for micro service diff --git a/registry/memory.go b/registry/memory.go index bb06391e..04b60e20 100644 --- a/registry/memory.go +++ b/registry/memory.go @@ -5,8 +5,8 @@ import ( "sync" "time" - "go-micro.dev/v4/logger" "github.com/google/uuid" + "go-micro.dev/v4/logger" ) var ( diff --git a/runtime/default.go b/runtime/default.go index 447116f8..9686e138 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -13,9 +13,9 @@ import ( "sync" "time" + "github.com/nxadm/tail" "go-micro.dev/v4/logger" "go-micro.dev/v4/runtime/local/git" - "github.com/nxadm/tail" ) // defaultNamespace to use if not provided as an option diff --git a/runtime/local/build/docker/docker.go b/runtime/local/build/docker/docker.go index b92c2cd8..567dfbde 100644 --- a/runtime/local/build/docker/docker.go +++ b/runtime/local/build/docker/docker.go @@ -8,9 +8,9 @@ import ( "os" "path/filepath" + docker "github.com/fsouza/go-dockerclient" "go-micro.dev/v4/logger" "go-micro.dev/v4/runtime/local/build" - docker "github.com/fsouza/go-dockerclient" ) type Builder struct { diff --git a/runtime/local/process/os/os.go b/runtime/local/process/os/os.go index ff08b740..ff38f497 100644 --- a/runtime/local/process/os/os.go +++ b/runtime/local/process/os/os.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows // Package os runs processes locally diff --git a/runtime/local/source/git/git.go b/runtime/local/source/git/git.go index 67b90fec..f46cccf6 100644 --- a/runtime/local/source/git/git.go +++ b/runtime/local/source/git/git.go @@ -6,8 +6,8 @@ import ( "path/filepath" "strings" - "go-micro.dev/v4/runtime/local/source" "github.com/go-git/go-git/v5" + "go-micro.dev/v4/runtime/local/source" ) // Source retrieves source code diff --git a/server/mock/mock.go b/server/mock/mock.go index 3eb31b97..a6e65e84 100644 --- a/server/mock/mock.go +++ b/server/mock/mock.go @@ -4,8 +4,8 @@ import ( "errors" "sync" - "go-micro.dev/v4/server" "github.com/google/uuid" + "go-micro.dev/v4/server" ) type MockServer struct { diff --git a/server/rpc_codec.go b/server/rpc_codec.go index f7ad93e8..78fee607 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -4,6 +4,8 @@ import ( "bytes" "sync" + "github.com/oxtoacart/bpool" + "github.com/pkg/errors" "go-micro.dev/v4/codec" raw "go-micro.dev/v4/codec/bytes" "go-micro.dev/v4/codec/grpc" @@ -12,8 +14,6 @@ import ( "go-micro.dev/v4/codec/proto" "go-micro.dev/v4/codec/protorpc" "go-micro.dev/v4/transport" - "github.com/oxtoacart/bpool" - "github.com/pkg/errors" ) type rpcCodec struct { diff --git a/server/rpc_stream_test.go b/server/rpc_stream_test.go index 6de74d35..f90bda70 100644 --- a/server/rpc_stream_test.go +++ b/server/rpc_stream_test.go @@ -9,9 +9,9 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "go-micro.dev/v4/codec/json" protoCodec "go-micro.dev/v4/codec/proto" - "github.com/golang/protobuf/proto" ) // protoStruct implements proto.Message diff --git a/server/server.go b/server/server.go index 31fa2134..ed9a4832 100644 --- a/server/server.go +++ b/server/server.go @@ -7,11 +7,11 @@ import ( "os/signal" "time" + "github.com/google/uuid" "go-micro.dev/v4/codec" "go-micro.dev/v4/logger" "go-micro.dev/v4/registry" signalutil "go-micro.dev/v4/util/signal" - "github.com/google/uuid" ) // Server is a simple micro server abstraction diff --git a/store/memory.go b/store/memory.go index 9a570da0..57d0b938 100644 --- a/store/memory.go +++ b/store/memory.go @@ -174,9 +174,21 @@ func (m *memoryStore) Read(key string, opts ...ReadOption) ([]*Record, error) { // Handle Prefix / suffix if readOpts.Prefix || readOpts.Suffix { - k := m.list(prefix, readOpts.Limit, readOpts.Offset) + k := m.list(prefix, 0, 0) + limit := int(readOpts.Limit) + offset := int(readOpts.Offset) + + if limit > len(k) { + limit = len(k) + } + + if offset > len(k) { + offset = len(k) + } + + for i := offset; i < limit; i++ { + kk := k[i] - for _, kk := range k { if readOpts.Prefix && !strings.HasPrefix(kk, key) { continue } diff --git a/util/mdns/server.go b/util/mdns/server.go index 96c1a590..b100574a 100644 --- a/util/mdns/server.go +++ b/util/mdns/server.go @@ -8,8 +8,8 @@ import ( "sync/atomic" "time" - log "go-micro.dev/v4/logger" "github.com/miekg/dns" + log "go-micro.dev/v4/logger" "golang.org/x/net/ipv4" "golang.org/x/net/ipv6" ) diff --git a/util/pool/default.go b/util/pool/default.go index 9e11c87a..90cad2f4 100644 --- a/util/pool/default.go +++ b/util/pool/default.go @@ -4,8 +4,8 @@ import ( "sync" "time" - "go-micro.dev/v4/transport" "github.com/google/uuid" + "go-micro.dev/v4/transport" ) type pool struct { diff --git a/util/sync/manager.go b/util/sync/manager.go index 4541a5f1..6d24247e 100644 --- a/util/sync/manager.go +++ b/util/sync/manager.go @@ -3,8 +3,8 @@ package sync import ( "time" - "go-micro.dev/v4/store" "github.com/pkg/errors" + "go-micro.dev/v4/store" ) type operation struct { diff --git a/util/sync/sync.go b/util/sync/sync.go index 0e03c8a3..65468630 100644 --- a/util/sync/sync.go +++ b/util/sync/sync.go @@ -6,9 +6,9 @@ import ( "sync" "time" - "go-micro.dev/v4/store" "github.com/ef-ds/deque" "github.com/pkg/errors" + "go-micro.dev/v4/store" ) // Sync implements a sync in for stores diff --git a/web/options.go b/web/options.go index adc06061..1e510533 100644 --- a/web/options.go +++ b/web/options.go @@ -6,9 +6,9 @@ import ( "net/http" "time" + "github.com/urfave/cli/v2" "go-micro.dev/v4" "go-micro.dev/v4/registry" - "github.com/urfave/cli/v2" ) //Options for web diff --git a/web/web_test.go b/web/web_test.go index a7815acd..435105be 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -7,10 +7,10 @@ import ( "testing" "time" + "github.com/urfave/cli/v2" "go-micro.dev/v4" "go-micro.dev/v4/logger" "go-micro.dev/v4/web" - "github.com/urfave/cli/v2" ) func TestWeb(t *testing.T) {