1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-02-10 18:31:40 +02:00
2021-10-12 12:55:53 +01:00

191 lines
3.2 KiB
Go

package memcached
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"strings"
"time"
mc "github.com/bradfitz/gomemcache/memcache"
log "go-micro.dev/v4/logger"
"go-micro.dev/v4/store"
)
type mkv struct {
options store.Options
Server *mc.ServerList
Client *mc.Client
}
func (m *mkv) Init(opts ...store.Option) error {
for _, o := range opts {
o(&m.options)
}
return m.configure()
}
func (m *mkv) Options() store.Options {
return m.options
}
func (m *mkv) Close() error {
// memcaced does not supports close?
return nil
}
func (m *mkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
// TODO: implement read options
records := make([]*store.Record, 0, 1)
keyval, err := m.Client.Get(key)
if err != nil && err == mc.ErrCacheMiss {
return nil, store.ErrNotFound
} else if err != nil {
return nil, err
}
if keyval == nil {
return nil, store.ErrNotFound
}
records = append(records, &store.Record{
Key: keyval.Key,
Value: keyval.Value,
Expiry: time.Second * time.Duration(keyval.Expiration),
})
return records, nil
}
func (m *mkv) Delete(key string, opts ...store.DeleteOption) error {
return m.Client.Delete(key)
}
func (m *mkv) Write(record *store.Record, opts ...store.WriteOption) error {
return m.Client.Set(&mc.Item{
Key: record.Key,
Value: record.Value,
Expiration: int32(record.Expiry.Seconds()),
})
}
func (m *mkv) List(opts ...store.ListOption) ([]string, error) {
// stats
// cachedump
// get keys
var keys []string
//store := make(map[string]string)
if err := m.Server.Each(func(c net.Addr) error {
cc, err := net.Dial("tcp", c.String())
if err != nil {
return err
}
defer cc.Close()
b := bufio.NewReadWriter(bufio.NewReader(cc), bufio.NewWriter(cc))
// get records
if _, err := fmt.Fprintf(b, "stats records\r\n"); err != nil {
return err
}
b.Flush()
v, err := b.ReadSlice('\n')
if err != nil {
return err
}
parts := bytes.Split(v, []byte("\n"))
if len(parts) < 1 {
return nil
}
vals := strings.Split(string(parts[0]), ":")
records := vals[1]
// drain
for {
buf, err := b.ReadSlice('\n')
if err == io.EOF {
break
}
if err != nil {
return err
}
if strings.HasPrefix(string(buf), "END") {
break
}
}
b.Writer.Reset(cc)
b.Reader.Reset(cc)
if _, err := fmt.Fprintf(b, "lru_crawler metadump %s\r\n", records); err != nil {
return err
}
b.Flush()
for {
v, err := b.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
return err
}
if strings.HasPrefix(v, "END") {
break
}
key := strings.Split(v, " ")[0]
keys = append(keys, strings.TrimPrefix(key, "key="))
}
return nil
}); err != nil {
return nil, err
}
return keys, nil
}
func (m *mkv) String() string {
return "memcached"
}
func NewStore(opts ...store.Option) store.Store {
var options store.Options
for _, o := range opts {
o(&options)
}
s := new(mkv)
s.options = options
if err := s.configure(); err != nil {
log.Fatal(err)
}
return s
}
func (m *mkv) configure() error {
nodes := m.options.Nodes
if len(nodes) == 0 {
nodes = []string{"127.0.0.1:11211"}
}
ss := new(mc.ServerList)
ss.SetServers(nodes...)
m.Server = ss
m.Client = mc.New(nodes...)
return nil
}