1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-05-19 21:23:04 +02:00

WIP: move file store (do not merge) (#2766)

* move file store

* fix build
This commit is contained in:
Asim Aslam 2025-05-16 15:08:13 +01:00 committed by GitHub
parent e29159e836
commit 97275d3db9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 64 additions and 62 deletions

View File

@ -1,4 +1,4 @@
package store package file
import ( import (
"context" "context"
@ -10,6 +10,7 @@ import (
"sync" "sync"
"time" "time"
"go-micro.dev/v5/store"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
) )
@ -26,7 +27,7 @@ var (
dataBucket = "data" dataBucket = "data"
) )
func NewFileStore(opts ...Option) Store { func NewStore(opts ...store.Option) store.Store {
s := &fileStore{ s := &fileStore{
handles: make(map[string]*fileHandle), handles: make(map[string]*fileHandle),
} }
@ -35,7 +36,7 @@ func NewFileStore(opts ...Option) Store {
} }
type fileStore struct { type fileStore struct {
options Options options store.Options
dir string dir string
// the database handle // the database handle
@ -70,7 +71,7 @@ func (m *fileStore) delete(fd *fileHandle, key string) error {
}) })
} }
func (m *fileStore) init(opts ...Option) error { func (m *fileStore) init(opts ...store.Option) error {
for _, o := range opts { for _, o := range opts {
o(&m.options) o(&m.options)
} }
@ -206,7 +207,7 @@ func (m *fileStore) list(fd *fileHandle, limit, offset uint) []string {
return allKeys return allKeys
} }
func (m *fileStore) get(fd *fileHandle, k string) (*Record, error) { func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) {
var value []byte var value []byte
fd.db.View(func(tx *bolt.Tx) error { fd.db.View(func(tx *bolt.Tx) error {
@ -221,7 +222,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*Record, error) {
}) })
if value == nil { if value == nil {
return nil, ErrNotFound return nil, store.ErrNotFound
} }
storedRecord := &record{} storedRecord := &record{}
@ -230,7 +231,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*Record, error) {
return nil, err return nil, err
} }
newRecord := &Record{} newRecord := &store.Record{}
newRecord.Key = storedRecord.Key newRecord.Key = storedRecord.Key
newRecord.Value = storedRecord.Value newRecord.Value = storedRecord.Value
newRecord.Metadata = make(map[string]interface{}) newRecord.Metadata = make(map[string]interface{})
@ -241,7 +242,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*Record, error) {
if !storedRecord.ExpiresAt.IsZero() { if !storedRecord.ExpiresAt.IsZero() {
if storedRecord.ExpiresAt.Before(time.Now()) { if storedRecord.ExpiresAt.Before(time.Now()) {
return nil, ErrNotFound return nil, store.ErrNotFound
} }
newRecord.Expiry = time.Until(storedRecord.ExpiresAt) newRecord.Expiry = time.Until(storedRecord.ExpiresAt)
} }
@ -249,7 +250,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*Record, error) {
return newRecord, nil return newRecord, nil
} }
func (m *fileStore) set(fd *fileHandle, r *Record) error { func (m *fileStore) set(fd *fileHandle, r *store.Record) error {
// copy the incoming record and then // copy the incoming record and then
// convert the expiry in to a hard timestamp // convert the expiry in to a hard timestamp
item := &record{} item := &record{}
@ -291,12 +292,12 @@ func (f *fileStore) Close() error {
return nil return nil
} }
func (f *fileStore) Init(opts ...Option) error { func (f *fileStore) Init(opts ...store.Option) error {
return f.init(opts...) return f.init(opts...)
} }
func (m *fileStore) Delete(key string, opts ...DeleteOption) error { func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error {
var deleteOptions DeleteOptions var deleteOptions store.DeleteOptions
for _, o := range opts { for _, o := range opts {
o(&deleteOptions) o(&deleteOptions)
} }
@ -309,8 +310,8 @@ func (m *fileStore) Delete(key string, opts ...DeleteOption) error {
return m.delete(fd, key) return m.delete(fd, key)
} }
func (m *fileStore) Read(key string, opts ...ReadOption) ([]*Record, error) { func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
var readOpts ReadOptions var readOpts store.ReadOptions
for _, o := range opts { for _, o := range opts {
o(&readOpts) o(&readOpts)
} }
@ -342,7 +343,7 @@ func (m *fileStore) Read(key string, opts ...ReadOption) ([]*Record, error) {
keys = []string{key} keys = []string{key}
} }
var results []*Record var results []*store.Record
for _, k := range keys { for _, k := range keys {
r, err := m.get(fd, k) r, err := m.get(fd, k)
@ -355,8 +356,8 @@ func (m *fileStore) Read(key string, opts ...ReadOption) ([]*Record, error) {
return results, nil return results, nil
} }
func (m *fileStore) Write(r *Record, opts ...WriteOption) error { func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error {
var writeOpts WriteOptions var writeOpts store.WriteOptions
for _, o := range opts { for _, o := range opts {
o(&writeOpts) o(&writeOpts)
} }
@ -368,7 +369,7 @@ func (m *fileStore) Write(r *Record, opts ...WriteOption) error {
if len(opts) > 0 { if len(opts) > 0 {
// Copy the record before applying options, or the incoming record will be mutated // Copy the record before applying options, or the incoming record will be mutated
newRecord := Record{} newRecord := store.Record{}
newRecord.Key = r.Key newRecord.Key = r.Key
newRecord.Value = r.Value newRecord.Value = r.Value
newRecord.Metadata = make(map[string]interface{}) newRecord.Metadata = make(map[string]interface{})
@ -391,12 +392,12 @@ func (m *fileStore) Write(r *Record, opts ...WriteOption) error {
return m.set(fd, r) return m.set(fd, r)
} }
func (m *fileStore) Options() Options { func (m *fileStore) Options() store.Options {
return m.options return m.options
} }
func (m *fileStore) List(opts ...ListOption) ([]string, error) { func (m *fileStore) List(opts ...store.ListOption) ([]string, error) {
var listOptions ListOptions var listOptions store.ListOptions
for _, o := range opts { for _, o := range opts {
o(&listOptions) o(&listOptions)
@ -439,9 +440,9 @@ func (m *fileStore) String() string {
type dirOptionKey struct{} type dirOptionKey struct{}
// DirOption is a file store Option to set the directory for the file // DirOption is a file store store.Option to set the directory for the file
func DirOption(dir string) Option { func DirOption(dir string) store.Option {
return func(o *Options) { return func(o *store.Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }

View File

@ -1,4 +1,4 @@
package store package file
import ( import (
"fmt" "fmt"
@ -10,18 +10,19 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/kr/pretty" "github.com/kr/pretty"
"go-micro.dev/v5/store"
) )
func cleanup(db string, s Store) { func cleanup(db string, s store.Store) {
s.Close() s.Close()
dir := filepath.Join(DefaultDir, db+"/") dir := filepath.Join(DefaultDir, db+"/")
os.RemoveAll(dir) os.RemoveAll(dir)
} }
func TestFileStoreReInit(t *testing.T) { func TestFileStoreReInit(t *testing.T) {
s := NewStore(Table("aaa")) s := NewStore(store.Table("aaa"))
defer cleanup(DefaultDatabase, s) defer cleanup(DefaultDatabase, s)
s.Init(Table("bbb")) s.Init(store.Table("bbb"))
if s.Options().Table != "bbb" { if s.Options().Table != "bbb" {
t.Error("Init didn't reinitialise the store") t.Error("Init didn't reinitialise the store")
} }
@ -34,29 +35,29 @@ func TestFileStoreBasic(t *testing.T) {
} }
func TestFileStoreTable(t *testing.T) { func TestFileStoreTable(t *testing.T) {
s := NewStore(Table("testTable")) s := NewStore(store.Table("teststore.Table"))
defer cleanup(DefaultDatabase, s) defer cleanup(DefaultDatabase, s)
fileTest(s, t) fileTest(s, t)
} }
func TestFileStoreDatabase(t *testing.T) { func TestFileStoreDatabase(t *testing.T) {
s := NewStore(Database("testdb")) s := NewStore(store.Database("testdb"))
defer cleanup("testdb", s) defer cleanup("testdb", s)
fileTest(s, t) fileTest(s, t)
} }
func TestFileStoreDatabaseTable(t *testing.T) { func TestFileStoreDatabaseTable(t *testing.T) {
s := NewStore(Table("testTable"), Database("testdb")) s := NewStore(store.Table("teststore.Table"), store.Database("testdb"))
defer cleanup("testdb", s) defer cleanup("testdb", s)
fileTest(s, t) fileTest(s, t)
} }
func fileTest(s Store, t *testing.T) { func fileTest(s store.Store, t *testing.T) {
if len(os.Getenv("IN_TRAVIS_CI")) == 0 { if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("Options %s %v\n", s.String(), s.Options()) t.Logf("Options %s %v\n", s.String(), s.Options())
} }
// Read and Write an expiring Record // Read and Write an expiring Record
if err := s.Write(&Record{ if err := s.Write(&store.Record{
Key: "Hello", Key: "Hello",
Value: []byte("World"), Value: []byte("World"),
Expiry: time.Millisecond * 150, Expiry: time.Millisecond * 150,
@ -81,12 +82,12 @@ func fileTest(s Store, t *testing.T) {
// wait for expiry // wait for expiry
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
if _, err := s.Read("Hello"); err != ErrNotFound { if _, err := s.Read("Hello"); err != store.ErrNotFound {
t.Errorf("Expected %# v, got %# v", ErrNotFound, err) t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err)
} }
// Write 3 records with various expiry and get with Table // Write 3 records with various expiry and get with store.Table
records := []*Record{ records := []*store.Record{
{ {
Key: "foo", Key: "foo",
Value: []byte("foofoo"), Value: []byte("foofoo"),
@ -104,30 +105,30 @@ func fileTest(s Store, t *testing.T) {
} }
} }
if results, err := s.Read("foo", ReadPrefix()); err != nil { if results, err := s.Read("foo", store.ReadPrefix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 2 { if len(results) != 2 {
t.Errorf("Expected 2 items, got %d", len(results)) t.Errorf("Expected 2 items, got %d", len(results))
// t.Logf("Table test: %v\n", spew.Sdump(results)) // t.Logf("store.Table test: %v\n", spew.Sdump(results))
} }
} }
// wait for the expiry // wait for the expiry
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
if results, err := s.Read("foo", ReadPrefix()); err != nil { if results, err := s.Read("foo", store.ReadPrefix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else if len(results) != 1 { } else if len(results) != 1 {
t.Errorf("Expected 1 item, got %d", len(results)) t.Errorf("Expected 1 item, got %d", len(results))
// t.Logf("Table test: %v\n", spew.Sdump(results)) // t.Logf("store.Table test: %v\n", spew.Sdump(results))
} }
if err := s.Delete("foo"); err != nil { if err := s.Delete("foo"); err != nil {
t.Errorf("Delete failed (%v)", err) t.Errorf("Delete failed (%v)", err)
} }
if results, err := s.Read("foo"); err != ErrNotFound { if results, err := s.Read("foo"); err != store.ErrNotFound {
t.Errorf("Expected read failure read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Expected read failure read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 0 { if len(results) != 0 {
@ -136,7 +137,7 @@ func fileTest(s Store, t *testing.T) {
} }
// Write 3 records with various expiry and get with Suffix // Write 3 records with various expiry and get with Suffix
records = []*Record{ records = []*store.Record{
{ {
Key: "foo", Key: "foo",
Value: []byte("foofoo"), Value: []byte("foofoo"),
@ -158,36 +159,36 @@ func fileTest(s Store, t *testing.T) {
t.Errorf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err) t.Errorf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err)
} }
} }
if results, err := s.Read("foo", ReadSuffix()); err != nil { if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 3 { if len(results) != 3 {
t.Errorf("Expected 3 items, got %d", len(results)) t.Errorf("Expected 3 items, got %d", len(results))
// t.Logf("Table test: %v\n", spew.Sdump(results)) // t.Logf("store.Table test: %v\n", spew.Sdump(results))
} }
} }
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
if results, err := s.Read("foo", ReadSuffix()); err != nil { if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 2 { if len(results) != 2 {
t.Errorf("Expected 2 items, got %d", len(results)) t.Errorf("Expected 2 items, got %d", len(results))
// t.Logf("Table test: %v\n", spew.Sdump(results)) // t.Logf("store.Table test: %v\n", spew.Sdump(results))
} }
} }
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
if results, err := s.Read("foo", ReadSuffix()); err != nil { if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 1 { if len(results) != 1 {
t.Errorf("Expected 1 item, got %d", len(results)) t.Errorf("Expected 1 item, got %d", len(results))
// t.Logf("Table test: %# v\n", spew.Sdump(results)) // t.Logf("store.Table test: %# v\n", spew.Sdump(results))
} }
} }
if err := s.Delete("foo"); err != nil { if err := s.Delete("foo"); err != nil {
t.Errorf("Delete failed (%v)", err) t.Errorf("Delete failed (%v)", err)
} }
if results, err := s.Read("foo", ReadSuffix()); err != nil { if results, err := s.Read("foo", store.ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 0 { if len(results) != 0 {
@ -195,28 +196,28 @@ func fileTest(s Store, t *testing.T) {
} }
} }
// Test Table, Suffix and WriteOptions // Test store.Table, Suffix and WriteOptions
if err := s.Write(&Record{ if err := s.Write(&store.Record{
Key: "foofoobarbar", Key: "foofoobarbar",
Value: []byte("something"), Value: []byte("something"),
}, WriteTTL(time.Millisecond*100)); err != nil { }, store.WriteTTL(time.Millisecond*100)); err != nil {
t.Error(err) t.Error(err)
} }
if err := s.Write(&Record{ if err := s.Write(&store.Record{
Key: "foofoo", Key: "foofoo",
Value: []byte("something"), Value: []byte("something"),
}, WriteExpiry(time.Now().Add(time.Millisecond*100))); err != nil { }, store.WriteExpiry(time.Now().Add(time.Millisecond*100))); err != nil {
t.Error(err) t.Error(err)
} }
if err := s.Write(&Record{ if err := s.Write(&store.Record{
Key: "barbar", Key: "barbar",
Value: []byte("something"), Value: []byte("something"),
// TTL has higher precedence than expiry // TTL has higher precedence than expiry
}, WriteExpiry(time.Now().Add(time.Hour)), WriteTTL(time.Millisecond*100)); err != nil { }, store.WriteExpiry(time.Now().Add(time.Hour)), store.WriteTTL(time.Millisecond*100)); err != nil {
t.Error(err) t.Error(err)
} }
if results, err := s.Read("foo", ReadPrefix(), ReadSuffix()); err != nil { if results, err := s.Read("foo", store.ReadPrefix(), store.ReadSuffix()); err != nil {
t.Error(err) t.Error(err)
} else { } else {
if len(results) != 1 { if len(results) != 1 {
@ -236,14 +237,14 @@ func fileTest(s Store, t *testing.T) {
// write the following records // write the following records
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
s.Write(&Record{ s.Write(&store.Record{
Key: fmt.Sprintf("a%d", i), Key: fmt.Sprintf("a%d", i),
Value: []byte{}, Value: []byte{},
}) })
} }
// read back a few records // read back a few records
if results, err := s.Read("a", ReadLimit(5), ReadPrefix()); err != nil { if results, err := s.Read("a", store.ReadLimit(5), store.ReadPrefix()); err != nil {
t.Error(err) t.Error(err)
} else { } else {
if len(results) != 5 { if len(results) != 5 {
@ -255,7 +256,7 @@ func fileTest(s Store, t *testing.T) {
} }
// read the rest back // read the rest back
if results, err := s.Read("a", ReadLimit(30), ReadOffset(5), ReadPrefix()); err != nil { if results, err := s.Read("a", store.ReadLimit(30), store.ReadOffset(5), store.ReadPrefix()); err != nil {
t.Fatal(err) t.Fatal(err)
} else { } else {
if len(results) != 5 { if len(results) != 5 {

View File

@ -49,7 +49,7 @@ type Record struct {
} }
func NewStore(opts ...Option) Store { func NewStore(opts ...Option) Store {
return NewFileStore(opts...) return NewMemoryStore(opts...)
} }
func NewRecord(key string, val interface{}) *Record { func NewRecord(key string, val interface{}) *Record {