mirror of
https://github.com/go-micro/go-micro.git
synced 2025-02-10 18:31:40 +02:00
191 lines
3.2 KiB
Go
191 lines
3.2 KiB
Go
package memcached
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
|
|
mc "github.com/bradfitz/gomemcache/memcache"
|
|
log "go-micro.dev/v4/logger"
|
|
"go-micro.dev/v4/store"
|
|
)
|
|
|
|
type mkv struct {
|
|
options store.Options
|
|
Server *mc.ServerList
|
|
Client *mc.Client
|
|
}
|
|
|
|
func (m *mkv) Init(opts ...store.Option) error {
|
|
for _, o := range opts {
|
|
o(&m.options)
|
|
}
|
|
return m.configure()
|
|
}
|
|
|
|
func (m *mkv) Options() store.Options {
|
|
return m.options
|
|
}
|
|
|
|
func (m *mkv) Close() error {
|
|
// memcaced does not supports close?
|
|
return nil
|
|
}
|
|
|
|
func (m *mkv) Read(key string, opts ...store.ReadOption) ([]*store.Record, error) {
|
|
// TODO: implement read options
|
|
records := make([]*store.Record, 0, 1)
|
|
|
|
keyval, err := m.Client.Get(key)
|
|
if err != nil && err == mc.ErrCacheMiss {
|
|
return nil, store.ErrNotFound
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if keyval == nil {
|
|
return nil, store.ErrNotFound
|
|
}
|
|
|
|
records = append(records, &store.Record{
|
|
Key: keyval.Key,
|
|
Value: keyval.Value,
|
|
Expiry: time.Second * time.Duration(keyval.Expiration),
|
|
})
|
|
|
|
return records, nil
|
|
}
|
|
|
|
func (m *mkv) Delete(key string, opts ...store.DeleteOption) error {
|
|
return m.Client.Delete(key)
|
|
}
|
|
|
|
func (m *mkv) Write(record *store.Record, opts ...store.WriteOption) error {
|
|
return m.Client.Set(&mc.Item{
|
|
Key: record.Key,
|
|
Value: record.Value,
|
|
Expiration: int32(record.Expiry.Seconds()),
|
|
})
|
|
}
|
|
|
|
func (m *mkv) List(opts ...store.ListOption) ([]string, error) {
|
|
// stats
|
|
// cachedump
|
|
// get keys
|
|
|
|
var keys []string
|
|
|
|
//store := make(map[string]string)
|
|
if err := m.Server.Each(func(c net.Addr) error {
|
|
cc, err := net.Dial("tcp", c.String())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer cc.Close()
|
|
|
|
b := bufio.NewReadWriter(bufio.NewReader(cc), bufio.NewWriter(cc))
|
|
|
|
// get records
|
|
if _, err := fmt.Fprintf(b, "stats records\r\n"); err != nil {
|
|
return err
|
|
}
|
|
|
|
b.Flush()
|
|
|
|
v, err := b.ReadSlice('\n')
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
parts := bytes.Split(v, []byte("\n"))
|
|
if len(parts) < 1 {
|
|
return nil
|
|
}
|
|
vals := strings.Split(string(parts[0]), ":")
|
|
records := vals[1]
|
|
|
|
// drain
|
|
for {
|
|
buf, err := b.ReadSlice('\n')
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if strings.HasPrefix(string(buf), "END") {
|
|
break
|
|
}
|
|
}
|
|
|
|
b.Writer.Reset(cc)
|
|
b.Reader.Reset(cc)
|
|
|
|
if _, err := fmt.Fprintf(b, "lru_crawler metadump %s\r\n", records); err != nil {
|
|
return err
|
|
}
|
|
b.Flush()
|
|
|
|
for {
|
|
v, err := b.ReadString('\n')
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if strings.HasPrefix(v, "END") {
|
|
break
|
|
}
|
|
key := strings.Split(v, " ")[0]
|
|
keys = append(keys, strings.TrimPrefix(key, "key="))
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return keys, nil
|
|
}
|
|
|
|
func (m *mkv) String() string {
|
|
return "memcached"
|
|
}
|
|
|
|
func NewStore(opts ...store.Option) store.Store {
|
|
var options store.Options
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
s := new(mkv)
|
|
s.options = options
|
|
|
|
if err := s.configure(); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
func (m *mkv) configure() error {
|
|
nodes := m.options.Nodes
|
|
|
|
if len(nodes) == 0 {
|
|
nodes = []string{"127.0.0.1:11211"}
|
|
}
|
|
|
|
ss := new(mc.ServerList)
|
|
ss.SetServers(nodes...)
|
|
|
|
m.Server = ss
|
|
m.Client = mc.New(nodes...)
|
|
|
|
return nil
|
|
}
|