From 254045e9f389e30d85d85ebbfa7739407b2758e8 Mon Sep 17 00:00:00 2001 From: Asim Aslam Date: Wed, 6 Nov 2019 15:49:40 +0000 Subject: [PATCH] Remove go routines for mdns watcher and cache registry (#919) --- registry/cache/cache.go | 44 +++++++++++++----- registry/mdns_registry.go | 95 ++++++++++++++++++++++++++++++++++++--- registry/mdns_watcher.go | 7 +++ 3 files changed, 128 insertions(+), 18 deletions(-) diff --git a/registry/cache/cache.go b/registry/cache/cache.go index f8bee4a9..75fcac32 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -39,6 +39,8 @@ type cache struct { // used to stop the cache exit chan bool + // indicate whether its running + running bool // status of the registry // used to hold onto the cache // in failure state @@ -157,13 +159,26 @@ func (c *cache) get(service string) ([]*registry.Service, error) { } // watch service if not watched - if _, ok := c.watched[service]; !ok { - go c.run(service) - } + _, ok := c.watched[service] // unlock the read lock c.RUnlock() + // check if its being watched + if !ok { + c.Lock() + + // set to watched + c.watched[service] = true + + // only kick it off if not running + if !c.running { + go c.run() + } + + c.Unlock() + } + // get and return services return get(service, cp) } @@ -181,6 +196,11 @@ func (c *cache) update(res *registry.Result) { c.Lock() defer c.Unlock() + // only save watched services + if _, ok := c.watched[res.Service.Name]; !ok { + return + } + services, ok := c.cache[res.Service.Name] if !ok { // we're not going to cache anything @@ -283,16 +303,16 @@ func (c *cache) update(res *registry.Result) { // run starts the cache watcher loop // it creates a new watcher if there's a problem -func (c *cache) run(service string) { - // set watcher +func (c *cache) run() { c.Lock() - c.watched[service] = true + c.running = true c.Unlock() - // delete watcher on exit + // reset watcher on exit defer func() { c.Lock() - delete(c.watched, service) + c.watched = make(map[string]bool) + c.running = false c.Unlock() }() @@ -309,10 +329,7 @@ func (c *cache) run(service string) { time.Sleep(time.Duration(j) * time.Millisecond) // create new watcher - w, err := c.Registry.Watch( - registry.WatchService(service), - ) - + w, err := c.Registry.Watch() if err != nil { if c.quit() { return @@ -414,6 +431,9 @@ func (c *cache) GetService(service string) ([]*registry.Service, error) { } func (c *cache) Stop() { + c.Lock() + defer c.Unlock() + select { case <-c.exit: return diff --git a/registry/mdns_registry.go b/registry/mdns_registry.go index 646032ad..5f0dc073 100644 --- a/registry/mdns_registry.go +++ b/registry/mdns_registry.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/micro/mdns" ) @@ -37,6 +38,14 @@ type mdnsRegistry struct { sync.Mutex services map[string][]*mdnsEntry + + mtx sync.RWMutex + + // watchers + watchers map[string]*mdnsWatcher + + // listener + listener chan *mdns.ServiceEntry } func newRegistry(opts ...Option) Registry { @@ -61,6 +70,7 @@ func newRegistry(opts ...Option) Registry { opts: options, domain: domain, services: make(map[string][]*mdnsEntry), + watchers: make(map[string]*mdnsWatcher), } } @@ -346,15 +356,88 @@ func (m *mdnsRegistry) Watch(opts ...WatchOption) (Watcher, error) { } md := &mdnsWatcher{ - wo: wo, - ch: make(chan *mdns.ServiceEntry, 32), - exit: make(chan struct{}), - domain: m.domain, + id: uuid.New().String(), + wo: wo, + ch: make(chan *mdns.ServiceEntry, 32), + exit: make(chan struct{}), + domain: m.domain, + registry: m, } + m.mtx.Lock() + defer m.mtx.Unlock() + + // save the watcher + m.watchers[md.id] = md + + // check of the listener exists + if m.listener != nil { + return md, nil + } + + // start the listener go func() { - if err := mdns.Listen(md.ch, md.exit); err != nil { - md.Stop() + // go to infinity + for { + m.mtx.Lock() + + // just return if there are no watchers + if len(m.watchers) == 0 { + m.listener = nil + m.mtx.Unlock() + return + } + + // check existing listener + if m.listener != nil { + m.mtx.Unlock() + return + } + + // reset the listener + exit := make(chan struct{}) + ch := make(chan *mdns.ServiceEntry, 32) + m.listener = ch + + m.mtx.Unlock() + + // send messages to the watchers + go func() { + send := func(w *mdnsWatcher, e *mdns.ServiceEntry) { + select { + case w.ch <- e: + default: + } + } + + for { + select { + case <-exit: + return + case e, ok := <-ch: + if !ok { + return + } + m.mtx.RLock() + // send service entry to all watchers + for _, w := range m.watchers { + send(w, e) + } + m.mtx.RUnlock() + } + } + + }() + + // start listening, blocking call + mdns.Listen(ch, exit) + + // mdns.Listen has unblocked + // kill the saved listener + m.mtx.Lock() + m.listener = nil + close(ch) + m.mtx.Unlock() } }() diff --git a/registry/mdns_watcher.go b/registry/mdns_watcher.go index ce13866f..402811b9 100644 --- a/registry/mdns_watcher.go +++ b/registry/mdns_watcher.go @@ -8,11 +8,14 @@ import ( ) type mdnsWatcher struct { + id string wo WatchOptions ch chan *mdns.ServiceEntry exit chan struct{} // the mdns domain domain string + // the registry + registry *mdnsRegistry } func (m *mdnsWatcher) Next() (*Result, error) { @@ -76,5 +79,9 @@ func (m *mdnsWatcher) Stop() { return default: close(m.exit) + // remove self from the registry + m.registry.mtx.Lock() + delete(m.registry.watchers, m.id) + m.registry.mtx.Unlock() } }