1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-17 17:44:30 +02:00

Fix zk watchDir (#2102)

Signed-off-by: Goober <chenhao86899@gmail.com>
This commit is contained in:
Goober 2021-01-06 01:32:17 +08:00 committed by GitHub
parent 20b5755788
commit bf4ab679e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 59 deletions

View File

@ -68,71 +68,29 @@ func (zw *zookeeperWatcher) writeResult(r *result) {
} }
func (zw *zookeeperWatcher) watchDir(key string) { func (zw *zookeeperWatcher) watchDir(key string) {
// mark children changed
childrenChanged := true
for { for {
// get current children for a key // get current children for a key
children, _, childEventCh, err := zw.client.ChildrenW(key) newChildren, _, childEventCh, err := zw.client.ChildrenW(key)
if err != nil { if err != nil {
zw.writeRespChan(&watchResponse{zk.Event{}, nil, err}) zw.writeRespChan(&watchResponse{zk.Event{}, nil, err})
return return
} }
if childrenChanged {
zw.overrideChildrenInfo(newChildren, key)
}
select { select {
case e := <-childEventCh: case e := <-childEventCh:
if e.Type != zk.EventNodeChildrenChanged { if e.Type != zk.EventNodeChildrenChanged {
childrenChanged = false
continue continue
} }
newChildren, _, err := zw.client.Children(e.Path) childrenChanged = true
if err != nil {
zw.writeRespChan(&watchResponse{e, nil, err})
return
}
// a node was added -- watch the new node
for _, i := range newChildren {
if contains(children, i) {
continue
}
newNode := path.Join(e.Path, i)
if key == prefix {
// a new service was created under prefix
go zw.watchDir(newNode)
nodes, _, _ := zw.client.Children(newNode)
for _, node := range nodes {
n := path.Join(newNode, node)
go zw.watchKey(n)
s, _, err := zw.client.Get(n)
if err != nil {
continue
}
e.Type = zk.EventNodeCreated
srv, err := decode(s)
if err != nil {
continue
}
zw.writeRespChan(&watchResponse{e, srv, err})
}
} else {
go zw.watchKey(newNode)
s, _, err := zw.client.Get(newNode)
if err != nil {
continue
}
e.Type = zk.EventNodeCreated
srv, err := decode(s)
if err != nil {
continue
}
zw.writeRespChan(&watchResponse{e, srv, err})
}
}
case <-zw.stop: case <-zw.stop:
// There is no way to stop GetW/ChildrenW so just quit // There is no way to stop GetW/ChildrenW so just quit
return return
@ -194,14 +152,10 @@ func (zw *zookeeperWatcher) watch() {
//watch every service //watch every service
for _, service := range services() { for _, service := range services() {
sPath := childPath(prefix, service) sPath := childPath(prefix, service)
if sPath == prefix {
continue
}
go zw.watchDir(sPath) go zw.watchDir(sPath)
children, _, err := zw.client.Children(sPath)
if err != nil {
zw.writeResult(&result{nil, err})
}
for _, c := range children {
go zw.watchKey(path.Join(sPath, c))
}
} }
var service *registry.Service var service *registry.Service
@ -225,6 +179,9 @@ func (zw *zookeeperWatcher) watch() {
case zk.EventNodeCreated: case zk.EventNodeCreated:
action = "create" action = "create"
service = rsp.service service = rsp.service
case zk.EventNodeChildrenChanged:
action = "override"
service = rsp.service
} }
} }
zw.writeResult(&result{&registry.Result{Action: action, Service: service}, nil}) zw.writeResult(&result{&registry.Result{Action: action, Service: service}, nil})
@ -248,3 +205,35 @@ func (zw *zookeeperWatcher) Next() (*registry.Result, error) {
return r.res, r.err return r.res, r.err
} }
} }
func (zw *zookeeperWatcher) overrideChildrenInfo(newChildren []string, parentPath string) {
// override resp
var overrideResp *watchResponse
for _, i := range newChildren {
newNode := path.Join(parentPath, i)
s, _, err := zw.client.Get(newNode)
if err != nil {
continue
}
srv, err := decode(s)
if err != nil {
continue
}
// if nil, then init, do it once
if overrideResp == nil {
overrideResp = &watchResponse{zk.Event{
Type: zk.EventNodeChildrenChanged,
}, srv, nil}
zw.writeRespChan(overrideResp)
}
// when node was added or updated
zw.writeRespChan(&watchResponse{zk.Event{
Type: zk.EventNodeCreated,
}, srv, nil})
}
}

View File

@ -305,6 +305,12 @@ func (c *cache) update(res *registry.Result) {
// save // save
c.set(service.Name, srvs) c.set(service.Name, srvs)
case "override":
if service == nil {
return
}
c.del(service.Name)
} }
} }