diff --git a/go.mod b/go.mod index ce078a8e7..67d49efd3 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-sql-driver/mysql v1.4.1 + github.com/go-zookeeper/zk v1.0.1 // indirect github.com/gogo/protobuf v1.3.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 // indirect diff --git a/go.sum b/go.sum index 1f489a404..65b167ca5 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEK github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.1 h1:LmXNmSnkNsNKai+aDu6sHRr8ZJzIrHJo8z8Z4sm8cT8= +github.com/go-zookeeper/zk v1.0.1/go.mod h1:gpJdHazfkmlg4V0rt0vYeHYJHSL8hHFwV0qOd+HRTJE= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go new file mode 100644 index 000000000..778e8a5a3 --- /dev/null +++ b/pkg/naming/zookeeper/zookeeper.go @@ -0,0 +1,397 @@ +package zookeeper + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/bilibili/kratos/pkg/log" + "github.com/bilibili/kratos/pkg/naming" + "github.com/go-zookeeper/zk" +) + +const ( + RootPath = "/" +) + +type Config struct { + // Endpoints is a list of URLs. + Endpoints []string `json:"endpoints"` +} + +var ( + _once sync.Once + _builder naming.Builder + + //ErrDuplication is a register duplication err + ErrDuplication = errors.New("zookeeper: instance duplicate registration") +) + +// Builder return default zookeeper resolver builder. +func Builder(c *Config) naming.Builder { + _once.Do(func() { + _builder, _ = New(c) + }) + return _builder +} + +// Build register resolver into default zookeeper. +func Build(c *Config, id string) naming.Resolver { + return Builder(c).Build(id) +} + +// ZookeeperBuilder is a zookeeper client Builder +type ZookeeperBuilder struct { + cli *zk.Conn + connEvent <-chan zk.Event + ctx context.Context + cancelFunc context.CancelFunc + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} +} + +type appInfo struct { + resolver map[*Resolve]struct{} + ins atomic.Value + zkb *ZookeeperBuilder + once sync.Once +} + +// Resolve zookeeper resolver. +type Resolve struct { + id string + event chan struct{} + zkb *ZookeeperBuilder +} + +// New is new a zookeeper builder +func New(c *Config) (zkb *ZookeeperBuilder, err error) { + //example: endpointSli = []string{"192.168.1.78:2181", "192.168.1.79:2181", "192.168.1.80:2181"} + if len(c.Endpoints) == 0 { + errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") + log.Error(errInfo) + return nil, errors.New(errInfo) + } + + zkConn, connEvent, err := zk.Connect(c.Endpoints, 5*time.Second) + if err != nil { + log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) + return + } else { + log.Info(fmt.Sprintf("zk Connect ok!")) + } + + ctx, cancel := context.WithCancel(context.Background()) + zkb = &ZookeeperBuilder{ + cli: zkConn, + connEvent: connEvent, + ctx: ctx, + cancelFunc: cancel, + apps: map[string]*appInfo{}, + registry: map[string]struct{}{}, + } + return +} + +// Build zookeeper resovler builder. +func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { + r := &Resolve{ + id: appid, + zkb: z, + event: make(chan struct{}, 1), + } + z.mutex.Lock() + app, ok := z.apps[appid] + if !ok { + app = &appInfo{ + resolver: make(map[*Resolve]struct{}), + zkb: z, + } + z.apps[appid] = app + } + app.resolver[r] = struct{}{} + z.mutex.Unlock() + if ok { + select { + case r.event <- struct{}{}: + default: + } + } + + app.once.Do(func() { + go app.watch(appid) + log.Info("zookeeper: AddWatch(%s) already watch(%v)", appid, ok) + }) + return r +} + +// Scheme return zookeeper's scheme +func (z *ZookeeperBuilder) Scheme() string { + return "zookeeper" +} + +// Register is register instance +func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { + z.mutex.Lock() + if _, ok := z.registry[ins.AppID]; ok { + err = ErrDuplication + } else { + z.registry[ins.AppID] = struct{}{} + } + z.mutex.Unlock() + if err != nil { + return + } + ctx, cancel := context.WithCancel(z.ctx) + if err = z.register(ctx, ins); err != nil { + z.mutex.Lock() + delete(z.registry, ins.AppID) + z.mutex.Unlock() + cancel() + return + } + ch := make(chan struct{}, 1) + cancelFunc = context.CancelFunc(func() { + cancel() + <-ch + }) + + go func() { + for { + select { + case connEvent := <-z.connEvent: + log.Warn("watch zkClient state, connEvent:(%v)", connEvent) + if connEvent.State == zk.StateHasSession { + log.Warn("watch zkClient state, state is StateHasSession...") + err = z.register(ctx, ins) + if err != nil { + log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) + continue + } + } + case <-time.After(time.Second): + } + } + }() + return +} + +func (z *ZookeeperBuilder) registerPerServer(name string) (err error) { + var ( + str string + ) + + str, err = z.cli.Create(name, nil, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerPerServer, fail to Create node:(%s). err:(%v)", name, err)) + } else { + log.Info(fmt.Sprintf("registerPerServer, succeed to Create node:(%s). retStr:(%s)", name, str)) + } + + return +} + +func (z *ZookeeperBuilder) registerEphServer(name, host string, ins *naming.Instance) (err error) { + var ( + str string + ) + + val, _ := json.Marshal(ins) + log.Info(fmt.Sprintf("registerEphServer, ins after json.Marshal:(%v)", string(val))) + + str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerEphServer, fail to Create node:%s. err:(%v)", name+host, err)) + } else { + log.Info(fmt.Sprintf("registerEphServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) + } + + return +} + +// register 注册zookeeper节点 +func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { + log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) + prefix := z.keyPrefix(ins) + + err = z.registerPerServer(prefix) + if err != nil { + log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) + } + + var svrAddr string + for _, val := range ins.Addrs { + if strings.HasPrefix(val, "grpc://") { + svrAddr = strings.TrimPrefix(val, "grpc://") + break + } + } + if svrAddr == "" { + errInfo := fmt.Sprintf("register, error occur, grpc svrAddr is null") + log.Error(errInfo) + return errors.New(errInfo) + } + + err = z.registerEphServer(prefix, RootPath+svrAddr, ins) + if err != nil { + log.Error(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) + //return + } else { + log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) + } + + return nil +} + +// unregister 删除zookeeper中节点信息 +func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) { + log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) + prefix := z.keyPrefix(ins) + + var svrAddr string + for _, val := range ins.Addrs { + if strings.HasPrefix(val, "grpc://") { + svrAddr = strings.TrimPrefix(val, "grpc://") + break + } + } + if svrAddr == "" { + errInfo := fmt.Sprintf("unregister, error occur, grpc svrAddr is null") + log.Error(errInfo) + return errors.New(errInfo) + } + + strNode := prefix + RootPath + svrAddr + exists, _, err := z.cli.Exists(strNode) + if err != nil { + log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) + return err + } + if exists { + _, s, err := z.cli.Get(strNode) + if err != nil { + log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) + return err + } + return z.cli.Delete(strNode, s.Version) + } + + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) + return +} + +func (z *ZookeeperBuilder) keyPrefix(ins *naming.Instance) string { + return fmt.Sprintf("/%s", ins.AppID) +} + +// Close stop all running process including zk fetch and register +func (z *ZookeeperBuilder) Close() error { + z.cancelFunc() + return nil +} + +func (a *appInfo) watch(appID string) { + _ = a.fetchstore(appID) + prefix := fmt.Sprintf("/%s", appID) + + go func() { + for { + log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) + snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) + if err != nil { + continue + } + + log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) + for ev := range event { + log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) + if ev.Type == zk.EventNodeChildrenChanged { + _ = a.fetchstore(appID) + } + } + } + }() +} + +func (a *appInfo) fetchstore(appID string) (err error) { + prefix := fmt.Sprintf("/%s", appID) + strNode := "" + childs, _, err := a.zkb.cli.Children(prefix) + if err != nil { + log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), err:(%v)", prefix, err)) + } else { + log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) + } + + ins := &naming.InstancesInfo{ + Instances: make(map[string][]*naming.Instance, 0), + } + + //for range childs + for _, child := range childs { + strNode = prefix + RootPath + child + resp, _, err := a.zkb.cli.Get(strNode) + if err != nil { + log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) + return err + } + + in := new(naming.Instance) + err = json.Unmarshal(resp, in) + if err != nil { + return err + } + ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) + + } + a.store(ins) + + return nil +} + +func (a *appInfo) store(ins *naming.InstancesInfo) { + + a.ins.Store(ins) + a.zkb.mutex.RLock() + for rs := range a.resolver { + select { + case rs.event <- struct{}{}: + default: + } + } + a.zkb.mutex.RUnlock() +} + +// Watch watch instance. +func (r *Resolve) Watch() <-chan struct{} { + return r.event +} + +// Fetch fetch resolver instance. +func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { + r.zkb.mutex.RLock() + app, ok := r.zkb.apps[r.id] + r.zkb.mutex.RUnlock() + if ok { + ins, ok = app.ins.Load().(*naming.InstancesInfo) + return + } + return +} + +// Close close resolver. +func (r *Resolve) Close() error { + r.zkb.mutex.Lock() + if app, ok := r.zkb.apps[r.id]; ok && len(app.resolver) != 0 { + delete(app.resolver, r) + } + r.zkb.mutex.Unlock() + return nil +}