From b13361d0104034d7bbc2c5bc38dd6a076f12bffd Mon Sep 17 00:00:00 2001 From: Asim Date: Tue, 3 May 2016 19:26:50 +0100 Subject: [PATCH] Add cache ttl --- selector/cache/cache.go | 141 ++++++++++++++++++++++++++++---------- selector/cache/options.go | 20 ++++++ 2 files changed, 125 insertions(+), 36 deletions(-) create mode 100644 selector/cache/options.go diff --git a/selector/cache/cache.go b/selector/cache/cache.go index b7d30e9a..d9e5e2b4 100644 --- a/selector/cache/cache.go +++ b/selector/cache/cache.go @@ -10,18 +10,29 @@ import ( "github.com/micro/go-micro/selector" ) +/* + Cache selector is a selector which uses the registry.Watcher to Cache service entries. + It defaults to a TTL for 1 minute and causes a cache miss on the next request. +*/ + type cacheSelector struct { - so selector.Options + so selector.Options + ttl time.Duration // registry cache sync.Mutex cache map[string][]*registry.Service + ttls map[string]time.Time // used to close or reload watcher reload chan bool exit chan bool } +var ( + DefaultTTL = time.Minute +) + func init() { rand.Seed(time.Now().UnixNano()) } @@ -71,31 +82,46 @@ func (c *cacheSelector) cp(current []*registry.Service) []*registry.Service { return services } +func (c *cacheSelector) del(service string) { + delete(c.cache, service) + delete(c.ttls, service) +} + func (c *cacheSelector) get(service string) ([]*registry.Service, error) { c.Lock() defer c.Unlock() // check the cache first services, ok := c.cache[service] + ttl, kk := c.ttls[service] // got results, copy and return if ok && len(services) > 0 { - return c.cp(services), nil + // only return if its less than the ttl + if kk && time.Since(ttl) < c.ttl { + return c.cp(services), nil + } } + // cache miss or ttl expired + // now ask the registry services, err := c.so.Registry.GetService(service) if err != nil { return nil, err - } // we didn't have any results so cache c.cache[service] = c.cp(services) - + c.ttls[service] = time.Now().Add(c.ttl) return services, nil } +func (c *cacheSelector) set(service string, services []*registry.Service) { + c.cache[service] = services + c.ttls[service] = time.Now().Add(c.ttl) +} + func (c *cacheSelector) update(res *registry.Result) { if res == nil || res.Service == nil { return @@ -114,7 +140,7 @@ func (c *cacheSelector) update(res *registry.Result) { if len(res.Service.Nodes) == 0 { switch res.Action { case "delete": - delete(c.cache, res.Service.Name) + c.del(res.Service.Name) } return } @@ -132,8 +158,7 @@ func (c *cacheSelector) update(res *registry.Result) { switch res.Action { case "create", "update": if service == nil { - services = append(services, res.Service) - c.cache[res.Service.Name] = services + c.set(res.Service.Name, append(services, res.Service)) return } @@ -152,7 +177,7 @@ func (c *cacheSelector) update(res *registry.Result) { } services[index] = res.Service - c.cache[res.Service.Name] = services + c.set(res.Service.Name, services) case "delete": if service == nil { return @@ -174,24 +199,34 @@ func (c *cacheSelector) update(res *registry.Result) { } } - if len(nodes) == 0 { - if len(services) == 1 { - delete(c.cache, service.Name) - } else { - var srvs []*registry.Service - for _, s := range services { - if s.Version != service.Version { - srvs = append(srvs, s) - } - } - c.cache[service.Name] = srvs - } + // still got nodes, save and return + if len(nodes) > 0 { + service.Nodes = nodes + services[index] = service + c.set(service.Name, services) return } - service.Nodes = nodes - services[index] = service - c.cache[res.Service.Name] = services + // zero nodes left + + // only have one thing to delete + // nuke the thing + if len(services) == 1 { + c.del(service.Name) + return + } + + // still have more than 1 service + // check the version and keep what we know + var srvs []*registry.Service + for _, s := range services { + if s.Version != service.Version { + srvs = append(srvs, s) + } + } + + // save + c.set(service.Name, srvs) } } @@ -200,6 +235,8 @@ func (c *cacheSelector) update(res *registry.Result) { // reloads the watcher if Init is called // and returns when Close is called func (c *cacheSelector) run() { + go c.tick() + for { // exit early if already dead if c.quit() { @@ -214,18 +251,6 @@ func (c *cacheSelector) run() { continue } - // manage this loop - go func() { - // wait for exit or reload signal - select { - case <-c.exit: - case <-c.reload: - } - - // stop the watcher - w.Stop() - }() - // watch for events if err := c.watch(w); err != nil { log.Println(err) @@ -234,9 +259,44 @@ func (c *cacheSelector) run() { } } +// check cache and expire on each tick +func (c *cacheSelector) tick() { + t := time.NewTicker(time.Minute) + + for { + select { + case <-t.C: + c.Lock() + for service, expiry := range c.ttls { + if d := time.Since(expiry); d > c.ttl { + // TODO: maybe refresh the cache rather than blowing it away + c.del(service) + } + } + c.Unlock() + case <-c.exit: + return + } + } +} + // watch loops the next event and calls update // it returns if there's an error func (c *cacheSelector) watch(w registry.Watcher) error { + defer w.Stop() + + // manage this loop + go func() { + // wait for exit or reload signal + select { + case <-c.exit: + case <-c.reload: + } + + // stop the watcher + w.Stop() + }() + for { res, err := w.Next() if err != nil { @@ -357,14 +417,23 @@ func NewSelector(opts ...selector.Option) selector.Selector { sopts.Registry = registry.DefaultRegistry } + ttl := DefaultTTL + + if sopts.Context != nil { + if t, ok := sopts.Context.Value(ttlKey{}).(time.Duration); ok { + ttl = t + } + } + c := &cacheSelector{ so: sopts, + ttl: ttl, cache: make(map[string][]*registry.Service), + ttls: make(map[string]time.Time), reload: make(chan bool, 1), exit: make(chan bool), } go c.run() - return c } diff --git a/selector/cache/options.go b/selector/cache/options.go new file mode 100644 index 00000000..d12094b7 --- /dev/null +++ b/selector/cache/options.go @@ -0,0 +1,20 @@ +package cache + +import ( + "time" + + "github.com/micro/go-micro/selector" + "golang.org/x/net/context" +) + +type ttlKey struct{} + +// Set the cache ttl +func TTL(t time.Duration) selector.Option { + return func(o *selector.Options) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, ttlKey{}, t) + } +}