mirror of
https://github.com/go-micro/go-micro.git
synced 2025-08-04 21:42:57 +02:00
* feat: more plugins * chore(ci): split out benchmarks Attempt to resolve too many open files in ci * chore(ci): split out benchmarks * fix(ci): Attempt to resolve too many open files in ci * fix: set DefaultX for cli flag and service option * fix: restore http broker * fix: default http broker * feat: full nats profile * chore: still ugly, not ready * fix: better initialization for profiles * fix(tests): comment out flaky listen tests * fix: disable benchmarks on gha * chore: cleanup, comments * chore: add nats config source
338 lines
8.8 KiB
Go
338 lines
8.8 KiB
Go
package natsjskv
|
|
|
|
import (
|
|
"context"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/pkg/errors"
|
|
"go-micro.dev/v5/store"
|
|
)
|
|
|
|
func TestNats(t *testing.T) {
|
|
// Setup without calling Init on purpose
|
|
var err error
|
|
for i := 0; i < 5; i++ {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
addr := startNatsServer(ctx, t)
|
|
s := NewStore(store.Nodes(addr), EncodeKeys())
|
|
|
|
// Test String method
|
|
t.Log("Testing:", s.String())
|
|
|
|
err = basicTest(t, s)
|
|
if err != nil {
|
|
t.Log(err)
|
|
continue
|
|
}
|
|
|
|
// Test reading non-existing key
|
|
r, err := s.Read("this-is-a-random-key")
|
|
if !errors.Is(err, store.ErrNotFound) {
|
|
t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err)
|
|
}
|
|
if len(r) > 0 {
|
|
t.Fatal("Lenth should be 0")
|
|
}
|
|
err = s.Close()
|
|
if err != nil {
|
|
t.Logf("Failed to close store: %v", err)
|
|
}
|
|
cancel()
|
|
return
|
|
}
|
|
t.Fatal(err)
|
|
}
|
|
|
|
func TestOptions(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s := testSetup(ctx, t,
|
|
DefaultMemory(),
|
|
|
|
// Having a non-default description will trigger nats.ErrStreamNameAlreadyInUse
|
|
// since the buckets have been created in previous tests with a different description.
|
|
//
|
|
// NOTE: this is only the case with a manually set up server, not with current
|
|
// test setup, where new servers are started for each test.
|
|
DefaultDescription("My fancy description"),
|
|
|
|
// Option has no effect in this context, just to test setting the option
|
|
JetStreamOptions(nats.PublishAsyncMaxPending(256)),
|
|
|
|
// Sets a custom NATS client name, just to test the NatsOptions() func
|
|
NatsOptions(nats.Options{Name: "Go NATS Store Plugin Tests Client"}),
|
|
|
|
KeyValueOptions(&nats.KeyValueConfig{
|
|
Bucket: "TestBucketName",
|
|
Description: "This bucket is not used",
|
|
TTL: 5 * time.Minute,
|
|
MaxBytes: 1024,
|
|
Storage: nats.MemoryStorage,
|
|
Replicas: 1,
|
|
}),
|
|
|
|
// Encode keys to avoid character limitations
|
|
EncodeKeys(),
|
|
)
|
|
defer cancel()
|
|
|
|
if err := basicTest(t, s); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func TestTTL(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
ttl := 500 * time.Millisecond
|
|
s := testSetup(ctx, t,
|
|
DefaultTTL(ttl),
|
|
|
|
// Since these buckets will be new they will have the new description
|
|
DefaultDescription("My fancy description"),
|
|
)
|
|
defer cancel()
|
|
|
|
// Use a uuid to make sure a new bucket is created when using local server
|
|
id := uuid.New().String()
|
|
for _, r := range table {
|
|
if err := s.Write(r.Record, store.WriteTo(r.Database+id, r.Table)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
time.Sleep(ttl * 2)
|
|
|
|
for _, r := range table {
|
|
res, err := s.Read(r.Record.Key, store.ReadFrom(r.Database+id, r.Table))
|
|
if !errors.Is(err, store.ErrNotFound) {
|
|
t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err)
|
|
}
|
|
if len(res) > 0 {
|
|
t.Fatal("Fetched record while it should have expired")
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestMetaData(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s := testSetup(ctx, t)
|
|
defer cancel()
|
|
|
|
record := store.Record{
|
|
Key: "KeyOne",
|
|
Value: []byte("Some value"),
|
|
Metadata: map[string]interface{}{
|
|
"meta-one": "val",
|
|
"meta-two": 5,
|
|
},
|
|
Expiry: 0,
|
|
}
|
|
bucket := "meta-data-test"
|
|
if err := s.Write(&record, store.WriteTo(bucket, "")); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
r, err := s.Read(record.Key, store.ReadFrom(bucket, ""))
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(r) == 0 {
|
|
t.Fatal("No results found")
|
|
}
|
|
|
|
m := r[0].Metadata
|
|
if m["meta-one"].(string) != record.Metadata["meta-one"].(string) ||
|
|
m["meta-two"].(float64) != float64(record.Metadata["meta-two"].(int)) {
|
|
t.Fatalf("Metadata does not match: (%+v) != (%+v)", m, record.Metadata)
|
|
}
|
|
}
|
|
|
|
func TestDelete(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s := testSetup(ctx, t)
|
|
defer cancel()
|
|
|
|
for _, r := range table {
|
|
if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
if err := s.Delete(r.Record.Key, store.DeleteFrom(r.Database, r.Table)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
time.Sleep(time.Second)
|
|
|
|
res, err := s.Read(r.Record.Key, store.ReadFrom(r.Database, r.Table))
|
|
if !errors.Is(err, store.ErrNotFound) {
|
|
t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err)
|
|
}
|
|
if len(res) > 0 {
|
|
t.Fatalf("Failed to delete %s:%s from %s %s (len: %d)", r.Record.Key, r.Record.Value, r.Database, r.Table, len(res))
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestList(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s := testSetup(ctx, t)
|
|
defer cancel()
|
|
|
|
for _, r := range table {
|
|
if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
l := []struct {
|
|
Database string
|
|
Table string
|
|
Length int
|
|
Prefix string
|
|
Suffix string
|
|
Offset int
|
|
Limit int
|
|
}{
|
|
{Length: 7},
|
|
{Database: "prefix-test", Length: 7},
|
|
{Database: "prefix-test", Offset: 2, Length: 5},
|
|
{Database: "prefix-test", Offset: 2, Limit: 3, Length: 3},
|
|
{Database: "prefix-test", Table: "names", Length: 3},
|
|
{Database: "prefix-test", Table: "cities", Length: 4},
|
|
{Database: "prefix-test", Table: "cities", Suffix: "City", Length: 3},
|
|
{Database: "prefix-test", Table: "cities", Suffix: "City", Limit: 2, Length: 2},
|
|
{Database: "prefix-test", Table: "cities", Suffix: "City", Offset: 1, Length: 2},
|
|
{Prefix: "test", Length: 1},
|
|
{Table: "some_table", Prefix: "test", Suffix: "test", Length: 2},
|
|
}
|
|
|
|
for i, entry := range l {
|
|
// Test listing keys
|
|
keys, err := s.List(
|
|
store.ListFrom(entry.Database, entry.Table),
|
|
store.ListPrefix(entry.Prefix),
|
|
store.ListSuffix(entry.Suffix),
|
|
store.ListOffset(uint(entry.Offset)),
|
|
store.ListLimit(uint(entry.Limit)),
|
|
)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(keys) != entry.Length {
|
|
t.Fatalf("Length of returned keys is invalid for test %d - %+v (%d)", i+1, entry, len(keys))
|
|
}
|
|
|
|
// Test reading keys
|
|
if entry.Prefix != "" || entry.Suffix != "" {
|
|
var key string
|
|
options := []store.ReadOption{
|
|
store.ReadFrom(entry.Database, entry.Table),
|
|
store.ReadLimit(uint(entry.Limit)),
|
|
store.ReadOffset(uint(entry.Offset)),
|
|
}
|
|
if entry.Prefix != "" {
|
|
key = entry.Prefix
|
|
options = append(options, store.ReadPrefix())
|
|
}
|
|
if entry.Suffix != "" {
|
|
key = entry.Suffix
|
|
options = append(options, store.ReadSuffix())
|
|
}
|
|
r, err := s.Read(key, options...)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if len(r) != entry.Length {
|
|
t.Fatalf("Length of read keys is invalid for test %d - %+v (%d)", i+1, entry, len(r))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestDeleteBucket(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
s := testSetup(ctx, t)
|
|
defer cancel()
|
|
|
|
for _, r := range table {
|
|
if err := s.Write(r.Record, store.WriteTo(r.Database, r.Table)); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
bucket := "prefix-test"
|
|
if err := s.Delete(bucket, DeleteBucket()); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
keys, err := s.List(store.ListFrom(bucket, ""))
|
|
if err != nil && !errors.Is(err, ErrBucketNotFound) {
|
|
t.Fatalf("Failed to delete bucket: %v", err)
|
|
}
|
|
|
|
if len(keys) > 0 {
|
|
t.Fatal("Length of key list should be 0 after bucket deletion")
|
|
}
|
|
|
|
r, err := s.Read("", store.ReadPrefix(), store.ReadFrom(bucket, ""))
|
|
if err != nil && !errors.Is(err, ErrBucketNotFound) {
|
|
t.Fatalf("Failed to delete bucket: %v", err)
|
|
}
|
|
if len(r) > 0 {
|
|
t.Fatal("Length of record list should be 0 after bucket deletion", len(r))
|
|
}
|
|
}
|
|
|
|
func TestEnforceLimits(t *testing.T) {
|
|
s := []string{"a", "b", "c", "d"}
|
|
var testCasts = []struct {
|
|
Alias string
|
|
Offset uint
|
|
Limit uint
|
|
Expected []string
|
|
}{
|
|
{"plain", 0, 0, []string{"a", "b", "c", "d"}},
|
|
{"offset&limit-1", 1, 3, []string{"b", "c", "d"}},
|
|
{"offset&limit-2", 1, 1, []string{"b"}},
|
|
{"offset=length", 4, 0, []string{}},
|
|
{"offset>length", 222, 0, []string{}},
|
|
{"limit>length", 0, 36, []string{"a", "b", "c", "d"}},
|
|
}
|
|
for _, tc := range testCasts {
|
|
actual := enforceLimits(s, tc.Limit, tc.Offset)
|
|
if !reflect.DeepEqual(actual, tc.Expected) {
|
|
t.Fatalf("%s: Expected %v, got %v", tc.Alias, tc.Expected, actual)
|
|
}
|
|
}
|
|
}
|
|
|
|
func basicTest(t *testing.T, s store.Store) error {
|
|
t.Helper()
|
|
for _, test := range table {
|
|
if err := s.Write(test.Record, store.WriteTo(test.Database, test.Table)); err != nil {
|
|
return errors.Wrap(err, "Failed to write record in basic test")
|
|
}
|
|
r, err := s.Read(test.Record.Key, store.ReadFrom(test.Database, test.Table))
|
|
if err != nil {
|
|
return errors.Wrap(err, "Failed to read record in basic test")
|
|
}
|
|
if len(r) == 0 {
|
|
t.Fatalf("No results found for %s (%s) %s", test.Record.Key, test.Database, test.Table)
|
|
}
|
|
|
|
key := test.Record.Key
|
|
val1 := string(test.Record.Value)
|
|
|
|
key2 := r[0].Key
|
|
val2 := string(r[0].Value)
|
|
if val1 != val2 {
|
|
t.Fatalf("Value not equal for (%s: %s) != (%s: %s)", key, val1, key2, val2)
|
|
}
|
|
}
|
|
return nil
|
|
}
|