mirror of
https://github.com/go-micro/go-micro.git
synced 2025-09-16 08:36:30 +02:00
Merge branch 'master' of ssh://github.com/asim/go-micro
This commit is contained in:
4
.github/workflows/pr.yml
vendored
4
.github/workflows/pr.yml
vendored
@@ -8,10 +8,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
||||
- name: Set up Go 1.16
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v1
|
||||
with:
|
||||
go-version: 1.16
|
||||
go-version: 1.17
|
||||
id: go
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
|
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -8,10 +8,10 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
||||
- name: Set up Go 1.16
|
||||
- name: Set up Go 1.17
|
||||
uses: actions/setup-go@v1
|
||||
with:
|
||||
go-version: 1.16
|
||||
go-version: 1.17
|
||||
id: go
|
||||
|
||||
- name: Check out code into the Go module directory
|
||||
|
@@ -7,10 +7,10 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/oxtoacart/bpool"
|
||||
api "go-micro.dev/v4/api/proto"
|
||||
"go-micro.dev/v4/registry"
|
||||
"go-micro.dev/v4/selector"
|
||||
"github.com/oxtoacart/bpool"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -10,11 +10,11 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/oxtoacart/bpool"
|
||||
"go-micro.dev/v4/api/handler"
|
||||
proto "go-micro.dev/v4/api/proto"
|
||||
"go-micro.dev/v4/util/ctx"
|
||||
"github.com/google/uuid"
|
||||
"github.com/oxtoacart/bpool"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -9,6 +9,8 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch/v5"
|
||||
"github.com/oxtoacart/bpool"
|
||||
"go-micro.dev/v4/api"
|
||||
"go-micro.dev/v4/api/handler"
|
||||
"go-micro.dev/v4/api/internal/proto"
|
||||
@@ -23,8 +25,6 @@ import (
|
||||
"go-micro.dev/v4/selector"
|
||||
"go-micro.dev/v4/util/ctx"
|
||||
"go-micro.dev/v4/util/qson"
|
||||
jsonpatch "github.com/evanphx/json-patch/v5"
|
||||
"github.com/oxtoacart/bpool"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@@ -7,8 +7,8 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
go_api "go-micro.dev/v4/api/proto"
|
||||
"github.com/golang/protobuf/proto"
|
||||
go_api "go-micro.dev/v4/api/proto"
|
||||
)
|
||||
|
||||
func TestRequestPayloadFromRequest(t *testing.T) {
|
||||
|
@@ -9,14 +9,14 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gobwas/httphead"
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
"go-micro.dev/v4/api"
|
||||
"go-micro.dev/v4/client"
|
||||
raw "go-micro.dev/v4/codec/bytes"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/selector"
|
||||
"github.com/gobwas/httphead"
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
)
|
||||
|
||||
// serveWebsocket will stream rpc back over websockets assuming json
|
||||
|
@@ -3,8 +3,8 @@ package registry
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"go-micro.dev/v4/registry"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go-micro.dev/v4/registry"
|
||||
)
|
||||
|
||||
func TestStoreRegex(t *testing.T) {
|
||||
|
@@ -8,10 +8,10 @@ import (
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
"go-micro.dev/v4/api/server"
|
||||
"go-micro.dev/v4/api/server/cors"
|
||||
"go-micro.dev/v4/logger"
|
||||
"github.com/gorilla/handlers"
|
||||
)
|
||||
|
||||
type httpServer struct {
|
||||
|
@@ -16,6 +16,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/codec/json"
|
||||
merr "go-micro.dev/v4/errors"
|
||||
"go-micro.dev/v4/registry"
|
||||
@@ -23,7 +24,6 @@ import (
|
||||
maddr "go-micro.dev/v4/util/addr"
|
||||
mnet "go-micro.dev/v4/util/net"
|
||||
mls "go-micro.dev/v4/util/tls"
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
|
@@ -5,9 +5,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/broker"
|
||||
"go-micro.dev/v4/registry"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -7,8 +7,8 @@ import (
|
||||
"hash/fnv"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/metadata"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
"go-micro.dev/v4/metadata"
|
||||
)
|
||||
|
||||
// NewCache returns an initialised cache.
|
||||
|
@@ -7,6 +7,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4/auth"
|
||||
"go-micro.dev/v4/broker"
|
||||
"go-micro.dev/v4/cache"
|
||||
@@ -23,7 +24,6 @@ import (
|
||||
"go-micro.dev/v4/server"
|
||||
"go-micro.dev/v4/store"
|
||||
"go-micro.dev/v4/transport"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
type Cmd interface {
|
||||
|
@@ -9,7 +9,7 @@ Micro CLI is the command line interface for developing [Go Micro][1] projects.
|
||||
Installation is done by using the [`go install`][3] command.
|
||||
|
||||
```bash
|
||||
go install go-micro.dev/v4/cmd/micro@latest
|
||||
go install go-micro.dev/v4/cmd/micro@master
|
||||
```
|
||||
|
||||
Let's create a new service using the `new` command.
|
||||
|
@@ -8,8 +8,8 @@ import (
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"go-micro.dev/v4/codec"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go-micro.dev/v4/codec"
|
||||
)
|
||||
|
||||
type Codec struct {
|
||||
|
@@ -5,9 +5,9 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"go-micro.dev/v4/codec"
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go-micro.dev/v4/codec"
|
||||
)
|
||||
|
||||
type Codec struct {
|
||||
|
@@ -3,9 +3,9 @@ package proto
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"go-micro.dev/v4/codec"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/oxtoacart/bpool"
|
||||
"go-micro.dev/v4/codec"
|
||||
)
|
||||
|
||||
// create buffer pool with 16 instances each preallocated with 256 bytes
|
||||
|
@@ -4,8 +4,8 @@ package proto
|
||||
import (
|
||||
"io"
|
||||
|
||||
"go-micro.dev/v4/codec"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go-micro.dev/v4/codec"
|
||||
)
|
||||
|
||||
type Codec struct {
|
||||
|
@@ -8,8 +8,8 @@ import (
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"go-micro.dev/v4/codec"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go-micro.dev/v4/codec"
|
||||
)
|
||||
|
||||
type flusher interface {
|
||||
|
@@ -4,11 +4,11 @@ import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/imdario/mergo"
|
||||
"go-micro.dev/v4/config/encoder"
|
||||
"go-micro.dev/v4/config/encoder/json"
|
||||
"go-micro.dev/v4/config/reader"
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/imdario/mergo"
|
||||
)
|
||||
|
||||
type jsonReader struct {
|
||||
|
@@ -2,8 +2,8 @@
|
||||
package box
|
||||
|
||||
import (
|
||||
"go-micro.dev/v4/config/secrets"
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/config/secrets"
|
||||
naclbox "golang.org/x/crypto/nacl/box"
|
||||
|
||||
"crypto/rand"
|
||||
|
@@ -3,8 +3,8 @@
|
||||
package secretbox
|
||||
|
||||
import (
|
||||
"go-micro.dev/v4/config/secrets"
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/config/secrets"
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
|
||||
"crypto/rand"
|
||||
|
@@ -7,10 +7,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/cmd"
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/imdario/mergo"
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4/cmd"
|
||||
"go-micro.dev/v4/config/source"
|
||||
)
|
||||
|
||||
type cliSource struct {
|
||||
|
@@ -5,11 +5,11 @@ import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4"
|
||||
"go-micro.dev/v4/cmd"
|
||||
"go-micro.dev/v4/config"
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
func TestCliSourceDefault(t *testing.T) {
|
||||
|
@@ -3,8 +3,8 @@ package cli
|
||||
import (
|
||||
"context"
|
||||
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4/config/source"
|
||||
)
|
||||
|
||||
type contextKey struct{}
|
||||
|
2
config/source/env/env.go
vendored
2
config/source/env/env.go
vendored
@@ -6,8 +6,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/imdario/mergo"
|
||||
"go-micro.dev/v4/config/source"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -1,12 +1,13 @@
|
||||
//+build !linux
|
||||
//go:build !linux
|
||||
// +build !linux
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"go-micro.dev/v4/config/source"
|
||||
)
|
||||
|
||||
type watcher struct {
|
||||
|
@@ -1,12 +1,13 @@
|
||||
//+build linux
|
||||
//go:build linux
|
||||
// +build linux
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"go-micro.dev/v4/config/source"
|
||||
)
|
||||
|
||||
type watcher struct {
|
||||
|
@@ -3,8 +3,8 @@ package flag
|
||||
import (
|
||||
"errors"
|
||||
"flag"
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/imdario/mergo"
|
||||
"go-micro.dev/v4/config/source"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
@@ -5,8 +5,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/config/source"
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/config/source"
|
||||
)
|
||||
|
||||
type memory struct {
|
||||
|
@@ -3,8 +3,8 @@ package log
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"go-micro.dev/v4/util/ring"
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/util/ring"
|
||||
)
|
||||
|
||||
// Should stream from OS
|
||||
|
@@ -4,8 +4,8 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/util/ring"
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/util/ring"
|
||||
)
|
||||
|
||||
type memTracer struct {
|
||||
|
@@ -178,4 +178,4 @@ func (e *MultiError) HasErrors() bool {
|
||||
func (e *MultiError) Error() string {
|
||||
b, _ := json.Marshal(e)
|
||||
return string(b)
|
||||
}
|
||||
}
|
||||
|
@@ -109,23 +109,23 @@ func TestAppend(t *testing.T) {
|
||||
Status: http.StatusText(500),
|
||||
},
|
||||
{
|
||||
Id: "test2",
|
||||
Code: 400,
|
||||
Detail: "Bad Request",
|
||||
Id: "test2",
|
||||
Code: 400,
|
||||
Detail: "Bad Request",
|
||||
Status: http.StatusText(400),
|
||||
},
|
||||
{
|
||||
Id: "test3",
|
||||
Code: 404,
|
||||
Detail: "Not Found",
|
||||
Status: http.StatusText(404),
|
||||
Status: http.StatusText(404),
|
||||
},
|
||||
}
|
||||
|
||||
for _, e := range testData {
|
||||
mError.Append(&Error{
|
||||
Id: e.Id,
|
||||
Code: e.Code,
|
||||
Id: e.Id,
|
||||
Code: e.Code,
|
||||
Detail: e.Detail,
|
||||
Status: e.Status,
|
||||
})
|
||||
@@ -146,16 +146,16 @@ func TestHasErrors(t *testing.T) {
|
||||
Status: http.StatusText(500),
|
||||
},
|
||||
{
|
||||
Id: "test2",
|
||||
Code: 400,
|
||||
Detail: "Bad Request",
|
||||
Id: "test2",
|
||||
Code: 400,
|
||||
Detail: "Bad Request",
|
||||
Status: http.StatusText(400),
|
||||
},
|
||||
{
|
||||
Id: "test3",
|
||||
Code: 404,
|
||||
Detail: "Not Found",
|
||||
Status: http.StatusText(404),
|
||||
Status: http.StatusText(404),
|
||||
},
|
||||
}
|
||||
|
||||
@@ -165,8 +165,8 @@ func TestHasErrors(t *testing.T) {
|
||||
|
||||
for _, e := range testData {
|
||||
mError.Errors = append(mError.Errors, &Error{
|
||||
Id: e.Id,
|
||||
Code: e.Code,
|
||||
Id: e.Id,
|
||||
Code: e.Code,
|
||||
Detail: e.Detail,
|
||||
Status: e.Status,
|
||||
})
|
||||
@@ -175,4 +175,4 @@ func TestHasErrors(t *testing.T) {
|
||||
if !mError.HasErrors() {
|
||||
t.Fatal("Expected errors")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
92
events/events.go
Normal file
92
events/events.go
Normal file
@@ -0,0 +1,92 @@
|
||||
// Package events is for event streaming and storage
|
||||
package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// DefaultStream is the default events stream implementation
|
||||
DefaultStream Stream
|
||||
// DefaultStore is the default events store implementation
|
||||
DefaultStore Store
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrMissingTopic is returned if a blank topic was provided to publish
|
||||
ErrMissingTopic = errors.New("Missing topic")
|
||||
// ErrEncodingMessage is returned from publish if there was an error encoding the message option
|
||||
ErrEncodingMessage = errors.New("Error encoding message")
|
||||
)
|
||||
|
||||
// Stream is an event streaming interface
|
||||
type Stream interface {
|
||||
Publish(topic string, msg interface{}, opts ...PublishOption) error
|
||||
Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
|
||||
}
|
||||
|
||||
// Store is an event store interface
|
||||
type Store interface {
|
||||
Read(topic string, opts ...ReadOption) ([]*Event, error)
|
||||
Write(event *Event, opts ...WriteOption) error
|
||||
}
|
||||
|
||||
type AckFunc func() error
|
||||
type NackFunc func() error
|
||||
|
||||
// Event is the object returned by the broker when you subscribe to a topic
|
||||
type Event struct {
|
||||
// ID to uniquely identify the event
|
||||
ID string
|
||||
// Topic of event, e.g. "registry.service.created"
|
||||
Topic string
|
||||
// Timestamp of the event
|
||||
Timestamp time.Time
|
||||
// Metadata contains the values the event was indexed by
|
||||
Metadata map[string]string
|
||||
// Payload contains the encoded message
|
||||
Payload []byte
|
||||
|
||||
ackFunc AckFunc
|
||||
nackFunc NackFunc
|
||||
}
|
||||
|
||||
// Unmarshal the events message into an object
|
||||
func (e *Event) Unmarshal(v interface{}) error {
|
||||
return json.Unmarshal(e.Payload, v)
|
||||
}
|
||||
|
||||
// Ack acknowledges successful processing of the event in ManualAck mode
|
||||
func (e *Event) Ack() error {
|
||||
return e.ackFunc()
|
||||
}
|
||||
|
||||
func (e *Event) SetAckFunc(f AckFunc) {
|
||||
e.ackFunc = f
|
||||
}
|
||||
|
||||
// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode
|
||||
func (e *Event) Nack() error {
|
||||
return e.nackFunc()
|
||||
}
|
||||
|
||||
func (e *Event) SetNackFunc(f NackFunc) {
|
||||
e.nackFunc = f
|
||||
}
|
||||
|
||||
// Publish an event to a topic
|
||||
func Publish(topic string, msg interface{}, opts ...PublishOption) error {
|
||||
return DefaultStream.Publish(topic, msg, opts...)
|
||||
}
|
||||
|
||||
// Consume to events
|
||||
func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
|
||||
return DefaultStream.Consume(topic, opts...)
|
||||
}
|
||||
|
||||
// Read events for a topic
|
||||
func Read(topic string, opts ...ReadOption) ([]*Event, error) {
|
||||
return DefaultStore.Read(topic, opts...)
|
||||
}
|
244
events/memory.go
Normal file
244
events/memory.go
Normal file
@@ -0,0 +1,244 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// NewStream returns an initialized memory stream
|
||||
func NewStream(opts ...Option) (Stream, error) {
|
||||
// parse the options
|
||||
var options Options
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
return &mem{store: store.NewMemoryStore()}, nil
|
||||
}
|
||||
|
||||
type subscriber struct {
|
||||
Group string
|
||||
Topic string
|
||||
Channel chan Event
|
||||
|
||||
sync.RWMutex
|
||||
retryMap map[string]int
|
||||
retryLimit int
|
||||
autoAck bool
|
||||
ackWait time.Duration
|
||||
}
|
||||
|
||||
type mem struct {
|
||||
store store.Store
|
||||
|
||||
subs []*subscriber
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (m *mem) Publish(topic string, msg interface{}, opts ...PublishOption) error {
|
||||
// validate the topic
|
||||
if len(topic) == 0 {
|
||||
return ErrMissingTopic
|
||||
}
|
||||
|
||||
// parse the options
|
||||
options := PublishOptions{
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// encode the message if it's not already encoded
|
||||
var payload []byte
|
||||
if p, ok := msg.([]byte); ok {
|
||||
payload = p
|
||||
} else {
|
||||
p, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return ErrEncodingMessage
|
||||
}
|
||||
payload = p
|
||||
}
|
||||
|
||||
// construct the event
|
||||
event := &Event{
|
||||
ID: uuid.New().String(),
|
||||
Topic: topic,
|
||||
Timestamp: options.Timestamp,
|
||||
Metadata: options.Metadata,
|
||||
Payload: payload,
|
||||
}
|
||||
|
||||
// serialize the event to bytes
|
||||
bytes, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Error encoding event")
|
||||
}
|
||||
|
||||
// write to the store
|
||||
key := fmt.Sprintf("%v/%v", event.Topic, event.ID)
|
||||
if err := m.store.Write(&store.Record{Key: key, Value: bytes}); err != nil {
|
||||
return errors.Wrap(err, "Error writing event to store")
|
||||
}
|
||||
|
||||
// send to the subscribers async
|
||||
go m.handleEvent(event)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mem) Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
|
||||
// validate the topic
|
||||
if len(topic) == 0 {
|
||||
return nil, ErrMissingTopic
|
||||
}
|
||||
|
||||
// parse the options
|
||||
options := ConsumeOptions{
|
||||
Group: uuid.New().String(),
|
||||
AutoAck: true,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
// TODO RetryLimit
|
||||
|
||||
// setup the subscriber
|
||||
sub := &subscriber{
|
||||
Channel: make(chan Event),
|
||||
Topic: topic,
|
||||
Group: options.Group,
|
||||
retryMap: map[string]int{},
|
||||
autoAck: true,
|
||||
retryLimit: options.GetRetryLimit(),
|
||||
}
|
||||
|
||||
if !options.AutoAck {
|
||||
if options.AckWait == 0 {
|
||||
return nil, fmt.Errorf("invalid AckWait passed, should be positive integer")
|
||||
}
|
||||
sub.autoAck = options.AutoAck
|
||||
sub.ackWait = options.AckWait
|
||||
}
|
||||
|
||||
// register the subscriber
|
||||
m.Lock()
|
||||
m.subs = append(m.subs, sub)
|
||||
m.Unlock()
|
||||
|
||||
// lookup previous events if the start time option was passed
|
||||
if options.Offset.Unix() > 0 {
|
||||
go m.lookupPreviousEvents(sub, options.Offset)
|
||||
}
|
||||
|
||||
// return the channel
|
||||
return sub.Channel, nil
|
||||
}
|
||||
|
||||
// lookupPreviousEvents finds events for a subscriber which occurred before a given time and sends
|
||||
// them into the subscribers channel
|
||||
func (m *mem) lookupPreviousEvents(sub *subscriber, startTime time.Time) {
|
||||
// lookup all events which match the topic (a blank topic will return all results)
|
||||
recs, err := m.store.Read(sub.Topic+"/", store.ReadPrefix())
|
||||
if err != nil && logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("Error looking up previous events: %v", err)
|
||||
return
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// loop through the records and send it to the channel if it matches
|
||||
for _, r := range recs {
|
||||
var ev Event
|
||||
if err := json.Unmarshal(r.Value, &ev); err != nil {
|
||||
continue
|
||||
}
|
||||
if ev.Timestamp.Unix() < startTime.Unix() {
|
||||
continue
|
||||
}
|
||||
sendEvent(&ev, sub)
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvents sends the event to any registered subscribers.
|
||||
func (m *mem) handleEvent(ev *Event) {
|
||||
m.RLock()
|
||||
subs := m.subs
|
||||
m.RUnlock()
|
||||
|
||||
// filteredSubs is a KV map of the queue name and subscribers. This is used to prevent a message
|
||||
// being sent to two subscribers with the same queue.
|
||||
filteredSubs := map[string]*subscriber{}
|
||||
|
||||
// filter down to subscribers who are interested in this topic
|
||||
for _, sub := range subs {
|
||||
if len(sub.Topic) == 0 || sub.Topic == ev.Topic {
|
||||
filteredSubs[sub.Group] = sub
|
||||
}
|
||||
}
|
||||
|
||||
// send the message to each channel async (since one channel might be blocked)
|
||||
for _, sub := range filteredSubs {
|
||||
sendEvent(ev, sub)
|
||||
}
|
||||
}
|
||||
|
||||
func sendEvent(ev *Event, sub *subscriber) {
|
||||
go func(s *subscriber) {
|
||||
evCopy := *ev
|
||||
if s.autoAck {
|
||||
s.Channel <- evCopy
|
||||
return
|
||||
}
|
||||
evCopy.SetAckFunc(ackFunc(s, evCopy))
|
||||
evCopy.SetNackFunc(nackFunc(s, evCopy))
|
||||
s.retryMap[evCopy.ID] = 0
|
||||
tick := time.NewTicker(s.ackWait)
|
||||
defer tick.Stop()
|
||||
for range tick.C {
|
||||
s.Lock()
|
||||
count, ok := s.retryMap[evCopy.ID]
|
||||
s.Unlock()
|
||||
if !ok {
|
||||
// success
|
||||
break
|
||||
}
|
||||
|
||||
if s.retryLimit > -1 && count > s.retryLimit {
|
||||
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
|
||||
logger.Errorf("Message retry limit reached, discarding: %v %d %d", evCopy.ID, count, s.retryLimit)
|
||||
}
|
||||
s.Lock()
|
||||
delete(s.retryMap, evCopy.ID)
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
s.Channel <- evCopy
|
||||
s.Lock()
|
||||
s.retryMap[evCopy.ID] = count + 1
|
||||
s.Unlock()
|
||||
}
|
||||
}(sub)
|
||||
}
|
||||
|
||||
func ackFunc(s *subscriber, evCopy Event) func() error {
|
||||
return func() error {
|
||||
s.Lock()
|
||||
delete(s.retryMap, evCopy.ID)
|
||||
s.Unlock()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func nackFunc(s *subscriber, evCopy Event) func() error {
|
||||
return func() error {
|
||||
return nil
|
||||
}
|
||||
}
|
144
events/options.go
Normal file
144
events/options.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package events
|
||||
|
||||
import "time"
|
||||
|
||||
type Options struct{}
|
||||
|
||||
type Option func(o *Options)
|
||||
|
||||
type StoreOptions struct {
|
||||
TTL time.Duration
|
||||
Backup Backup
|
||||
}
|
||||
|
||||
type StoreOption func(o *StoreOptions)
|
||||
|
||||
// PublishOptions contains all the options which can be provided when publishing an event
|
||||
type PublishOptions struct {
|
||||
// Metadata contains any keys which can be used to query the data, for example a customer id
|
||||
Metadata map[string]string
|
||||
// Timestamp to set for the event, if the timestamp is a zero value, the current time will be used
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// PublishOption sets attributes on PublishOptions
|
||||
type PublishOption func(o *PublishOptions)
|
||||
|
||||
// WithMetadata sets the Metadata field on PublishOptions
|
||||
func WithMetadata(md map[string]string) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.Metadata = md
|
||||
}
|
||||
}
|
||||
|
||||
// WithTimestamp sets the timestamp field on PublishOptions
|
||||
func WithTimestamp(t time.Time) PublishOption {
|
||||
return func(o *PublishOptions) {
|
||||
o.Timestamp = t
|
||||
}
|
||||
}
|
||||
|
||||
// ConsumeOptions contains all the options which can be provided when subscribing to a topic
|
||||
type ConsumeOptions struct {
|
||||
// Group is the name of the consumer group, if two consumers have the same group the events
|
||||
// are distributed between them
|
||||
Group string
|
||||
// Offset is the time from which the messages should be consumed from. If not provided then
|
||||
// the messages will be consumed starting from the moment the Subscription starts.
|
||||
Offset time.Time
|
||||
// AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered.
|
||||
// If false specifies that each message need ts to be manually acknowledged by the subscriber.
|
||||
// If processing is successful the message should be ack'ed to remove the message from the stream.
|
||||
// If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will
|
||||
// remain on the stream to be processed again.
|
||||
AutoAck bool
|
||||
AckWait time.Duration
|
||||
// RetryLimit indicates number of times a message is retried
|
||||
RetryLimit int
|
||||
// CustomRetries indicates whether to use RetryLimit
|
||||
CustomRetries bool
|
||||
}
|
||||
|
||||
// ConsumeOption sets attributes on ConsumeOptions
|
||||
type ConsumeOption func(o *ConsumeOptions)
|
||||
|
||||
// WithGroup sets the consumer group to be part of when consuming events
|
||||
func WithGroup(q string) ConsumeOption {
|
||||
return func(o *ConsumeOptions) {
|
||||
o.Group = q
|
||||
}
|
||||
}
|
||||
|
||||
// WithOffset sets the offset time at which to start consuming events
|
||||
func WithOffset(t time.Time) ConsumeOption {
|
||||
return func(o *ConsumeOptions) {
|
||||
o.Offset = t
|
||||
}
|
||||
}
|
||||
|
||||
// WithAutoAck sets the AutoAck field on ConsumeOptions and an ackWait duration after which if no ack is received
|
||||
// the message is requeued in case auto ack is turned off
|
||||
func WithAutoAck(ack bool, ackWait time.Duration) ConsumeOption {
|
||||
return func(o *ConsumeOptions) {
|
||||
o.AutoAck = ack
|
||||
o.AckWait = ackWait
|
||||
}
|
||||
}
|
||||
|
||||
// WithRetryLimit sets the RetryLimit field on ConsumeOptions.
|
||||
// Set to -1 for infinite retries (default)
|
||||
func WithRetryLimit(retries int) ConsumeOption {
|
||||
return func(o *ConsumeOptions) {
|
||||
o.RetryLimit = retries
|
||||
o.CustomRetries = true
|
||||
}
|
||||
}
|
||||
|
||||
func (s ConsumeOptions) GetRetryLimit() int {
|
||||
if !s.CustomRetries {
|
||||
return -1
|
||||
}
|
||||
return s.RetryLimit
|
||||
}
|
||||
|
||||
// WriteOptions contains all the options which can be provided when writing an event to a store
|
||||
type WriteOptions struct {
|
||||
// TTL is the duration the event should be recorded for, a zero value TTL indicates the event should
|
||||
// be stored indefinately
|
||||
TTL time.Duration
|
||||
}
|
||||
|
||||
// WriteOption sets attributes on WriteOptions
|
||||
type WriteOption func(o *WriteOptions)
|
||||
|
||||
// WithTTL sets the TTL attribute on WriteOptions
|
||||
func WithTTL(d time.Duration) WriteOption {
|
||||
return func(o *WriteOptions) {
|
||||
o.TTL = d
|
||||
}
|
||||
}
|
||||
|
||||
// ReadOptions contains all the options which can be provided when reading events from a store
|
||||
type ReadOptions struct {
|
||||
// Limit the number of results to return
|
||||
Limit uint
|
||||
// Offset the results by this number, useful for paginated queries
|
||||
Offset uint
|
||||
}
|
||||
|
||||
// ReadOption sets attributes on ReadOptions
|
||||
type ReadOption func(o *ReadOptions)
|
||||
|
||||
// ReadLimit sets the limit attribute on ReadOptions
|
||||
func ReadLimit(l uint) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Limit = 1
|
||||
}
|
||||
}
|
||||
|
||||
// ReadOffset sets the offset attribute on ReadOptions
|
||||
func ReadOffset(l uint) ReadOption {
|
||||
return func(o *ReadOptions) {
|
||||
o.Offset = 1
|
||||
}
|
||||
}
|
127
events/store.go
Normal file
127
events/store.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
const joinKey = "/"
|
||||
|
||||
// NewStore returns an initialized events store
|
||||
func NewStore(opts ...StoreOption) Store {
|
||||
// parse the options
|
||||
var options StoreOptions
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
if options.TTL.Seconds() == 0 {
|
||||
options.TTL = time.Hour * 24
|
||||
}
|
||||
|
||||
// return the store
|
||||
evs := &evStore{
|
||||
opts: options,
|
||||
store: store.NewMemoryStore(),
|
||||
}
|
||||
if options.Backup != nil {
|
||||
go evs.backupLoop()
|
||||
}
|
||||
return evs
|
||||
}
|
||||
|
||||
type evStore struct {
|
||||
opts StoreOptions
|
||||
store store.Store
|
||||
}
|
||||
|
||||
// Read events for a topic
|
||||
func (s *evStore) Read(topic string, opts ...ReadOption) ([]*Event, error) {
|
||||
// validate the topic
|
||||
if len(topic) == 0 {
|
||||
return nil, ErrMissingTopic
|
||||
}
|
||||
|
||||
// parse the options
|
||||
options := ReadOptions{
|
||||
Offset: 0,
|
||||
Limit: 250,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// execute the request
|
||||
recs, err := s.store.Read(topic+joinKey,
|
||||
store.ReadPrefix(),
|
||||
store.ReadLimit(options.Limit),
|
||||
store.ReadOffset(options.Offset),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "Error reading from store")
|
||||
}
|
||||
|
||||
// unmarshal the result
|
||||
result := make([]*Event, len(recs))
|
||||
for i, r := range recs {
|
||||
var e Event
|
||||
if err := json.Unmarshal(r.Value, &e); err != nil {
|
||||
return nil, errors.Wrap(err, "Invalid event returned from stroe")
|
||||
}
|
||||
result[i] = &e
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Write an event to the store
|
||||
func (s *evStore) Write(event *Event, opts ...WriteOption) error {
|
||||
// parse the options
|
||||
options := WriteOptions{
|
||||
TTL: s.opts.TTL,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
// construct the store record
|
||||
bytes, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Error mashaling event to JSON")
|
||||
}
|
||||
// suffix event ID with hour resolution for easy retrieval in batches
|
||||
timeSuffix := time.Now().Format("2006010215")
|
||||
|
||||
record := &store.Record{
|
||||
// key is such that reading by prefix indexes by topic and reading by suffix indexes by time
|
||||
Key: event.Topic + joinKey + event.ID + joinKey + timeSuffix,
|
||||
Value: bytes,
|
||||
Expiry: options.TTL,
|
||||
}
|
||||
|
||||
// write the record to the store
|
||||
if err := s.store.Write(record); err != nil {
|
||||
return errors.Wrap(err, "Error writing to the store")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *evStore) backupLoop() {
|
||||
for {
|
||||
err := s.opts.Backup.Snapshot(s.store)
|
||||
if err != nil {
|
||||
logger.Errorf("Error running backup %s", err)
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Hour)
|
||||
}
|
||||
}
|
||||
|
||||
// Backup is an interface for snapshotting the events store to long term storage
|
||||
type Backup interface {
|
||||
Snapshot(st store.Store) error
|
||||
}
|
47
events/store_test.go
Normal file
47
events/store_test.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
store := NewStore()
|
||||
|
||||
testData := []Event{
|
||||
{ID: uuid.New().String(), Topic: "foo"},
|
||||
{ID: uuid.New().String(), Topic: "foo"},
|
||||
{ID: uuid.New().String(), Topic: "bar"},
|
||||
}
|
||||
|
||||
// write the records to the store
|
||||
t.Run("Write", func(t *testing.T) {
|
||||
for _, event := range testData {
|
||||
err := store.Write(&event)
|
||||
assert.Nilf(t, err, "Writing an event should not return an error")
|
||||
}
|
||||
})
|
||||
|
||||
// should not be able to read events from a blank topic
|
||||
t.Run("ReadMissingTopic", func(t *testing.T) {
|
||||
evs, err := store.Read("")
|
||||
assert.Equal(t, err, ErrMissingTopic, "Reading a blank topic should return an error")
|
||||
assert.Nil(t, evs, "No events should be returned")
|
||||
})
|
||||
|
||||
// should only get the events from the topic requested
|
||||
t.Run("ReadTopic", func(t *testing.T) {
|
||||
evs, err := store.Read("foo")
|
||||
assert.Nilf(t, err, "No error should be returned")
|
||||
assert.Len(t, evs, 2, "Only the events for this topic should be returned")
|
||||
})
|
||||
|
||||
// limits should be honoured
|
||||
t.Run("ReadTopicLimit", func(t *testing.T) {
|
||||
evs, err := store.Read("foo", ReadLimit(1))
|
||||
assert.Nilf(t, err, "No error should be returned")
|
||||
assert.Len(t, evs, 1, "The result should include no more than the read limit")
|
||||
})
|
||||
}
|
241
events/stream_test.go
Normal file
241
events/stream_test.go
Normal file
@@ -0,0 +1,241 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type testPayload struct {
|
||||
Message string
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
str Stream
|
||||
name string
|
||||
}
|
||||
|
||||
func TestStream(t *testing.T) {
|
||||
tcs := []testCase{}
|
||||
|
||||
stream, err := NewStream()
|
||||
assert.Nilf(t, err, "NewStream should not return an error")
|
||||
assert.NotNilf(t, stream, "NewStream should return a stream object")
|
||||
tcs = append(tcs, testCase{str: stream, name: "memory"})
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
runTestStream(t, tc.str)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func runTestStream(t *testing.T, stream Stream) {
|
||||
// TestMissingTopic will test the topic validation on publish
|
||||
t.Run("TestMissingTopic", func(t *testing.T) {
|
||||
err := stream.Publish("", nil)
|
||||
assert.Equalf(t, err, ErrMissingTopic, "Publishing to a blank topic should return an error")
|
||||
})
|
||||
|
||||
// TestConsumeTopic will publish a message to the test topic. The subscriber will subscribe to the
|
||||
// same test topic.
|
||||
t.Run("TestConsumeTopic", func(t *testing.T) {
|
||||
payload := &testPayload{Message: "HelloWorld"}
|
||||
metadata := map[string]string{"foo": "bar"}
|
||||
|
||||
// create the subscriber
|
||||
evChan, err := stream.Consume("test")
|
||||
assert.Nilf(t, err, "Consume should not return an error")
|
||||
|
||||
// setup the subscriber async
|
||||
var wg sync.WaitGroup
|
||||
|
||||
go func() {
|
||||
timeout := time.NewTimer(time.Millisecond * 250)
|
||||
|
||||
select {
|
||||
case event, _ := <-evChan:
|
||||
assert.NotNilf(t, event, "The message was nil")
|
||||
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
|
||||
|
||||
var result testPayload
|
||||
err = event.Unmarshal(&result)
|
||||
assert.Nil(t, err, "Error decoding result")
|
||||
assert.Equal(t, result, *payload, "Payload didn't match")
|
||||
|
||||
wg.Done()
|
||||
case <-timeout.C:
|
||||
t.Fatalf("Event was not recieved")
|
||||
}
|
||||
}()
|
||||
|
||||
err = stream.Publish("test", payload, WithMetadata(metadata))
|
||||
assert.Nil(t, err, "Publishing a valid message should not return an error")
|
||||
wg.Add(1)
|
||||
|
||||
// wait for the subscriber to recieve the message or timeout
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
// TestConsumeGroup will publish a message to a random topic. Two subscribers will then consume
|
||||
// the message from the firehose topic with different queues. The second subscriber will be registered
|
||||
// after the message is published to test durability.
|
||||
t.Run("TestConsumeGroup", func(t *testing.T) {
|
||||
topic := uuid.New().String()
|
||||
payload := &testPayload{Message: "HelloWorld"}
|
||||
metadata := map[string]string{"foo": "bar"}
|
||||
|
||||
// create the first subscriber
|
||||
evChan1, err := stream.Consume(topic)
|
||||
assert.Nilf(t, err, "Consume should not return an error")
|
||||
|
||||
// setup the subscriber async
|
||||
var wg sync.WaitGroup
|
||||
|
||||
go func() {
|
||||
timeout := time.NewTimer(time.Millisecond * 250)
|
||||
|
||||
select {
|
||||
case event, _ := <-evChan1:
|
||||
assert.NotNilf(t, event, "The message was nil")
|
||||
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
|
||||
|
||||
var result testPayload
|
||||
err = event.Unmarshal(&result)
|
||||
assert.Nil(t, err, "Error decoding result")
|
||||
assert.Equal(t, result, *payload, "Payload didn't match")
|
||||
|
||||
wg.Done()
|
||||
case <-timeout.C:
|
||||
t.Fatalf("Event was not recieved")
|
||||
}
|
||||
}()
|
||||
|
||||
err = stream.Publish(topic, payload, WithMetadata(metadata))
|
||||
assert.Nil(t, err, "Publishing a valid message should not return an error")
|
||||
wg.Add(2)
|
||||
|
||||
// create the second subscriber
|
||||
evChan2, err := stream.Consume(topic,
|
||||
WithGroup("second_queue"),
|
||||
WithOffset(time.Now().Add(time.Minute*-1)),
|
||||
)
|
||||
assert.Nilf(t, err, "Consume should not return an error")
|
||||
|
||||
go func() {
|
||||
timeout := time.NewTimer(time.Second * 1)
|
||||
|
||||
select {
|
||||
case event, _ := <-evChan2:
|
||||
assert.NotNilf(t, event, "The message was nil")
|
||||
assert.Equal(t, event.Metadata, metadata, "Metadata didn't match")
|
||||
|
||||
var result testPayload
|
||||
err = event.Unmarshal(&result)
|
||||
assert.Nil(t, err, "Error decoding result")
|
||||
assert.Equal(t, result, *payload, "Payload didn't match")
|
||||
|
||||
wg.Done()
|
||||
case <-timeout.C:
|
||||
t.Fatalf("Event was not recieved")
|
||||
}
|
||||
}()
|
||||
|
||||
// wait for the subscriber to recieve the message or timeout
|
||||
wg.Wait()
|
||||
})
|
||||
|
||||
t.Run("AckingNacking", func(t *testing.T) {
|
||||
ch, err := stream.Consume("foobarAck", WithAutoAck(false, 5*time.Second))
|
||||
assert.NoError(t, err, "Unexpected error subscribing")
|
||||
assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 1"}))
|
||||
assert.NoError(t, stream.Publish("foobarAck", map[string]string{"foo": "message 2"}))
|
||||
|
||||
ev := <-ch
|
||||
ev.Ack()
|
||||
ev = <-ch
|
||||
nacked := ev.ID
|
||||
ev.Nack()
|
||||
select {
|
||||
case ev = <-ch:
|
||||
assert.Equal(t, ev.ID, nacked, "Nacked message should have been received again")
|
||||
assert.NoError(t, ev.Ack())
|
||||
case <-time.After(7 * time.Second):
|
||||
t.Fatalf("Timed out waiting for message to be put back on queue")
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("Retries", func(t *testing.T) {
|
||||
ch, err := stream.Consume("foobarRetries", WithAutoAck(false, 5*time.Second), WithRetryLimit(1))
|
||||
assert.NoError(t, err, "Unexpected error subscribing")
|
||||
assert.NoError(t, stream.Publish("foobarRetries", map[string]string{"foo": "message 1"}))
|
||||
|
||||
ev := <-ch
|
||||
id := ev.ID
|
||||
ev.Nack()
|
||||
ev = <-ch
|
||||
assert.Equal(t, id, ev.ID, "Nacked message should have been received again")
|
||||
ev.Nack()
|
||||
select {
|
||||
case ev = <-ch:
|
||||
t.Fatalf("Unexpected event received")
|
||||
case <-time.After(7 * time.Second):
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("InfiniteRetries", func(t *testing.T) {
|
||||
ch, err := stream.Consume("foobarRetriesInf", WithAutoAck(false, 2*time.Second))
|
||||
assert.NoError(t, err, "Unexpected error subscribing")
|
||||
assert.NoError(t, stream.Publish("foobarRetriesInf", map[string]string{"foo": "message 1"}))
|
||||
|
||||
count := 0
|
||||
id := ""
|
||||
for {
|
||||
select {
|
||||
case ev := <-ch:
|
||||
if id != "" {
|
||||
assert.Equal(t, id, ev.ID, "Nacked message should have been received again")
|
||||
}
|
||||
id = ev.ID
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("Unexpected event received")
|
||||
}
|
||||
|
||||
count++
|
||||
if count == 11 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
t.Run("twoSubs", func(t *testing.T) {
|
||||
ch1, err := stream.Consume("foobarTwoSubs1", WithAutoAck(false, 5*time.Second))
|
||||
assert.NoError(t, err, "Unexpected error subscribing to topic 1")
|
||||
ch2, err := stream.Consume("foobarTwoSubs2", WithAutoAck(false, 5*time.Second))
|
||||
assert.NoError(t, err, "Unexpected error subscribing to topic 2")
|
||||
|
||||
assert.NoError(t, stream.Publish("foobarTwoSubs2", map[string]string{"foo": "message 1"}))
|
||||
assert.NoError(t, stream.Publish("foobarTwoSubs1", map[string]string{"foo": "message 1"}))
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
ev := <-ch1
|
||||
assert.Equal(t, "foobarTwoSubs1", ev.Topic, "Received message from unexpected topic")
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
ev := <-ch2
|
||||
assert.Equal(t, "foobarTwoSubs2", ev.Topic, "Received message from unexpected topic")
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
})
|
||||
}
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4/auth"
|
||||
"go-micro.dev/v4/broker"
|
||||
"go-micro.dev/v4/cache"
|
||||
@@ -18,7 +19,6 @@ import (
|
||||
"go-micro.dev/v4/server"
|
||||
"go-micro.dev/v4/store"
|
||||
"go-micro.dev/v4/transport"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
// Options for micro service
|
||||
|
@@ -12,6 +12,8 @@ import (
|
||||
"google.golang.org/grpc/encoding"
|
||||
"google.golang.org/protobuf/encoding/protojson"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/runtime/protoiface"
|
||||
"google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
type jsonCodec struct{}
|
||||
@@ -66,16 +68,24 @@ func (protoCodec) Marshal(v interface{}) ([]byte, error) {
|
||||
return m.Data, nil
|
||||
case proto.Message:
|
||||
return proto.Marshal(m)
|
||||
case protoiface.MessageV1:
|
||||
// #2333 compatible with etcd legacy proto.Message
|
||||
m2 := protoimpl.X.ProtoMessageV2Of(m)
|
||||
return proto.Marshal(m2)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to marshal: %v is not type of *bytes.Frame or proto.Message", v)
|
||||
}
|
||||
|
||||
func (protoCodec) Unmarshal(data []byte, v interface{}) error {
|
||||
m, ok := v.(proto.Message)
|
||||
if !ok {
|
||||
return fmt.Errorf("failed to unmarshal: %v is not type of proto.Message", v)
|
||||
switch m := v.(type) {
|
||||
case proto.Message:
|
||||
return proto.Unmarshal(data, m)
|
||||
case protoiface.MessageV1:
|
||||
// #2333 compatible with etcd legacy proto.Message
|
||||
m2 := protoimpl.X.ProtoMessageV2Of(m)
|
||||
return proto.Unmarshal(data, m2)
|
||||
}
|
||||
return proto.Unmarshal(data, m)
|
||||
return fmt.Errorf("failed to unmarshal: %v is not type of proto.Message", v)
|
||||
}
|
||||
|
||||
func (protoCodec) Name() string {
|
||||
|
@@ -3,10 +3,10 @@ module github.com/asim/go-micro/plugins/client/grpc/v4
|
||||
go 1.16
|
||||
|
||||
require (
|
||||
go-micro.dev/v4 v4.2.1
|
||||
google.golang.org/grpc v1.41.0
|
||||
google.golang.org/grpc/examples v0.0.0-20211020220737-f00baa6c3c84
|
||||
google.golang.org/protobuf v1.26.0
|
||||
go-micro.dev/v4 v4.3.0
|
||||
google.golang.org/grpc v1.42.0
|
||||
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e
|
||||
google.golang.org/protobuf v1.27.1
|
||||
)
|
||||
|
||||
replace go-micro.dev/v4 => ../../../../go-micro
|
||||
|
@@ -42,6 +42,7 @@ github.com/Azure/go-autorest/tracing v0.1.0/go.mod h1:ROEEAFwXycQw7Sn3DXNtEedEvd
|
||||
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
|
||||
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
|
||||
github.com/Microsoft/go-winio v0.4.16-0.20201130162521-d1ffc52c7331/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0=
|
||||
github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0=
|
||||
@@ -56,6 +57,7 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
|
||||
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
|
||||
github.com/acomagu/bufpipe v1.0.3 h1:fxAGrHZTgQ9w5QqVItgzwj235/uYZYgbXitB+dLupOk=
|
||||
github.com/acomagu/bufpipe v1.0.3/go.mod h1:mxdxdup/WdsKVreO5GpW4+M/1CE2sMG4jeGJ2sYmHc4=
|
||||
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
|
||||
github.com/akamai/AkamaiOPEN-edgegrid-golang v1.1.0/go.mod h1:kX6YddBkXqqywAe8c9LyvgTCyFuZCTMF4cRPQhc3Fy8=
|
||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
@@ -94,7 +96,10 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
|
||||
github.com/cloudflare/cloudflare-go v0.14.0/go.mod h1:EnwdgGMaFOruiPZRFSgn+TsQ3hQ7C/YWzIGLeu5c304=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI=
|
||||
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
|
||||
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
|
||||
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
|
||||
github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59/go.mod h1:pA0z1pT8KYB3TCXK/ocprsh7MAkoW8bZVzPdih9snmM=
|
||||
github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
|
||||
github.com/containerd/containerd v1.3.2/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
|
||||
@@ -149,6 +154,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
|
||||
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
|
||||
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
|
||||
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
|
||||
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
@@ -190,6 +196,7 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
|
||||
@@ -225,6 +232,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
@@ -253,7 +261,6 @@ github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv
|
||||
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
|
||||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
|
||||
@@ -307,6 +314,7 @@ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1
|
||||
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
|
||||
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
|
||||
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
|
||||
github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351 h1:DowS9hvgyYSX4TO5NpyC606/Z4SxnNYbT+WX27or6Ck=
|
||||
github.com/kevinburke/ssh_config v0.0.0-20201106050909-4977a11b4351/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM=
|
||||
@@ -335,7 +343,6 @@ github.com/liquidweb/go-lwApi v0.0.0-20190605172801-52a4864d2738/go.mod h1:0sYF9
|
||||
github.com/liquidweb/go-lwApi v0.0.5/go.mod h1:0sYF9rMXb0vlG+4SzdiGMXHheCZxjguMq+Zb4S2BfBs=
|
||||
github.com/liquidweb/liquidweb-cli v0.6.9/go.mod h1:cE1uvQ+x24NGUL75D0QagOFCG8Wdvmwu8aL9TLmA/eQ=
|
||||
github.com/liquidweb/liquidweb-go v1.6.3/go.mod h1:SuXXp+thr28LnjEw18AYtWwIbWMHSUiajPQs8T9c/Rc=
|
||||
github.com/m3o/m3o-go/client v0.0.0-20210421144725-8bfd7992ada3/go.mod h1:vmeaYrKYpgVNhny/l7iH8mXS88S7ijUiYni3gZUrCq0=
|
||||
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/magiconair/properties v1.8.4/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
|
||||
github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A=
|
||||
@@ -403,6 +410,7 @@ github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zM
|
||||
github.com/opencontainers/runc v0.0.0-20190115041553-12f6a991201f/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
|
||||
github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
|
||||
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
|
||||
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
||||
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
|
||||
github.com/oracle/oci-go-sdk v24.3.0+incompatible/go.mod h1:VQb79nF8Z2cwLkLS35ukwStZIg5F66tcBccjip/j888=
|
||||
github.com/ovh/go-ovh v1.1.0/go.mod h1:AxitLZ5HBRPyUd+Zl60Ajaag+rNTdVXWIkzfrVuTXWA=
|
||||
@@ -500,6 +508,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69
|
||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||
github.com/transip/gotransip/v6 v6.2.0/go.mod h1:pQZ36hWWRahCUXkFWlx9Hs711gLd8J4qdgLdRzmtY+g=
|
||||
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
|
||||
github.com/uber/jaeger-client-go v2.29.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
|
||||
github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
|
||||
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||
github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA=
|
||||
github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||
@@ -519,6 +529,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
|
||||
go.m3o.com v0.1.0/go.mod h1:p8FdLqZH3R9a0y04qiMNT+clw69d3SxyQPFzCNbDRtk=
|
||||
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
|
||||
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
|
||||
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
|
||||
@@ -550,7 +561,10 @@ golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm
|
||||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a h1:kr2P4QFmQr29mSLA43kwrOcgcReGTfbE9N577tCTuBc=
|
||||
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
|
||||
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
|
||||
@@ -560,6 +574,7 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
|
||||
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
||||
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
||||
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
|
||||
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
||||
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
@@ -707,10 +722,12 @@ golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxb
|
||||
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190206041539-40960b6deb8e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
|
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
@@ -750,6 +767,10 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
|
||||
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
|
||||
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
|
||||
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
|
||||
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
|
||||
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
|
||||
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
|
||||
@@ -801,10 +822,10 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
|
||||
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
|
||||
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
|
||||
google.golang.org/grpc v1.41.0 h1:f+PlOh7QV4iIJkPrx5NQ7qaNGFQ3OTse67yaDHfju4E=
|
||||
google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k=
|
||||
google.golang.org/grpc/examples v0.0.0-20211020220737-f00baa6c3c84 h1:vTEaoYojw/smuQT/Fva/AX+2Bnla97/oRbY75XFhg40=
|
||||
google.golang.org/grpc/examples v0.0.0-20211020220737-f00baa6c3c84/go.mod h1:gID3PKrg7pWKntu9Ss6zTLJ0ttC0X9IHgREOCZwbCVU=
|
||||
google.golang.org/grpc v1.42.0 h1:XT2/MFpuPFsEX2fWh3YQtHkZ+WYZFQRfaUgLZYj/p6A=
|
||||
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
|
||||
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e h1:m7aQHHqd0q89mRwhwS9Bx2rjyl/hsFAeta+uGrHsQaU=
|
||||
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e/go.mod h1:gID3PKrg7pWKntu9Ss6zTLJ0ttC0X9IHgREOCZwbCVU=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
@@ -816,8 +837,9 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
|
||||
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
|
||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
|
||||
google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk=
|
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
|
||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
|
||||
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
@@ -863,5 +885,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
|
||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
|
||||
rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=
|
||||
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
|
||||
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
|
||||
|
@@ -5,8 +5,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/logger"
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@@ -13,9 +13,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nxadm/tail"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/runtime/local/git"
|
||||
"github.com/nxadm/tail"
|
||||
)
|
||||
|
||||
// defaultNamespace to use if not provided as an option
|
||||
|
@@ -8,9 +8,9 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/runtime/local/build"
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
)
|
||||
|
||||
type Builder struct {
|
||||
|
@@ -1,3 +1,4 @@
|
||||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
// Package os runs processes locally
|
||||
|
@@ -6,8 +6,8 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"go-micro.dev/v4/runtime/local/source"
|
||||
"github.com/go-git/go-git/v5"
|
||||
"go-micro.dev/v4/runtime/local/source"
|
||||
)
|
||||
|
||||
// Source retrieves source code
|
||||
|
@@ -4,8 +4,8 @@ import (
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
"go-micro.dev/v4/server"
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/server"
|
||||
)
|
||||
|
||||
type MockServer struct {
|
||||
|
@@ -4,6 +4,8 @@ import (
|
||||
"bytes"
|
||||
"sync"
|
||||
|
||||
"github.com/oxtoacart/bpool"
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/codec"
|
||||
raw "go-micro.dev/v4/codec/bytes"
|
||||
"go-micro.dev/v4/codec/grpc"
|
||||
@@ -12,8 +14,6 @@ import (
|
||||
"go-micro.dev/v4/codec/proto"
|
||||
"go-micro.dev/v4/codec/protorpc"
|
||||
"go-micro.dev/v4/transport"
|
||||
"github.com/oxtoacart/bpool"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type rpcCodec struct {
|
||||
|
@@ -9,9 +9,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go-micro.dev/v4/codec/json"
|
||||
protoCodec "go-micro.dev/v4/codec/proto"
|
||||
"github.com/golang/protobuf/proto"
|
||||
)
|
||||
|
||||
// protoStruct implements proto.Message
|
||||
|
@@ -7,11 +7,11 @@ import (
|
||||
"os/signal"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/codec"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/registry"
|
||||
signalutil "go-micro.dev/v4/util/signal"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Server is a simple micro server abstraction
|
||||
|
@@ -174,9 +174,21 @@ func (m *memoryStore) Read(key string, opts ...ReadOption) ([]*Record, error) {
|
||||
|
||||
// Handle Prefix / suffix
|
||||
if readOpts.Prefix || readOpts.Suffix {
|
||||
k := m.list(prefix, readOpts.Limit, readOpts.Offset)
|
||||
k := m.list(prefix, 0, 0)
|
||||
limit := int(readOpts.Limit)
|
||||
offset := int(readOpts.Offset)
|
||||
|
||||
if limit > len(k) {
|
||||
limit = len(k)
|
||||
}
|
||||
|
||||
if offset > len(k) {
|
||||
offset = len(k)
|
||||
}
|
||||
|
||||
for i := offset; i < limit; i++ {
|
||||
kk := k[i]
|
||||
|
||||
for _, kk := range k {
|
||||
if readOpts.Prefix && !strings.HasPrefix(kk, key) {
|
||||
continue
|
||||
}
|
||||
|
@@ -8,8 +8,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"github.com/miekg/dns"
|
||||
log "go-micro.dev/v4/logger"
|
||||
"golang.org/x/net/ipv4"
|
||||
"golang.org/x/net/ipv6"
|
||||
)
|
||||
|
@@ -4,8 +4,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/transport"
|
||||
"github.com/google/uuid"
|
||||
"go-micro.dev/v4/transport"
|
||||
)
|
||||
|
||||
type pool struct {
|
||||
|
@@ -3,8 +3,8 @@ package sync
|
||||
import (
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/store"
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
type operation struct {
|
||||
|
@@ -6,9 +6,9 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go-micro.dev/v4/store"
|
||||
"github.com/ef-ds/deque"
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
||||
// Sync implements a sync in for stores
|
||||
|
@@ -6,9 +6,9 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4"
|
||||
"go-micro.dev/v4/registry"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
//Options for web
|
||||
|
@@ -7,10 +7,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
"go-micro.dev/v4"
|
||||
"go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/web"
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
func TestWeb(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user