mirror of
https://github.com/go-micro/go-micro.git
synced 2025-08-04 21:42:57 +02:00
invalidate service if node was not updated (#2736)
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
committed by
GitHub
parent
b318b7f097
commit
14a1791011
24
registry/cache/cache.go
vendored
24
registry/cache/cache.go
vendored
@@ -42,6 +42,7 @@ type cache struct {
|
||||
sg singleflight.Group
|
||||
cache map[string][]*registry.Service
|
||||
ttls map[string]time.Time
|
||||
nttls map[string]map[string]time.Time // node ttls
|
||||
watched map[string]bool
|
||||
|
||||
// used to stop the cache
|
||||
@@ -94,6 +95,17 @@ func (c *cache) isValid(services []*registry.Service, ttl time.Time) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// a node did not get updated
|
||||
for _, s := range services {
|
||||
for _, n := range s.Nodes {
|
||||
nttl := c.nttls[s.Name][n.Id]
|
||||
if time.Since(nttl) > 0 {
|
||||
delete(c.nttls, s.Name)
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ok
|
||||
return true
|
||||
}
|
||||
@@ -115,6 +127,7 @@ func (c *cache) del(service string) {
|
||||
// otherwise delete entries
|
||||
delete(c.cache, service)
|
||||
delete(c.ttls, service)
|
||||
delete(c.nttls, service)
|
||||
}
|
||||
|
||||
func (c *cache) get(service string) ([]*registry.Service, error) {
|
||||
@@ -128,7 +141,7 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
|
||||
// make a copy
|
||||
cp := util.Copy(services)
|
||||
|
||||
// got services && within ttl so return cache
|
||||
// got services, nodes && within ttl so return cache
|
||||
if c.isValid(cp, ttl) {
|
||||
c.RUnlock()
|
||||
// return services
|
||||
@@ -197,6 +210,14 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
|
||||
func (c *cache) set(service string, services []*registry.Service) {
|
||||
c.cache[service] = services
|
||||
c.ttls[service] = time.Now().Add(c.opts.TTL)
|
||||
for _, s := range services {
|
||||
for _, n := range s.Nodes {
|
||||
if c.nttls[s.Name] == nil {
|
||||
c.nttls[s.Name] = make(map[string]time.Time)
|
||||
}
|
||||
c.nttls[s.Name][n.Id] = time.Now().Add(c.opts.TTL)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cache) update(res *registry.Result) {
|
||||
@@ -483,6 +504,7 @@ func New(r registry.Registry, opts ...Option) Cache {
|
||||
watchedRunning: make(map[string]bool),
|
||||
cache: make(map[string][]*registry.Service),
|
||||
ttls: make(map[string]time.Time),
|
||||
nttls: make(map[string]map[string]time.Time),
|
||||
exit: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user