1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-06-06 22:06:19 +02:00

Revert "WIP: move file store (do not merge) (#2766)" (#2768)

This reverts commit 97275d3db9499f59e5dfef401f5f1823f95fc295.
This commit is contained in:
Asim Aslam 2025-05-16 19:06:45 +01:00 committed by GitHub
parent 1fe2638298
commit 37bb1a8ab6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 62 additions and 64 deletions

View File

@ -1,4 +1,4 @@
package file package store
import ( import (
"context" "context"
@ -10,7 +10,6 @@ import (
"sync" "sync"
"time" "time"
"go-micro.dev/v5/store"
bolt "go.etcd.io/bbolt" bolt "go.etcd.io/bbolt"
) )
@ -27,7 +26,7 @@ var (
dataBucket = "data" dataBucket = "data"
) )
func NewStore(opts ...store.Option) store.Store { func NewFileStore(opts ...Option) Store {
s := &fileStore{ s := &fileStore{
handles: make(map[string]*fileHandle), handles: make(map[string]*fileHandle),
} }
@ -36,7 +35,7 @@ func NewStore(opts ...store.Option) store.Store {
} }
type fileStore struct { type fileStore struct {
options store.Options options Options
dir string dir string
// the database handle // the database handle
@ -71,7 +70,7 @@ func (m *fileStore) delete(fd *fileHandle, key string) error {
}) })
} }
func (m *fileStore) init(opts ...store.Option) error { func (m *fileStore) init(opts ...Option) error {
for _, o := range opts { for _, o := range opts {
o(&m.options) o(&m.options)
} }
@ -207,7 +206,7 @@ func (m *fileStore) list(fd *fileHandle, limit, offset uint) []string {
return allKeys return allKeys
} }
func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) { func (m *fileStore) get(fd *fileHandle, k string) (*Record, error) {
var value []byte var value []byte
fd.db.View(func(tx *bolt.Tx) error { fd.db.View(func(tx *bolt.Tx) error {
@ -222,7 +221,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) {
}) })
if value == nil { if value == nil {
return nil, store.ErrNotFound return nil, ErrNotFound
} }
storedRecord := &record{} storedRecord := &record{}
@ -231,7 +230,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) {
return nil, err return nil, err
} }
newRecord := &store.Record{} newRecord := &Record{}
newRecord.Key = storedRecord.Key newRecord.Key = storedRecord.Key
newRecord.Value = storedRecord.Value newRecord.Value = storedRecord.Value
newRecord.Metadata = make(map[string]interface{}) newRecord.Metadata = make(map[string]interface{})
@ -242,7 +241,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) {
if !storedRecord.ExpiresAt.IsZero() { if !storedRecord.ExpiresAt.IsZero() {
if storedRecord.ExpiresAt.Before(time.Now()) { if storedRecord.ExpiresAt.Before(time.Now()) {
return nil, store.ErrNotFound return nil, ErrNotFound
} }
newRecord.Expiry = time.Until(storedRecord.ExpiresAt) newRecord.Expiry = time.Until(storedRecord.ExpiresAt)
} }
@ -250,7 +249,7 @@ func (m *fileStore) get(fd *fileHandle, k string) (*store.Record, error) {
return newRecord, nil return newRecord, nil
} }
func (m *fileStore) set(fd *fileHandle, r *store.Record) error { func (m *fileStore) set(fd *fileHandle, r *Record) error {
// copy the incoming record and then // copy the incoming record and then
// convert the expiry in to a hard timestamp // convert the expiry in to a hard timestamp
item := &record{} item := &record{}
@ -292,12 +291,12 @@ func (f *fileStore) Close() error {
return nil return nil
} }
func (f *fileStore) Init(opts ...store.Option) error { func (f *fileStore) Init(opts ...Option) error {
return f.init(opts...) return f.init(opts...)
} }
func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error { func (m *fileStore) Delete(key string, opts ...DeleteOption) error {
var deleteOptions store.DeleteOptions var deleteOptions DeleteOptions
for _, o := range opts { for _, o := range opts {
o(&deleteOptions) o(&deleteOptions)
} }
@ -310,8 +309,8 @@ func (m *fileStore) Delete(key string, opts ...store.DeleteOption) error {
return m.delete(fd, key) return m.delete(fd, key)
} }
func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) { func (m *fileStore) Read(key string, opts ...ReadOption) ([]*Record, error) {
var readOpts store.ReadOptions var readOpts ReadOptions
for _, o := range opts { for _, o := range opts {
o(&readOpts) o(&readOpts)
} }
@ -343,7 +342,7 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
keys = []string{key} keys = []string{key}
} }
var results []*store.Record var results []*Record
for _, k := range keys { for _, k := range keys {
r, err := m.get(fd, k) r, err := m.get(fd, k)
@ -356,8 +355,8 @@ func (m *fileStore) Read(key string, opts ...store.ReadOption) ([]*store.Record,
return results, nil return results, nil
} }
func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error { func (m *fileStore) Write(r *Record, opts ...WriteOption) error {
var writeOpts store.WriteOptions var writeOpts WriteOptions
for _, o := range opts { for _, o := range opts {
o(&writeOpts) o(&writeOpts)
} }
@ -369,7 +368,7 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error {
if len(opts) > 0 { if len(opts) > 0 {
// Copy the record before applying options, or the incoming record will be mutated // Copy the record before applying options, or the incoming record will be mutated
newRecord := store.Record{} newRecord := Record{}
newRecord.Key = r.Key newRecord.Key = r.Key
newRecord.Value = r.Value newRecord.Value = r.Value
newRecord.Metadata = make(map[string]interface{}) newRecord.Metadata = make(map[string]interface{})
@ -392,12 +391,12 @@ func (m *fileStore) Write(r *store.Record, opts ...store.WriteOption) error {
return m.set(fd, r) return m.set(fd, r)
} }
func (m *fileStore) Options() store.Options { func (m *fileStore) Options() Options {
return m.options return m.options
} }
func (m *fileStore) List(opts ...store.ListOption) ([]string, error) { func (m *fileStore) List(opts ...ListOption) ([]string, error) {
var listOptions store.ListOptions var listOptions ListOptions
for _, o := range opts { for _, o := range opts {
o(&listOptions) o(&listOptions)
@ -440,9 +439,9 @@ func (m *fileStore) String() string {
type dirOptionKey struct{} type dirOptionKey struct{}
// DirOption is a file store store.Option to set the directory for the file // DirOption is a file store Option to set the directory for the file
func DirOption(dir string) store.Option { func DirOption(dir string) Option {
return func(o *store.Options) { return func(o *Options) {
if o.Context == nil { if o.Context == nil {
o.Context = context.Background() o.Context = context.Background()
} }

View File

@ -1,4 +1,4 @@
package file package store
import ( import (
"fmt" "fmt"
@ -10,19 +10,18 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/kr/pretty" "github.com/kr/pretty"
"go-micro.dev/v5/store"
) )
func cleanup(db string, s store.Store) { func cleanup(db string, s Store) {
s.Close() s.Close()
dir := filepath.Join(DefaultDir, db+"/") dir := filepath.Join(DefaultDir, db+"/")
os.RemoveAll(dir) os.RemoveAll(dir)
} }
func TestFileStoreReInit(t *testing.T) { func TestFileStoreReInit(t *testing.T) {
s := NewStore(store.Table("aaa")) s := NewStore(Table("aaa"))
defer cleanup(DefaultDatabase, s) defer cleanup(DefaultDatabase, s)
s.Init(store.Table("bbb")) s.Init(Table("bbb"))
if s.Options().Table != "bbb" { if s.Options().Table != "bbb" {
t.Error("Init didn't reinitialise the store") t.Error("Init didn't reinitialise the store")
} }
@ -35,29 +34,29 @@ func TestFileStoreBasic(t *testing.T) {
} }
func TestFileStoreTable(t *testing.T) { func TestFileStoreTable(t *testing.T) {
s := NewStore(store.Table("teststore.Table")) s := NewStore(Table("testTable"))
defer cleanup(DefaultDatabase, s) defer cleanup(DefaultDatabase, s)
fileTest(s, t) fileTest(s, t)
} }
func TestFileStoreDatabase(t *testing.T) { func TestFileStoreDatabase(t *testing.T) {
s := NewStore(store.Database("testdb")) s := NewStore(Database("testdb"))
defer cleanup("testdb", s) defer cleanup("testdb", s)
fileTest(s, t) fileTest(s, t)
} }
func TestFileStoreDatabaseTable(t *testing.T) { func TestFileStoreDatabaseTable(t *testing.T) {
s := NewStore(store.Table("teststore.Table"), store.Database("testdb")) s := NewStore(Table("testTable"), Database("testdb"))
defer cleanup("testdb", s) defer cleanup("testdb", s)
fileTest(s, t) fileTest(s, t)
} }
func fileTest(s store.Store, t *testing.T) { func fileTest(s Store, t *testing.T) {
if len(os.Getenv("IN_TRAVIS_CI")) == 0 { if len(os.Getenv("IN_TRAVIS_CI")) == 0 {
t.Logf("Options %s %v\n", s.String(), s.Options()) t.Logf("Options %s %v\n", s.String(), s.Options())
} }
// Read and Write an expiring Record // Read and Write an expiring Record
if err := s.Write(&store.Record{ if err := s.Write(&Record{
Key: "Hello", Key: "Hello",
Value: []byte("World"), Value: []byte("World"),
Expiry: time.Millisecond * 150, Expiry: time.Millisecond * 150,
@ -82,12 +81,12 @@ func fileTest(s store.Store, t *testing.T) {
// wait for expiry // wait for expiry
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
if _, err := s.Read("Hello"); err != store.ErrNotFound { if _, err := s.Read("Hello"); err != ErrNotFound {
t.Errorf("Expected %# v, got %# v", store.ErrNotFound, err) t.Errorf("Expected %# v, got %# v", ErrNotFound, err)
} }
// Write 3 records with various expiry and get with store.Table // Write 3 records with various expiry and get with Table
records := []*store.Record{ records := []*Record{
{ {
Key: "foo", Key: "foo",
Value: []byte("foofoo"), Value: []byte("foofoo"),
@ -105,30 +104,30 @@ func fileTest(s store.Store, t *testing.T) {
} }
} }
if results, err := s.Read("foo", store.ReadPrefix()); err != nil { if results, err := s.Read("foo", ReadPrefix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 2 { if len(results) != 2 {
t.Errorf("Expected 2 items, got %d", len(results)) t.Errorf("Expected 2 items, got %d", len(results))
// t.Logf("store.Table test: %v\n", spew.Sdump(results)) // t.Logf("Table test: %v\n", spew.Sdump(results))
} }
} }
// wait for the expiry // wait for the expiry
time.Sleep(time.Millisecond * 200) time.Sleep(time.Millisecond * 200)
if results, err := s.Read("foo", store.ReadPrefix()); err != nil { if results, err := s.Read("foo", ReadPrefix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else if len(results) != 1 { } else if len(results) != 1 {
t.Errorf("Expected 1 item, got %d", len(results)) t.Errorf("Expected 1 item, got %d", len(results))
// t.Logf("store.Table test: %v\n", spew.Sdump(results)) // t.Logf("Table test: %v\n", spew.Sdump(results))
} }
if err := s.Delete("foo"); err != nil { if err := s.Delete("foo"); err != nil {
t.Errorf("Delete failed (%v)", err) t.Errorf("Delete failed (%v)", err)
} }
if results, err := s.Read("foo"); err != store.ErrNotFound { if results, err := s.Read("foo"); err != ErrNotFound {
t.Errorf("Expected read failure read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Expected read failure read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 0 { if len(results) != 0 {
@ -137,7 +136,7 @@ func fileTest(s store.Store, t *testing.T) {
} }
// Write 3 records with various expiry and get with Suffix // Write 3 records with various expiry and get with Suffix
records = []*store.Record{ records = []*Record{
{ {
Key: "foo", Key: "foo",
Value: []byte("foofoo"), Value: []byte("foofoo"),
@ -159,36 +158,36 @@ func fileTest(s store.Store, t *testing.T) {
t.Errorf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err) t.Errorf("Couldn't write k: %s, v: %# v (%s)", r.Key, pretty.Formatter(r.Value), err)
} }
} }
if results, err := s.Read("foo", store.ReadSuffix()); err != nil { if results, err := s.Read("foo", ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 3 { if len(results) != 3 {
t.Errorf("Expected 3 items, got %d", len(results)) t.Errorf("Expected 3 items, got %d", len(results))
// t.Logf("store.Table test: %v\n", spew.Sdump(results)) // t.Logf("Table test: %v\n", spew.Sdump(results))
} }
} }
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
if results, err := s.Read("foo", store.ReadSuffix()); err != nil { if results, err := s.Read("foo", ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 2 { if len(results) != 2 {
t.Errorf("Expected 2 items, got %d", len(results)) t.Errorf("Expected 2 items, got %d", len(results))
// t.Logf("store.Table test: %v\n", spew.Sdump(results)) // t.Logf("Table test: %v\n", spew.Sdump(results))
} }
} }
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
if results, err := s.Read("foo", store.ReadSuffix()); err != nil { if results, err := s.Read("foo", ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 1 { if len(results) != 1 {
t.Errorf("Expected 1 item, got %d", len(results)) t.Errorf("Expected 1 item, got %d", len(results))
// t.Logf("store.Table test: %# v\n", spew.Sdump(results)) // t.Logf("Table test: %# v\n", spew.Sdump(results))
} }
} }
if err := s.Delete("foo"); err != nil { if err := s.Delete("foo"); err != nil {
t.Errorf("Delete failed (%v)", err) t.Errorf("Delete failed (%v)", err)
} }
if results, err := s.Read("foo", store.ReadSuffix()); err != nil { if results, err := s.Read("foo", ReadSuffix()); err != nil {
t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err) t.Errorf("Couldn't read all \"foo\" keys, got %# v (%s)", spew.Sdump(results), err)
} else { } else {
if len(results) != 0 { if len(results) != 0 {
@ -196,28 +195,28 @@ func fileTest(s store.Store, t *testing.T) {
} }
} }
// Test store.Table, Suffix and WriteOptions // Test Table, Suffix and WriteOptions
if err := s.Write(&store.Record{ if err := s.Write(&Record{
Key: "foofoobarbar", Key: "foofoobarbar",
Value: []byte("something"), Value: []byte("something"),
}, store.WriteTTL(time.Millisecond*100)); err != nil { }, WriteTTL(time.Millisecond*100)); err != nil {
t.Error(err) t.Error(err)
} }
if err := s.Write(&store.Record{ if err := s.Write(&Record{
Key: "foofoo", Key: "foofoo",
Value: []byte("something"), Value: []byte("something"),
}, store.WriteExpiry(time.Now().Add(time.Millisecond*100))); err != nil { }, WriteExpiry(time.Now().Add(time.Millisecond*100))); err != nil {
t.Error(err) t.Error(err)
} }
if err := s.Write(&store.Record{ if err := s.Write(&Record{
Key: "barbar", Key: "barbar",
Value: []byte("something"), Value: []byte("something"),
// TTL has higher precedence than expiry // TTL has higher precedence than expiry
}, store.WriteExpiry(time.Now().Add(time.Hour)), store.WriteTTL(time.Millisecond*100)); err != nil { }, WriteExpiry(time.Now().Add(time.Hour)), WriteTTL(time.Millisecond*100)); err != nil {
t.Error(err) t.Error(err)
} }
if results, err := s.Read("foo", store.ReadPrefix(), store.ReadSuffix()); err != nil { if results, err := s.Read("foo", ReadPrefix(), ReadSuffix()); err != nil {
t.Error(err) t.Error(err)
} else { } else {
if len(results) != 1 { if len(results) != 1 {
@ -237,14 +236,14 @@ func fileTest(s store.Store, t *testing.T) {
// write the following records // write the following records
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
s.Write(&store.Record{ s.Write(&Record{
Key: fmt.Sprintf("a%d", i), Key: fmt.Sprintf("a%d", i),
Value: []byte{}, Value: []byte{},
}) })
} }
// read back a few records // read back a few records
if results, err := s.Read("a", store.ReadLimit(5), store.ReadPrefix()); err != nil { if results, err := s.Read("a", ReadLimit(5), ReadPrefix()); err != nil {
t.Error(err) t.Error(err)
} else { } else {
if len(results) != 5 { if len(results) != 5 {
@ -256,7 +255,7 @@ func fileTest(s store.Store, t *testing.T) {
} }
// read the rest back // read the rest back
if results, err := s.Read("a", store.ReadLimit(30), store.ReadOffset(5), store.ReadPrefix()); err != nil { if results, err := s.Read("a", ReadLimit(30), ReadOffset(5), ReadPrefix()); err != nil {
t.Fatal(err) t.Fatal(err)
} else { } else {
if len(results) != 5 { if len(results) != 5 {

View File

@ -49,7 +49,7 @@ type Record struct {
} }
func NewStore(opts ...Option) Store { func NewStore(opts ...Option) Store {
return NewMemoryStore(opts...) return NewFileStore(opts...)
} }
func NewRecord(key string, val interface{}) *Record { func NewRecord(key string, val interface{}) *Record {