1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-17 17:44:30 +02:00
go-micro/store/etcd/etcd.go
Jake Sanders 3324d140c0
Rename store Namespace / Prefix options to Database and Table (#1492)
* Rename Namespace to DB, Rename Prefix to table, Remove Suffix Option

* Rename options

* Rename options

* Add store_table option

* Table per service, not Database per service
2020-04-06 16:45:55 +01:00

269 lines
6.7 KiB
Go

// Package etcd implements a go-micro/v2/store with etcd
package etcd
import (
"bytes"
"context"
"encoding/gob"
"math"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/namespace"
"github.com/micro/go-micro/v2/store"
"github.com/pkg/errors"
)
type etcdStore struct {
options store.Options
client *clientv3.Client
config clientv3.Config
}
// NewStore returns a new etcd store
func NewStore(opts ...store.Option) store.Store {
e := &etcdStore{}
for _, o := range opts {
o(&e.options)
}
e.init()
return e
}
func (e *etcdStore) Init(opts ...store.Option) error {
for _, o := range opts {
o(&e.options)
}
return e.init()
}
func (e *etcdStore) init() error {
// ensure context is non-nil
e.options.Context = context.Background()
// set up config
e.config = clientv3.Config{}
e.applyConfig(&e.config)
if len(e.options.Nodes) == 0 {
e.config.Endpoints = []string{"http://127.0.0.1:2379"}
} else {
e.config.Endpoints = make([]string, len(e.options.Nodes))
copy(e.config.Endpoints, e.options.Nodes)
}
if e.client != nil {
e.client.Close()
}
client, err := clientv3.New(e.config)
if err != nil {
return err
}
e.client = client
ns := ""
if len(e.options.Table) > 0 {
ns = e.options.Table
}
if len(e.options.Database) > 0 {
ns = e.options.Database + "/" + ns
}
if len(ns) > 0 {
e.client.KV = namespace.NewKV(e.client.KV, ns)
e.client.Watcher = namespace.NewWatcher(e.client.Watcher, ns)
e.client.Lease = namespace.NewLease(e.client.Lease, ns)
}
return nil
}
func (e *etcdStore) Options() store.Options {
return e.options
}
func (e *etcdStore) String() string {
return "etcd"
}
func (e *etcdStore) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
readOpts := store.ReadOptions{}
for _, o := range opts {
o(&readOpts)
}
if readOpts.Suffix {
return e.readSuffix(key, readOpts)
}
var etcdOpts []clientv3.OpOption
if readOpts.Prefix {
etcdOpts = append(etcdOpts, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
}
resp, err := e.client.KV.Get(context.Background(), key, etcdOpts...)
if err != nil {
return nil, err
}
if resp.Count == 0 && !(readOpts.Prefix || readOpts.Suffix) {
return nil, store.ErrNotFound
}
var records []*store.Record
for _, kv := range resp.Kvs {
ir := internalRecord{}
if err := gob.NewDecoder(bytes.NewReader(kv.Value)).Decode(&ir); err != nil {
return records, errors.Wrapf(err, "couldn't decode %s into internalRecord", err.Error())
}
r := store.Record{
Key: ir.Key,
Value: ir.Value,
}
if !ir.ExpiresAt.IsZero() {
r.Expiry = time.Until(ir.ExpiresAt)
}
records = append(records, &r)
}
if readOpts.Limit > 0 || readOpts.Offset > 0 {
return records[readOpts.Offset:min(readOpts.Limit, uint(len(records)))], nil
}
return records, nil
}
func (e *etcdStore) readSuffix(key string, readOpts store.ReadOptions) ([]*store.Record, error) {
opts := []store.ListOption{store.ListSuffix(key)}
if readOpts.Prefix {
opts = append(opts, store.ListPrefix(key))
}
keys, err := e.List(opts...)
if err != nil {
return nil, errors.Wrapf(err, "Couldn't list with suffix %s", key)
}
var records []*store.Record
for _, k := range keys {
resp, err := e.client.KV.Get(context.Background(), k)
if err != nil {
return nil, errors.Wrapf(err, "Couldn't get key %s", k)
}
ir := internalRecord{}
if err := gob.NewDecoder(bytes.NewReader(resp.Kvs[0].Value)).Decode(&ir); err != nil {
return records, errors.Wrapf(err, "couldn't decode %s into internalRecord", err.Error())
}
r := store.Record{
Key: ir.Key,
Value: ir.Value,
}
if !ir.ExpiresAt.IsZero() {
r.Expiry = time.Until(ir.ExpiresAt)
}
records = append(records, &r)
}
if readOpts.Limit > 0 || readOpts.Offset > 0 {
return records[readOpts.Offset:min(readOpts.Limit, uint(len(records)))], nil
}
return records, nil
}
func (e *etcdStore) Write(r *store.Record, opts ...store.WriteOption) error {
options := store.WriteOptions{}
for _, o := range opts {
o(&options)
}
if len(opts) > 0 {
// Copy the record before applying options, or the incoming record will be mutated
newRecord := store.Record{}
newRecord.Key = r.Key
newRecord.Value = make([]byte, len(r.Value))
copy(newRecord.Value, r.Value)
newRecord.Expiry = r.Expiry
if !options.Expiry.IsZero() {
newRecord.Expiry = time.Until(options.Expiry)
}
if options.TTL != 0 {
newRecord.Expiry = options.TTL
}
return e.write(&newRecord)
}
return e.write(r)
}
func (e *etcdStore) write(r *store.Record) error {
var putOpts []clientv3.OpOption
ir := &internalRecord{}
ir.Key = r.Key
ir.Value = make([]byte, len(r.Value))
copy(ir.Value, r.Value)
if r.Expiry != 0 {
ir.ExpiresAt = time.Now().Add(r.Expiry)
var leasexpiry int64
if r.Expiry.Seconds() < 5.0 {
// minimum etcd lease is 5 seconds
leasexpiry = 5
} else {
leasexpiry = int64(math.Ceil(r.Expiry.Seconds()))
}
lr, err := e.client.Lease.Grant(context.Background(), leasexpiry)
if err != nil {
return errors.Wrapf(err, "couldn't grant an etcd lease for %s", r.Key)
}
putOpts = append(putOpts, clientv3.WithLease(lr.ID))
}
b := &bytes.Buffer{}
if err := gob.NewEncoder(b).Encode(ir); err != nil {
return errors.Wrapf(err, "couldn't encode %s", r.Key)
}
_, err := e.client.KV.Put(context.Background(), ir.Key, string(b.Bytes()), putOpts...)
return errors.Wrapf(err, "couldn't put key %s in to etcd", err)
}
func (e *etcdStore) Delete(key string, opts ...store.DeleteOption) error {
options := store.DeleteOptions{}
for _, o := range opts {
o(&options)
}
_, err := e.client.KV.Delete(context.Background(), key)
return errors.Wrapf(err, "couldn't delete key %s", key)
}
func (e *etcdStore) List(opts ...store.ListOption) ([]string, error) {
options := store.ListOptions{}
for _, o := range opts {
o(&options)
}
searchPrefix := ""
if len(options.Prefix) > 0 {
searchPrefix = options.Prefix
}
resp, err := e.client.KV.Get(context.Background(), searchPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, errors.Wrap(err, "couldn't list, etcd get failed")
}
if len(options.Suffix) == 0 {
keys := make([]string, resp.Count)
for i, kv := range resp.Kvs {
keys[i] = string(kv.Key)
}
return keys, nil
}
keys := []string{}
for _, kv := range resp.Kvs {
if strings.HasSuffix(string(kv.Key), options.Suffix) {
keys = append(keys, string(kv.Key))
}
}
if options.Limit > 0 || options.Offset > 0 {
return keys[options.Offset:min(options.Limit, uint(len(keys)))], nil
}
return keys, nil
}
type internalRecord struct {
Key string
Value []byte
ExpiresAt time.Time
}
func min(i, j uint) uint {
if i < j {
return i
}
return j
}