mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-10 00:29:01 +02:00
289 lines
6.3 KiB
Go
289 lines
6.3 KiB
Go
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/bilibili/kratos/pkg/log"
|
|
"github.com/bilibili/kratos/pkg/naming"
|
|
"github.com/coreos/etcd/clientv3"
|
|
"github.com/coreos/etcd/mvcc/mvccpb"
|
|
)
|
|
|
|
const (
|
|
//Prefix is a etcd globe key prefix
|
|
Prefix = "kratos_etcd3"
|
|
|
|
RegisterTTL = 30
|
|
)
|
|
|
|
var (
|
|
_once sync.Once
|
|
_builder naming.Builder
|
|
//ErrDuplication is a register duplication err
|
|
ErrDuplication = errors.New("etcd: instance duplicate registration")
|
|
)
|
|
|
|
// Builder return default etcd resolver builder.
|
|
func Builder(c *clientv3.Config) naming.Builder {
|
|
_once.Do(func() {
|
|
_builder,_ = New(c)
|
|
})
|
|
return _builder
|
|
}
|
|
|
|
// Build register resolver into default etcd.
|
|
func Build(c *clientv3.Config, id string) naming.Resolver {
|
|
return Builder(c).Build(id)
|
|
}
|
|
|
|
//EtcdBuilder is a etcd clientv3 EtcdBuilder
|
|
type EtcdBuilder struct {
|
|
cli *clientv3.Client
|
|
ctx context.Context
|
|
cancelFunc context.CancelFunc
|
|
|
|
mutex sync.RWMutex
|
|
apps map[string]*appInfo
|
|
registry map[string]struct{}
|
|
}
|
|
type appInfo struct {
|
|
resolver map[*Resolve]struct{}
|
|
ins atomic.Value
|
|
e *EtcdBuilder
|
|
once sync.Once
|
|
}
|
|
|
|
// Resolve etch resolver.
|
|
type Resolve struct {
|
|
id string
|
|
event chan struct{}
|
|
e *EtcdBuilder
|
|
}
|
|
//New is new a etcdbuilder
|
|
func New(c *clientv3.Config) (e *EtcdBuilder,err error) {
|
|
cli, err := clientv3.New(*c)
|
|
if err != nil {
|
|
return nil,err
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
e = &EtcdBuilder{
|
|
cli: cli,
|
|
ctx: ctx,
|
|
cancelFunc: cancel,
|
|
apps: map[string]*appInfo{},
|
|
registry: map[string]struct{}{},
|
|
}
|
|
return
|
|
}
|
|
|
|
// Build disovery resovler builder.
|
|
func (e *EtcdBuilder) Build(appid string) naming.Resolver {
|
|
r := &Resolve{
|
|
id: appid,
|
|
e: e,
|
|
event: make(chan struct{}, 1),
|
|
}
|
|
e.mutex.Lock()
|
|
app, ok := e.apps[appid]
|
|
if !ok {
|
|
app = &appInfo{
|
|
resolver: make(map[*Resolve]struct{}),
|
|
e: e,
|
|
}
|
|
e.apps[appid] = app
|
|
}
|
|
app.resolver[r] = struct{}{}
|
|
e.mutex.Unlock()
|
|
if ok {
|
|
select {
|
|
case r.event <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
app.once.Do(func() {
|
|
go app.watch(appid)
|
|
log.Info("etcd: AddWatch(%s) already watch(%v)", appid, ok)
|
|
})
|
|
return r
|
|
}
|
|
|
|
// Scheme return etcd's scheme
|
|
func (e *EtcdBuilder) Scheme() string {
|
|
return "etcd"
|
|
|
|
}
|
|
|
|
//Register is register instance
|
|
func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) {
|
|
e.mutex.Lock()
|
|
if _, ok := e.registry[ins.AppID]; ok {
|
|
err = ErrDuplication
|
|
} else {
|
|
e.registry[ins.AppID] = struct{}{}
|
|
}
|
|
e.mutex.Unlock()
|
|
if err != nil {
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(e.ctx)
|
|
if err = e.register(ctx, ins); err != nil {
|
|
e.mutex.Lock()
|
|
delete(e.registry, ins.AppID)
|
|
e.mutex.Unlock()
|
|
cancel()
|
|
return
|
|
}
|
|
ch := make(chan struct{}, 1)
|
|
cancelFunc = context.CancelFunc(func() {
|
|
cancel()
|
|
<-ch
|
|
})
|
|
|
|
go func() {
|
|
//提前2秒续约 避免续约操作缓慢时租约过期
|
|
ticker := time.NewTicker((RegisterTTL - 2) * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
_ = e.register(ctx, ins)
|
|
case <-ctx.Done():
|
|
_ = e.unregister(ins)
|
|
ch <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
//注册和续约公用一个操作
|
|
func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err error) {
|
|
prefix := e.getprefix(ins)
|
|
val, _ := json.Marshal(ins)
|
|
|
|
ttlResp, err := e.cli.Grant(context.TODO(), int64(RegisterTTL))
|
|
if err != nil {
|
|
log.Error("etcd: register client.Grant(%v) error(%v)", RegisterTTL, err)
|
|
return err
|
|
}
|
|
_, err = e.cli.Put(ctx, prefix, string(val), clientv3.WithLease(ttlResp.ID))
|
|
if err != nil {
|
|
log.Error("etcd: register client.Put(%v) appid(%s) hostname(%s) error(%v)",
|
|
prefix, ins.AppID, ins.Hostname, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) {
|
|
prefix := e.getprefix(ins)
|
|
|
|
if _, err = e.cli.Delete(context.TODO(), prefix); err != nil {
|
|
log.Error("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) error(%v)",
|
|
prefix, ins.AppID, ins.Hostname, err)
|
|
}
|
|
log.Info("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) success",
|
|
prefix, ins.AppID, ins.Hostname)
|
|
return
|
|
}
|
|
|
|
func (e *EtcdBuilder) getprefix(ins *naming.Instance) string {
|
|
return fmt.Sprintf("/%s/%s/%s", Prefix, ins.AppID, ins.Hostname)
|
|
}
|
|
|
|
// Close stop all running process including etcdfetch and register
|
|
func (e *EtcdBuilder) Close() error {
|
|
e.cancelFunc()
|
|
return nil
|
|
}
|
|
func (a *appInfo) watch(appID string) {
|
|
_ = a.fetchstore(appID)
|
|
prefix := fmt.Sprintf("/%s/%s/", Prefix, appID)
|
|
rch := a.e.cli.Watch(a.e.ctx, prefix, clientv3.WithPrefix())
|
|
for wresp := range rch {
|
|
for _, ev := range wresp.Events {
|
|
if ev.Type == mvccpb.PUT || ev.Type == mvccpb.DELETE {
|
|
_ = a.fetchstore(appID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *appInfo) fetchstore(appID string) (err error) {
|
|
prefix := fmt.Sprintf("/%s/%s/", Prefix, appID)
|
|
resp, err := a.e.cli.Get(a.e.ctx, prefix, clientv3.WithPrefix())
|
|
if err != nil {
|
|
log.Error("etcd: fetch client.Get(%s) error(%+v)", prefix, err)
|
|
return err
|
|
}
|
|
|
|
ins, err := a.paserIns(resp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
a.store(ins)
|
|
return nil
|
|
}
|
|
func (a *appInfo) store(ins *naming.InstancesInfo) {
|
|
|
|
a.ins.Store(ins)
|
|
a.e.mutex.RLock()
|
|
for rs := range a.resolver {
|
|
select {
|
|
case rs.event <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
a.e.mutex.RUnlock()
|
|
}
|
|
|
|
func (a *appInfo) paserIns(resp *clientv3.GetResponse) (ins *naming.InstancesInfo, err error) {
|
|
ins = &naming.InstancesInfo{
|
|
Instances: make(map[string][]*naming.Instance, 0),
|
|
}
|
|
for _, ev := range resp.Kvs {
|
|
in := new(naming.Instance)
|
|
|
|
err := json.Unmarshal(ev.Value, in)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in)
|
|
}
|
|
return ins, nil
|
|
}
|
|
|
|
// Watch watch instance.
|
|
func (r *Resolve) Watch() <-chan struct{} {
|
|
return r.event
|
|
}
|
|
|
|
// Fetch fetch resolver instance.
|
|
func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) {
|
|
r.e.mutex.RLock()
|
|
app, ok := r.e.apps[r.id]
|
|
r.e.mutex.RUnlock()
|
|
if ok {
|
|
ins, ok = app.ins.Load().(*naming.InstancesInfo)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Close close resolver.
|
|
func (r *Resolve) Close() error {
|
|
r.e.mutex.Lock()
|
|
if app, ok := r.e.apps[r.id]; ok && len(app.resolver) != 0 {
|
|
delete(app.resolver, r)
|
|
}
|
|
r.e.mutex.Unlock()
|
|
return nil
|
|
}
|