From 8e4e710e1539712f36b9abc52422ae976fd901b0 Mon Sep 17 00:00:00 2001 From: Asim Aslam <asim@aslam.me> Date: Tue, 11 Jun 2019 17:20:52 +0100 Subject: [PATCH 1/7] Move data to top level --- {sync/data => data}/consul/consul.go | 29 +++++++++++++++------------- {sync/data => data}/data.go | 6 ++++-- data/options.go | 15 ++++++++++++++ sync/README.md | 25 ------------------------ sync/data/options.go | 19 ------------------ sync/db.go | 4 ++-- sync/options.go | 2 +- sync/sync.go | 2 +- 8 files changed, 39 insertions(+), 63 deletions(-) rename {sync/data => data}/consul/consul.go (75%) rename {sync/data => data}/data.go (88%) create mode 100644 data/options.go delete mode 100644 sync/data/options.go diff --git a/sync/data/consul/consul.go b/data/consul/consul.go similarity index 75% rename from sync/data/consul/consul.go rename to data/consul/consul.go index 9d2c7a93..58c62495 100644 --- a/sync/data/consul/consul.go +++ b/data/consul/consul.go @@ -6,10 +6,12 @@ import ( "net" "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/data" + "github.com/micro/go-micro/options" ) type ckv struct { + options.Options client *api.Client } @@ -64,22 +66,22 @@ func (c *ckv) String() string { return "consul" } -func NewData(opts ...data.Option) data.Data { - var options data.Options - for _, o := range opts { - o(&options) - } - +func NewData(opts ...options.Option) data.Data { + options := options.NewOptions(opts...) config := api.DefaultConfig() + var nodes []string + + if n, ok := options.Values().Get("data.nodes"); ok { + nodes = n.([]string) + } + // set host - // config.Host something - // check if there are any addrs - if len(options.Nodes) > 0 { - addr, port, err := net.SplitHostPort(options.Nodes[0]) + if len(nodes) > 0 { + addr, port, err := net.SplitHostPort(nodes[0]) if ae, ok := err.(*net.AddrError); ok && ae.Err == "missing port in address" { port = "8500" - config.Address = fmt.Sprintf("%s:%s", options.Nodes[0], port) + config.Address = fmt.Sprintf("%s:%s", nodes[0], port) } else if err == nil { config.Address = fmt.Sprintf("%s:%s", addr, port) } @@ -88,6 +90,7 @@ func NewData(opts ...data.Option) data.Data { client, _ := api.NewClient(config) return &ckv{ - client: client, + Options: options, + client: client, } } diff --git a/sync/data/data.go b/data/data.go similarity index 88% rename from sync/data/data.go rename to data/data.go index 20e4a4c8..004c5160 100644 --- a/sync/data/data.go +++ b/data/data.go @@ -4,6 +4,8 @@ package data import ( "errors" "time" + + "github.com/micro/go-micro/options" ) var ( @@ -12,6 +14,8 @@ var ( // Data is a data storage interface type Data interface { + // embed options + options.Options // Dump the known records Dump() ([]*Record, error) // Read a record with key @@ -28,5 +32,3 @@ type Record struct { Value []byte Expiration time.Duration } - -type Option func(o *Options) diff --git a/data/options.go b/data/options.go new file mode 100644 index 00000000..510e0fe3 --- /dev/null +++ b/data/options.go @@ -0,0 +1,15 @@ +package data + +import ( + "github.com/micro/go-micro/options" +) + +// Set the nodes used to back the data +func Nodes(a ...string) options.Option { + return options.WithValue("data.nodes", a) +} + +// Prefix sets a prefix to any key ids used +func Prefix(p string) options.Option { + return options.WithValue("data.prefix", p) +} diff --git a/sync/README.md b/sync/README.md index 94349374..d69c2060 100644 --- a/sync/README.md +++ b/sync/README.md @@ -10,7 +10,6 @@ an external database or eventing system. Go Sync provides a framework for synchr ## Getting Started -- [Data](#data) - simple distributed data storage - [Leader](#leader) - leadership election for group coordination - [Lock](#lock) - distributed locking for exclusive resource access - [Task](#task) - distributed job execution @@ -70,30 +69,6 @@ for { e.Resign() ``` -## Data - -Data provides a simple interface for distributed data storage. - -```go -import ( - "github.com/micro/go-micro/sync/data" - "github.com/micro/go-micro/sync/data/consul" -) - -keyval := consul.NewData() - -err := keyval.Write(&data.Record{ - Key: "foo", - Value: []byte(`bar`), -}) -// handle err - -v, err := keyval.Read("foo") -// handle err - -err = keyval.Delete("foo") -``` - ## Task Task provides distributed job execution. It's a simple way to distribute work across a coordinated pool of workers. diff --git a/sync/data/options.go b/sync/data/options.go deleted file mode 100644 index 1d85aca3..00000000 --- a/sync/data/options.go +++ /dev/null @@ -1,19 +0,0 @@ -package data - -type Options struct { - Nodes []string - Prefix string -} - -func Nodes(a ...string) Option { - return func(o *Options) { - o.Nodes = a - } -} - -// Prefix sets a prefix to any lock ids used -func Prefix(p string) Option { - return func(o *Options) { - o.Prefix = p - } -} diff --git a/sync/db.go b/sync/db.go index a16af598..b5141d42 100644 --- a/sync/db.go +++ b/sync/db.go @@ -6,8 +6,8 @@ import ( "encoding/json" "fmt" - "github.com/micro/go-micro/sync/data" - ckv "github.com/micro/go-micro/sync/data/consul" + "github.com/micro/go-micro/data" + ckv "github.com/micro/go-micro/data/consul" lock "github.com/micro/go-micro/sync/lock/consul" ) diff --git a/sync/options.go b/sync/options.go index 8b46acf0..2922b77c 100644 --- a/sync/options.go +++ b/sync/options.go @@ -1,7 +1,7 @@ package sync import ( - "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/data" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/time" diff --git a/sync/sync.go b/sync/sync.go index 7cb63c51..b27118e5 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -2,7 +2,7 @@ package sync import ( - "github.com/micro/go-micro/sync/data" + "github.com/micro/go-micro/data" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/task" From 7727b359c88e6be8ffbfe94429efd97de07b47e7 Mon Sep 17 00:00:00 2001 From: Asim Aslam <asim@aslam.me> Date: Tue, 11 Jun 2019 17:49:34 +0100 Subject: [PATCH 2/7] Add memory data store --- data/data.go | 6 +-- data/memory/memory.go | 97 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 3 deletions(-) create mode 100644 data/memory/memory.go diff --git a/data/data.go b/data/data.go index 004c5160..1744c5dc 100644 --- a/data/data.go +++ b/data/data.go @@ -28,7 +28,7 @@ type Data interface { // Record represents a data record type Record struct { - Key string - Value []byte - Expiration time.Duration + Key string + Value []byte + Expiry time.Duration } diff --git a/data/memory/memory.go b/data/memory/memory.go new file mode 100644 index 00000000..12a529af --- /dev/null +++ b/data/memory/memory.go @@ -0,0 +1,97 @@ +// Package memory is a in-memory data store +package memory + +import ( + "sync" + "time" + + "github.com/micro/go-micro/data" + "github.com/micro/go-micro/options" +) + +type memoryData struct { + options.Options + + sync.RWMutex + values map[string]*memoryRecord +} + +type memoryRecord struct { + r *data.Record + c time.Time +} + +func (m *memoryData) Dump() ([]*data.Record, error) { + m.RLock() + defer m.RUnlock() + + var values []*data.Record + + for _, v := range m.values { + // get expiry + d := v.r.Expiry + t := time.Since(v.c) + + // expired + if d > time.Duration(0) && t > d { + continue + } + values = append(values, v.r) + } + + return values, nil +} + +func (m *memoryData) Read(key string) (*data.Record, error) { + m.RLock() + defer m.RUnlock() + + v, ok := m.values[key] + if !ok { + return nil, data.ErrNotFound + } + + // get expiry + d := v.r.Expiry + t := time.Since(v.c) + + // expired + if d > time.Duration(0) && t > d { + return nil, data.ErrNotFound + } + + return v.r, nil +} + +func (m *memoryData) Write(r *data.Record) error { + m.Lock() + defer m.Unlock() + + // set the record + m.values[r.Key] = &memoryRecord{ + r: r, + c: time.Now(), + } + + return nil +} + +func (m *memoryData) Delete(key string) error { + m.Lock() + defer m.Unlock() + + // delete the value + delete(m.values, key) + + return nil +} + +// NewData returns a new data.Data +func NewData(opts ...options.Option) data.Data { + options := options.NewOptions(opts...) + + return &memoryData{ + Options: options, + values: make(map[string]*memoryRecord), + } +} From 43ed8f58f0cc0c17997805dbfc960cd1640a48d3 Mon Sep 17 00:00:00 2001 From: Asim Aslam <asim@aslam.me> Date: Tue, 11 Jun 2019 18:15:18 +0100 Subject: [PATCH 3/7] change wording --- data/data.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data/data.go b/data/data.go index 1744c5dc..a7dbf3c0 100644 --- a/data/data.go +++ b/data/data.go @@ -1,4 +1,4 @@ -// Package data is an interface for key-value storage. +// Package data is an interface for distribute data storage. package data import ( From f81f66c98b688544d007d83ae30b9c5cc7d3a834 Mon Sep 17 00:00:00 2001 From: Asim Aslam <asim@aslam.me> Date: Tue, 11 Jun 2019 18:21:33 +0100 Subject: [PATCH 4/7] Move DB to Map --- sync/{db.go => map.go} | 14 +++++++------- sync/sync.go | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) rename sync/{db.go => map.go} (88%) diff --git a/sync/db.go b/sync/map.go similarity index 88% rename from sync/db.go rename to sync/map.go index b5141d42..d9fcd475 100644 --- a/sync/db.go +++ b/sync/map.go @@ -11,7 +11,7 @@ import ( lock "github.com/micro/go-micro/sync/lock/consul" ) -type syncDB struct { +type syncMap struct { opts Options } @@ -20,7 +20,7 @@ func ekey(k interface{}) string { return base64.StdEncoding.EncodeToString(b) } -func (m *syncDB) Read(key, val interface{}) error { +func (m *syncMap) Read(key, val interface{}) error { if key == nil { return fmt.Errorf("key is nil") } @@ -43,7 +43,7 @@ func (m *syncDB) Read(key, val interface{}) error { return json.Unmarshal(kval.Value, val) } -func (m *syncDB) Write(key, val interface{}) error { +func (m *syncMap) Write(key, val interface{}) error { if key == nil { return fmt.Errorf("key is nil") } @@ -69,7 +69,7 @@ func (m *syncDB) Write(key, val interface{}) error { }) } -func (m *syncDB) Delete(key interface{}) error { +func (m *syncMap) Delete(key interface{}) error { if key == nil { return fmt.Errorf("key is nil") } @@ -84,7 +84,7 @@ func (m *syncDB) Delete(key interface{}) error { return m.opts.Data.Delete(kstr) } -func (m *syncDB) Iterate(fn func(key, val interface{}) error) error { +func (m *syncMap) Iterate(fn func(key, val interface{}) error) error { keyvals, err := m.opts.Data.Dump() if err != nil { return err @@ -137,7 +137,7 @@ func (m *syncDB) Iterate(fn func(key, val interface{}) error) error { return nil } -func NewDB(opts ...Option) DB { +func NewMap(opts ...Option) Map { var options Options for _, o := range opts { o(&options) @@ -151,7 +151,7 @@ func NewDB(opts ...Option) DB { options.Data = ckv.NewData() } - return &syncDB{ + return &syncMap{ opts: options, } } diff --git a/sync/sync.go b/sync/sync.go index b27118e5..54bf26c5 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -9,10 +9,10 @@ import ( "github.com/micro/go-micro/sync/time" ) -// DB provides synchronized access to key-value storage. +// Map provides synchronized access to key-value storage. // It uses the data interface and lock interface to // provide a consistent storage mechanism. -type DB interface { +type Map interface { // Read value with given key Read(key, val interface{}) error // Write value with given key From a5412dd4a04823fb3179f9959b421b09f78591ad Mon Sep 17 00:00:00 2001 From: Asim Aslam <asim@aslam.me> Date: Wed, 12 Jun 2019 07:46:20 +0100 Subject: [PATCH 5/7] Move data to store --- {data => store}/consul/consul.go | 22 +++++++++++----------- {data => store}/memory/memory.go | 28 ++++++++++++++-------------- {data => store}/options.go | 8 ++++---- data/data.go => store/store.go | 8 ++++---- 4 files changed, 33 insertions(+), 33 deletions(-) rename {data => store}/consul/consul.go (75%) rename {data => store}/memory/memory.go (63%) rename {data => store}/options.go (55%) rename data/data.go => store/store.go (77%) diff --git a/data/consul/consul.go b/store/consul/consul.go similarity index 75% rename from data/consul/consul.go rename to store/consul/consul.go index 58c62495..9fa1634f 100644 --- a/data/consul/consul.go +++ b/store/consul/consul.go @@ -6,8 +6,8 @@ import ( "net" "github.com/hashicorp/consul/api" - "github.com/micro/go-micro/data" "github.com/micro/go-micro/options" + "github.com/micro/go-micro/store" ) type ckv struct { @@ -15,17 +15,17 @@ type ckv struct { client *api.Client } -func (c *ckv) Read(key string) (*data.Record, error) { +func (c *ckv) Read(key string) (*store.Record, error) { keyval, _, err := c.client.KV().Get(key, nil) if err != nil { return nil, err } if keyval == nil { - return nil, data.ErrNotFound + return nil, store.ErrNotFound } - return &data.Record{ + return &store.Record{ Key: keyval.Key, Value: keyval.Value, }, nil @@ -36,7 +36,7 @@ func (c *ckv) Delete(key string) error { return err } -func (c *ckv) Write(record *data.Record) error { +func (c *ckv) Write(record *store.Record) error { _, err := c.client.KV().Put(&api.KVPair{ Key: record.Key, Value: record.Value, @@ -44,17 +44,17 @@ func (c *ckv) Write(record *data.Record) error { return err } -func (c *ckv) Dump() ([]*data.Record, error) { +func (c *ckv) Dump() ([]*store.Record, error) { keyval, _, err := c.client.KV().List("/", nil) if err != nil { return nil, err } if keyval == nil { - return nil, data.ErrNotFound + return nil, store.ErrNotFound } - var vals []*data.Record + var vals []*store.Record for _, keyv := range keyval { - vals = append(vals, &data.Record{ + vals = append(vals, &store.Record{ Key: keyv.Key, Value: keyv.Value, }) @@ -66,13 +66,13 @@ func (c *ckv) String() string { return "consul" } -func NewData(opts ...options.Option) data.Data { +func NewStore(opts ...options.Option) store.Store { options := options.NewOptions(opts...) config := api.DefaultConfig() var nodes []string - if n, ok := options.Values().Get("data.nodes"); ok { + if n, ok := options.Values().Get("store.nodes"); ok { nodes = n.([]string) } diff --git a/data/memory/memory.go b/store/memory/memory.go similarity index 63% rename from data/memory/memory.go rename to store/memory/memory.go index 12a529af..4c1d6c8a 100644 --- a/data/memory/memory.go +++ b/store/memory/memory.go @@ -1,15 +1,15 @@ -// Package memory is a in-memory data store +// Package memory is a in-memory store store package memory import ( "sync" "time" - "github.com/micro/go-micro/data" "github.com/micro/go-micro/options" + "github.com/micro/go-micro/store" ) -type memoryData struct { +type memoryStore struct { options.Options sync.RWMutex @@ -17,15 +17,15 @@ type memoryData struct { } type memoryRecord struct { - r *data.Record + r *store.Record c time.Time } -func (m *memoryData) Dump() ([]*data.Record, error) { +func (m *memoryStore) Dump() ([]*store.Record, error) { m.RLock() defer m.RUnlock() - var values []*data.Record + var values []*store.Record for _, v := range m.values { // get expiry @@ -42,13 +42,13 @@ func (m *memoryData) Dump() ([]*data.Record, error) { return values, nil } -func (m *memoryData) Read(key string) (*data.Record, error) { +func (m *memoryStore) Read(key string) (*store.Record, error) { m.RLock() defer m.RUnlock() v, ok := m.values[key] if !ok { - return nil, data.ErrNotFound + return nil, store.ErrNotFound } // get expiry @@ -57,13 +57,13 @@ func (m *memoryData) Read(key string) (*data.Record, error) { // expired if d > time.Duration(0) && t > d { - return nil, data.ErrNotFound + return nil, store.ErrNotFound } return v.r, nil } -func (m *memoryData) Write(r *data.Record) error { +func (m *memoryStore) Write(r *store.Record) error { m.Lock() defer m.Unlock() @@ -76,7 +76,7 @@ func (m *memoryData) Write(r *data.Record) error { return nil } -func (m *memoryData) Delete(key string) error { +func (m *memoryStore) Delete(key string) error { m.Lock() defer m.Unlock() @@ -86,11 +86,11 @@ func (m *memoryData) Delete(key string) error { return nil } -// NewData returns a new data.Data -func NewData(opts ...options.Option) data.Data { +// NewStore returns a new store.Store +func NewStore(opts ...options.Option) store.Store { options := options.NewOptions(opts...) - return &memoryData{ + return &memoryStore{ Options: options, values: make(map[string]*memoryRecord), } diff --git a/data/options.go b/store/options.go similarity index 55% rename from data/options.go rename to store/options.go index 510e0fe3..2f63e582 100644 --- a/data/options.go +++ b/store/options.go @@ -1,15 +1,15 @@ -package data +package store import ( "github.com/micro/go-micro/options" ) -// Set the nodes used to back the data +// Set the nodes used to back the store func Nodes(a ...string) options.Option { - return options.WithValue("data.nodes", a) + return options.WithValue("store.nodes", a) } // Prefix sets a prefix to any key ids used func Prefix(p string) options.Option { - return options.WithValue("data.prefix", p) + return options.WithValue("store.prefix", p) } diff --git a/data/data.go b/store/store.go similarity index 77% rename from data/data.go rename to store/store.go index a7dbf3c0..e8c17eae 100644 --- a/data/data.go +++ b/store/store.go @@ -1,5 +1,5 @@ -// Package data is an interface for distribute data storage. -package data +// Package store is an interface for distribute data storage. +package store import ( "errors" @@ -12,8 +12,8 @@ var ( ErrNotFound = errors.New("not found") ) -// Data is a data storage interface -type Data interface { +// Store is a data storage interface +type Store interface { // embed options options.Options // Dump the known records From 7a1cef46b0b22240073c120446ecc9d4ec88a49f Mon Sep 17 00:00:00 2001 From: Asim Aslam <asim@aslam.me> Date: Wed, 12 Jun 2019 07:50:04 +0100 Subject: [PATCH 6/7] fix broken links --- sync/map.go | 18 +++++++++--------- sync/options.go | 8 ++++---- sync/sync.go | 6 +++--- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sync/map.go b/sync/map.go index d9fcd475..2053c57d 100644 --- a/sync/map.go +++ b/sync/map.go @@ -6,8 +6,8 @@ import ( "encoding/json" "fmt" - "github.com/micro/go-micro/data" - ckv "github.com/micro/go-micro/data/consul" + "github.com/micro/go-micro/store" + ckv "github.com/micro/go-micro/store/consul" lock "github.com/micro/go-micro/sync/lock/consul" ) @@ -34,7 +34,7 @@ func (m *syncMap) Read(key, val interface{}) error { defer m.opts.Lock.Release(kstr) // get key - kval, err := m.opts.Data.Read(kstr) + kval, err := m.opts.Store.Read(kstr) if err != nil { return err } @@ -63,7 +63,7 @@ func (m *syncMap) Write(key, val interface{}) error { } // set key - return m.opts.Data.Write(&data.Record{ + return m.opts.Store.Write(&store.Record{ Key: kstr, Value: b, }) @@ -81,11 +81,11 @@ func (m *syncMap) Delete(key interface{}) error { return err } defer m.opts.Lock.Release(kstr) - return m.opts.Data.Delete(kstr) + return m.opts.Store.Delete(kstr) } func (m *syncMap) Iterate(fn func(key, val interface{}) error) error { - keyvals, err := m.opts.Data.Dump() + keyvals, err := m.opts.Store.Dump() if err != nil { return err } @@ -126,7 +126,7 @@ func (m *syncMap) Iterate(fn func(key, val interface{}) error) error { } // set key - if err := m.opts.Data.Write(&data.Record{ + if err := m.opts.Store.Write(&store.Record{ Key: keyval.Key, Value: b, }); err != nil { @@ -147,8 +147,8 @@ func NewMap(opts ...Option) Map { options.Lock = lock.NewLock() } - if options.Data == nil { - options.Data = ckv.NewData() + if options.Store == nil { + options.Store = ckv.NewStore() } return &syncMap{ diff --git a/sync/options.go b/sync/options.go index 2922b77c..0e5cf3d0 100644 --- a/sync/options.go +++ b/sync/options.go @@ -1,7 +1,7 @@ package sync import ( - "github.com/micro/go-micro/data" + "github.com/micro/go-micro/store" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/time" @@ -21,10 +21,10 @@ func WithLock(l lock.Lock) Option { } } -// WithData sets the data implementation option -func WithData(s data.Data) Option { +// WithStore sets the store implementation option +func WithStore(s store.Store) Option { return func(o *Options) { - o.Data = s + o.Store = s } } diff --git a/sync/sync.go b/sync/sync.go index 54bf26c5..7b080b1d 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -2,7 +2,7 @@ package sync import ( - "github.com/micro/go-micro/data" + "github.com/micro/go-micro/store" "github.com/micro/go-micro/sync/leader" "github.com/micro/go-micro/sync/lock" "github.com/micro/go-micro/sync/task" @@ -10,7 +10,7 @@ import ( ) // Map provides synchronized access to key-value storage. -// It uses the data interface and lock interface to +// It uses the store interface and lock interface to // provide a consistent storage mechanism. type Map interface { // Read value with given key @@ -33,7 +33,7 @@ type Cron interface { type Options struct { Leader leader.Leader Lock lock.Lock - Data data.Data + Store store.Store Task task.Task Time time.Time } From 000e25a4b22dcc4beef863d18b8612b9f5d3464a Mon Sep 17 00:00:00 2001 From: Asim Aslam <asim@aslam.me> Date: Wed, 12 Jun 2019 12:05:34 +0100 Subject: [PATCH 7/7] use the router --- server/rpc_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/rpc_server.go b/server/rpc_server.go index 22fca2d9..9c4a3ef1 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -40,7 +40,7 @@ func newRpcServer(opts ...Option) Server { return &rpcServer{ opts: options, - router: DefaultRouter, + router: router, handlers: make(map[string]Handler), subscribers: make(map[*subscriber][]broker.Subscriber), exit: make(chan chan error),