From 5cae3307321c568b16227353aca554be49f4db87 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Sat, 29 Dec 2018 15:44:51 +0000 Subject: [PATCH] Update selector race, rename cache selector --- cmd/cmd.go | 4 +- registry/gossip/util.go | 47 ++- selector/cache/cache.go | 424 ------------------------ selector/cache/cache_test.go | 29 -- selector/default.go | 378 ++++++++++++++++++++- selector/default_test.go | 10 +- selector/{cache => registry}/options.go | 8 +- selector/registry/registry.go | 11 + selector/selector.go | 6 +- 9 files changed, 416 insertions(+), 501 deletions(-) delete mode 100644 selector/cache/cache.go delete mode 100644 selector/cache/cache_test.go rename selector/{cache => registry}/options.go (66%) create mode 100644 selector/registry/registry.go diff --git a/cmd/cmd.go b/cmd/cmd.go index 8c1aae19..7d41b452 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -26,7 +26,6 @@ import ( // selectors "github.com/micro/go-micro/selector" - "github.com/micro/go-micro/selector/cache" // transports "github.com/micro/go-micro/transport" @@ -149,7 +148,6 @@ var ( Name: "selector", EnvVar: "MICRO_SELECTOR", Usage: "Selector used to pick nodes for querying", - Value: "cache", }, cli.StringFlag{ Name: "transport", @@ -179,7 +177,7 @@ var ( DefaultSelectors = map[string]func(...selector.Option) selector.Selector{ "default": selector.NewSelector, - "cache": cache.NewSelector, + "cache": selector.NewSelector, } DefaultServers = map[string]func(...server.Option) server.Server{ diff --git a/registry/gossip/util.go b/registry/gossip/util.go index 67635957..5d0160a7 100644 --- a/registry/gossip/util.go +++ b/registry/gossip/util.go @@ -38,20 +38,35 @@ func cp(current []*registry.Service) []*registry.Service { } func addNodes(old, neu []*registry.Node) []*registry.Node { + var nodes []*registry.Node + + // add all new nodes for _, n := range neu { - var seen bool - for i, o := range old { + node := *n + nodes = append(nodes, &node) + } + + // look at old nodes + for _, o := range old { + var exists bool + + // check against new nodes + for _, n := range nodes { + // ids match then skip if o.Id == n.Id { - seen = true - old[i] = n + exists = true break } } - if !seen { - old = append(old, n) + + // keep old node + if !exists { + node := *o + nodes = append(nodes, &node) } } - return old + + return nodes } func addServices(old, neu []*registry.Service) []*registry.Service { @@ -91,19 +106,27 @@ func delNodes(old, del []*registry.Node) []*registry.Node { func delServices(old, del []*registry.Service) []*registry.Service { var services []*registry.Service - for i, o := range old { + + for _, o := range old { + srv := new(registry.Service) + *srv = *o + var rem bool + for _, s := range del { - if o.Version == s.Version { - old[i].Nodes = delNodes(o.Nodes, s.Nodes) - if len(old[i].Nodes) == 0 { + if srv.Version == s.Version { + srv.Nodes = delNodes(srv.Nodes, s.Nodes) + + if len(srv.Nodes) == 0 { rem = true } } } + if !rem { - services = append(services, o) + services = append(services, srv) } } + return services } diff --git a/selector/cache/cache.go b/selector/cache/cache.go deleted file mode 100644 index 4947d95c..00000000 --- a/selector/cache/cache.go +++ /dev/null @@ -1,424 +0,0 @@ -// Package cache is a caching selector. It uses the registry watcher. -package cache - -import ( - "sync" - "time" - - "github.com/micro/go-log" - "github.com/micro/go-micro/registry" - "github.com/micro/go-micro/selector" -) - -type cacheSelector struct { - so selector.Options - ttl time.Duration - - // registry cache - sync.RWMutex - cache map[string][]*registry.Service - ttls map[string]time.Time - - watched map[string]bool - - // used to close or reload watcher - reload chan bool - exit chan bool -} - -var ( - DefaultTTL = time.Minute -) - -func (c *cacheSelector) quit() bool { - select { - case <-c.exit: - return true - default: - return false - } -} - -// cp copies a service. Because we're caching handing back pointers would -// create a race condition, so we do this instead -// its fast enough -func (c *cacheSelector) cp(current []*registry.Service) []*registry.Service { - var services []*registry.Service - - for _, service := range current { - // copy service - s := new(registry.Service) - *s = *service - - // copy nodes - var nodes []*registry.Node - for _, node := range service.Nodes { - n := new(registry.Node) - *n = *node - nodes = append(nodes, n) - } - s.Nodes = nodes - - // copy endpoints - var eps []*registry.Endpoint - for _, ep := range service.Endpoints { - e := new(registry.Endpoint) - *e = *ep - eps = append(eps, e) - } - s.Endpoints = eps - - // append service - services = append(services, s) - } - - return services -} - -func (c *cacheSelector) del(service string) { - delete(c.cache, service) - delete(c.ttls, service) -} - -func (c *cacheSelector) get(service string) ([]*registry.Service, error) { - // read lock - c.RLock() - - // check the cache first - services, ok := c.cache[service] - // get cache ttl - ttl, kk := c.ttls[service] - - // got services && within ttl so return cache - if ok && kk && time.Since(ttl) < c.ttl { - // make a copy - cp := c.cp(services) - // unlock the read - c.RUnlock() - // return servics - return cp, nil - } - - // get does the actual request for a service and cache it - get := func(service string) ([]*registry.Service, error) { - // ask the registry - services, err := c.so.Registry.GetService(service) - if err != nil { - return nil, err - } - - // cache results - c.Lock() - c.set(service, c.cp(services)) - c.Unlock() - - return services, nil - } - - // watch service if not watched - if _, ok := c.watched[service]; !ok { - go c.run(service) - } - - // unlock the read lock - c.RUnlock() - - // get and return services - return get(service) -} - -func (c *cacheSelector) set(service string, services []*registry.Service) { - c.cache[service] = services - c.ttls[service] = time.Now().Add(c.ttl) -} - -func (c *cacheSelector) update(res *registry.Result) { - if res == nil || res.Service == nil { - return - } - - c.Lock() - defer c.Unlock() - - services, ok := c.cache[res.Service.Name] - if !ok { - // we're not going to cache anything - // unless there was already a lookup - return - } - - if len(res.Service.Nodes) == 0 { - switch res.Action { - case "delete": - c.del(res.Service.Name) - } - return - } - - // existing service found - var service *registry.Service - var index int - for i, s := range services { - if s.Version == res.Service.Version { - service = s - index = i - } - } - - switch res.Action { - case "create", "update": - if service == nil { - c.set(res.Service.Name, append(services, res.Service)) - return - } - - // append old nodes to new service - for _, cur := range service.Nodes { - var seen bool - for _, node := range res.Service.Nodes { - if cur.Id == node.Id { - seen = true - break - } - } - if !seen { - res.Service.Nodes = append(res.Service.Nodes, cur) - } - } - - services[index] = res.Service - c.set(res.Service.Name, services) - case "delete": - if service == nil { - return - } - - var nodes []*registry.Node - - // filter cur nodes to remove the dead one - for _, cur := range service.Nodes { - var seen bool - for _, del := range res.Service.Nodes { - if del.Id == cur.Id { - seen = true - break - } - } - if !seen { - nodes = append(nodes, cur) - } - } - - // still got nodes, save and return - if len(nodes) > 0 { - service.Nodes = nodes - services[index] = service - c.set(service.Name, services) - return - } - - // zero nodes left - - // only have one thing to delete - // nuke the thing - if len(services) == 1 { - c.del(service.Name) - return - } - - // still have more than 1 service - // check the version and keep what we know - var srvs []*registry.Service - for _, s := range services { - if s.Version != service.Version { - srvs = append(srvs, s) - } - } - - // save - c.set(service.Name, srvs) - } -} - -// run starts the cache watcher loop -// it creates a new watcher if there's a problem -// reloads the watcher if Init is called -// and returns when Close is called -func (c *cacheSelector) run(name string) { - // set watcher - c.Lock() - c.watched[name] = true - c.Unlock() - - // delete watcher on exit - defer func() { - c.Lock() - delete(c.watched, name) - c.Unlock() - }() - - for { - // exit early if already dead - if c.quit() { - return - } - - // create new watcher - w, err := c.so.Registry.Watch( - registry.WatchService(name), - ) - if err != nil { - if c.quit() { - return - } - log.Log(err) - time.Sleep(time.Second) - continue - } - - // watch for events - if err := c.watch(w); err != nil { - if c.quit() { - return - } - log.Log(err) - continue - } - } -} - -// watch loops the next event and calls update -// it returns if there's an error -func (c *cacheSelector) watch(w registry.Watcher) error { - defer w.Stop() - - // manage this loop - go func() { - // wait for exit or reload signal - select { - case <-c.exit: - case <-c.reload: - } - - // stop the watcher - w.Stop() - }() - - for { - res, err := w.Next() - if err != nil { - return err - } - c.update(res) - } -} - -func (c *cacheSelector) Init(opts ...selector.Option) error { - for _, o := range opts { - o(&c.so) - } - - // reload the watcher - go func() { - select { - case <-c.exit: - return - default: - c.reload <- true - } - }() - - return nil -} - -func (c *cacheSelector) Options() selector.Options { - return c.so -} - -func (c *cacheSelector) Select(service string, opts ...selector.SelectOption) (selector.Next, error) { - sopts := selector.SelectOptions{ - Strategy: c.so.Strategy, - } - - for _, opt := range opts { - opt(&sopts) - } - - // get the service - // try the cache first - // if that fails go directly to the registry - services, err := c.get(service) - if err != nil { - return nil, err - } - - // apply the filters - for _, filter := range sopts.Filters { - services = filter(services) - } - - // if there's nothing left, return - if len(services) == 0 { - return nil, selector.ErrNoneAvailable - } - - return sopts.Strategy(services), nil -} - -func (c *cacheSelector) Mark(service string, node *registry.Node, err error) { -} - -func (c *cacheSelector) Reset(service string) { -} - -// Close stops the watcher and destroys the cache -func (c *cacheSelector) Close() error { - c.Lock() - c.cache = make(map[string][]*registry.Service) - c.watched = make(map[string]bool) - c.Unlock() - - select { - case <-c.exit: - return nil - default: - close(c.exit) - } - return nil -} - -func (c *cacheSelector) String() string { - return "cache" -} - -func NewSelector(opts ...selector.Option) selector.Selector { - sopts := selector.Options{ - Strategy: selector.Random, - } - - for _, opt := range opts { - opt(&sopts) - } - - if sopts.Registry == nil { - sopts.Registry = registry.DefaultRegistry - } - - ttl := DefaultTTL - - if sopts.Context != nil { - if t, ok := sopts.Context.Value(ttlKey{}).(time.Duration); ok { - ttl = t - } - } - - return &cacheSelector{ - so: sopts, - ttl: ttl, - watched: make(map[string]bool), - cache: make(map[string][]*registry.Service), - ttls: make(map[string]time.Time), - reload: make(chan bool, 1), - exit: make(chan bool), - } -} diff --git a/selector/cache/cache_test.go b/selector/cache/cache_test.go deleted file mode 100644 index 931e3801..00000000 --- a/selector/cache/cache_test.go +++ /dev/null @@ -1,29 +0,0 @@ -package cache - -import ( - "testing" - - "github.com/micro/go-micro/registry/mock" - "github.com/micro/go-micro/selector" -) - -func TestCacheSelector(t *testing.T) { - counts := map[string]int{} - - cache := NewSelector(selector.Registry(mock.NewRegistry())) - - next, err := cache.Select("foo") - if err != nil { - t.Errorf("Unexpected error calling cache select: %v", err) - } - - for i := 0; i < 100; i++ { - node, err := next() - if err != nil { - t.Errorf("Expected node err, got err: %v", err) - } - counts[node.Id]++ - } - - t.Logf("Cache Counts %v", counts) -} diff --git a/selector/default.go b/selector/default.go index 72abfc1b..e0bd3672 100644 --- a/selector/default.go +++ b/selector/default.go @@ -1,27 +1,341 @@ package selector import ( + "sync" + "time" + + "github.com/micro/go-log" "github.com/micro/go-micro/registry" ) -type defaultSelector struct { - so Options +type registrySelector struct { + so Options + ttl time.Duration + + // registry cache + sync.RWMutex + cache map[string][]*registry.Service + ttls map[string]time.Time + + watched map[string]bool + + // used to close or reload watcher + reload chan bool + exit chan bool } -func (r *defaultSelector) Init(opts ...Option) error { - for _, o := range opts { - o(&r.so) +var ( + DefaultTTL = time.Minute +) + +func (c *registrySelector) quit() bool { + select { + case <-c.exit: + return true + default: + return false } +} + +// cp copies a service. Because we're caching handing back pointers would +// create a race condition, so we do this instead +// its fast enough +func (c *registrySelector) cp(current []*registry.Service) []*registry.Service { + var services []*registry.Service + + for _, service := range current { + // copy service + s := new(registry.Service) + *s = *service + + // copy nodes + var nodes []*registry.Node + for _, node := range service.Nodes { + n := new(registry.Node) + *n = *node + nodes = append(nodes, n) + } + s.Nodes = nodes + + // copy endpoints + var eps []*registry.Endpoint + for _, ep := range service.Endpoints { + e := new(registry.Endpoint) + *e = *ep + eps = append(eps, e) + } + s.Endpoints = eps + + // append service + services = append(services, s) + } + + return services +} + +func (c *registrySelector) del(service string) { + delete(c.cache, service) + delete(c.ttls, service) +} + +func (c *registrySelector) get(service string) ([]*registry.Service, error) { + // read lock + c.RLock() + + // check the cache first + services, ok := c.cache[service] + // get cache ttl + ttl, kk := c.ttls[service] + + // got services && within ttl so return cache + if ok && kk && time.Since(ttl) < c.ttl { + // make a copy + cp := c.cp(services) + // unlock the read + c.RUnlock() + // return servics + return cp, nil + } + + // get does the actual request for a service and cache it + get := func(service string) ([]*registry.Service, error) { + // ask the registry + services, err := c.so.Registry.GetService(service) + if err != nil { + return nil, err + } + + // cache results + c.Lock() + c.set(service, c.cp(services)) + c.Unlock() + + return services, nil + } + + // watch service if not watched + if _, ok := c.watched[service]; !ok { + go c.run(service) + } + + // unlock the read lock + c.RUnlock() + + // get and return services + return get(service) +} + +func (c *registrySelector) set(service string, services []*registry.Service) { + c.cache[service] = services + c.ttls[service] = time.Now().Add(c.ttl) +} + +func (c *registrySelector) update(res *registry.Result) { + if res == nil || res.Service == nil { + return + } + + c.Lock() + defer c.Unlock() + + services, ok := c.cache[res.Service.Name] + if !ok { + // we're not going to cache anything + // unless there was already a lookup + return + } + + if len(res.Service.Nodes) == 0 { + switch res.Action { + case "delete": + c.del(res.Service.Name) + } + return + } + + // existing service found + var service *registry.Service + var index int + for i, s := range services { + if s.Version == res.Service.Version { + service = s + index = i + } + } + + switch res.Action { + case "create", "update": + if service == nil { + c.set(res.Service.Name, append(services, res.Service)) + return + } + + // append old nodes to new service + for _, cur := range service.Nodes { + var seen bool + for _, node := range res.Service.Nodes { + if cur.Id == node.Id { + seen = true + break + } + } + if !seen { + res.Service.Nodes = append(res.Service.Nodes, cur) + } + } + + services[index] = res.Service + c.set(res.Service.Name, services) + case "delete": + if service == nil { + return + } + + var nodes []*registry.Node + + // filter cur nodes to remove the dead one + for _, cur := range service.Nodes { + var seen bool + for _, del := range res.Service.Nodes { + if del.Id == cur.Id { + seen = true + break + } + } + if !seen { + nodes = append(nodes, cur) + } + } + + // still got nodes, save and return + if len(nodes) > 0 { + service.Nodes = nodes + services[index] = service + c.set(service.Name, services) + return + } + + // zero nodes left + + // only have one thing to delete + // nuke the thing + if len(services) == 1 { + c.del(service.Name) + return + } + + // still have more than 1 service + // check the version and keep what we know + var srvs []*registry.Service + for _, s := range services { + if s.Version != service.Version { + srvs = append(srvs, s) + } + } + + // save + c.set(service.Name, srvs) + } +} + +// run starts the cache watcher loop +// it creates a new watcher if there's a problem +// reloads the watcher if Init is called +// and returns when Close is called +func (c *registrySelector) run(name string) { + // set watcher + c.Lock() + c.watched[name] = true + c.Unlock() + + // delete watcher on exit + defer func() { + c.Lock() + delete(c.watched, name) + c.Unlock() + }() + + for { + // exit early if already dead + if c.quit() { + return + } + + // create new watcher + w, err := c.so.Registry.Watch( + registry.WatchService(name), + ) + if err != nil { + if c.quit() { + return + } + log.Log(err) + time.Sleep(time.Second) + continue + } + + // watch for events + if err := c.watch(w); err != nil { + if c.quit() { + return + } + log.Log(err) + continue + } + } +} + +// watch loops the next event and calls update +// it returns if there's an error +func (c *registrySelector) watch(w registry.Watcher) error { + defer w.Stop() + + // manage this loop + go func() { + // wait for exit or reload signal + select { + case <-c.exit: + case <-c.reload: + } + + // stop the watcher + w.Stop() + }() + + for { + res, err := w.Next() + if err != nil { + return err + } + c.update(res) + } +} + +func (c *registrySelector) Init(opts ...Option) error { + for _, o := range opts { + o(&c.so) + } + + // reload the watcher + go func() { + select { + case <-c.exit: + return + default: + c.reload <- true + } + }() + return nil } -func (r *defaultSelector) Options() Options { - return r.so +func (c *registrySelector) Options() Options { + return c.so } -func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, error) { +func (c *registrySelector) Select(service string, opts ...SelectOption) (Next, error) { sopts := SelectOptions{ - Strategy: r.so.Strategy, + Strategy: c.so.Strategy, } for _, opt := range opts { @@ -29,7 +343,9 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er } // get the service - services, err := r.so.Registry.GetService(service) + // try the cache first + // if that fails go directly to the registry + services, err := c.get(service) if err != nil { return nil, err } @@ -47,21 +363,33 @@ func (r *defaultSelector) Select(service string, opts ...SelectOption) (Next, er return sopts.Strategy(services), nil } -func (r *defaultSelector) Mark(service string, node *registry.Node, err error) { +func (c *registrySelector) Mark(service string, node *registry.Node, err error) { } -func (r *defaultSelector) Reset(service string) { +func (c *registrySelector) Reset(service string) { } -func (r *defaultSelector) Close() error { +// Close stops the watcher and destroys the cache +func (c *registrySelector) Close() error { + c.Lock() + c.cache = make(map[string][]*registry.Service) + c.watched = make(map[string]bool) + c.Unlock() + + select { + case <-c.exit: + return nil + default: + close(c.exit) + } return nil } -func (r *defaultSelector) String() string { - return "default" +func (c *registrySelector) String() string { + return "registry" } -func newDefaultSelector(opts ...Option) Selector { +func NewSelector(opts ...Option) Selector { sopts := Options{ Strategy: Random, } @@ -74,7 +402,21 @@ func newDefaultSelector(opts ...Option) Selector { sopts.Registry = registry.DefaultRegistry } - return &defaultSelector{ - so: sopts, + ttl := DefaultTTL + + if sopts.Context != nil { + if t, ok := sopts.Context.Value("selector_ttl").(time.Duration); ok { + ttl = t + } + } + + return ®istrySelector{ + so: sopts, + ttl: ttl, + watched: make(map[string]bool), + cache: make(map[string][]*registry.Service), + ttls: make(map[string]time.Time), + reload: make(chan bool, 1), + exit: make(chan bool), } } diff --git a/selector/default_test.go b/selector/default_test.go index e0a8ec22..68d5c850 100644 --- a/selector/default_test.go +++ b/selector/default_test.go @@ -6,14 +6,14 @@ import ( "github.com/micro/go-micro/registry/mock" ) -func TestDefaultSelector(t *testing.T) { +func TestRegistrySelector(t *testing.T) { counts := map[string]int{} - rs := newDefaultSelector(Registry(mock.NewRegistry())) + cache := NewSelector(Registry(mock.NewRegistry())) - next, err := rs.Select("foo") + next, err := cache.Select("foo") if err != nil { - t.Errorf("Unexpected error calling default select: %v", err) + t.Errorf("Unexpected error calling cache select: %v", err) } for i := 0; i < 100; i++ { @@ -24,5 +24,5 @@ func TestDefaultSelector(t *testing.T) { counts[node.Id]++ } - t.Logf("Default Counts %v", counts) + t.Logf("Selector Counts %v", counts) } diff --git a/selector/cache/options.go b/selector/registry/options.go similarity index 66% rename from selector/cache/options.go rename to selector/registry/options.go index e32e1b59..90aaf802 100644 --- a/selector/cache/options.go +++ b/selector/registry/options.go @@ -1,4 +1,4 @@ -package cache +package registry import ( "context" @@ -7,14 +7,12 @@ import ( "github.com/micro/go-micro/selector" ) -type ttlKey struct{} - -// Set the cache ttl +// Set the registry cache ttl func TTL(t time.Duration) selector.Option { return func(o *selector.Options) { if o.Context == nil { o.Context = context.Background() } - o.Context = context.WithValue(o.Context, ttlKey{}, t) + o.Context = context.WithValue(o.Context, "selector_ttl", t) } } diff --git a/selector/registry/registry.go b/selector/registry/registry.go new file mode 100644 index 00000000..20f219e8 --- /dev/null +++ b/selector/registry/registry.go @@ -0,0 +1,11 @@ +// Package registry is uses the go-micro registry for selection +package registry + +import ( + "github.com/micro/go-micro/selector" +) + +// NewSelector returns a new registry selector +func NewSelector(opts ...selector.Option) selector.Selector { + return selector.NewSelector(opts...) +} diff --git a/selector/selector.go b/selector/selector.go index dab26de2..ae80915a 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -35,12 +35,8 @@ type Filter func([]*registry.Service) []*registry.Service type Strategy func([]*registry.Service) Next var ( - DefaultSelector = newDefaultSelector() + DefaultSelector = NewSelector() ErrNotFound = errors.New("not found") ErrNoneAvailable = errors.New("none available") ) - -func NewSelector(opts ...Option) Selector { - return newDefaultSelector(opts...) -}