1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

Remove go routines for mdns watcher and cache registry (#919)

This commit is contained in:
Asim Aslam 2019-11-06 15:49:40 +00:00 committed by GitHub
parent b84134581c
commit 254045e9f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 128 additions and 18 deletions

View File

@ -39,6 +39,8 @@ type cache struct {
// used to stop the cache // used to stop the cache
exit chan bool exit chan bool
// indicate whether its running
running bool
// status of the registry // status of the registry
// used to hold onto the cache // used to hold onto the cache
// in failure state // in failure state
@ -157,13 +159,26 @@ func (c *cache) get(service string) ([]*registry.Service, error) {
} }
// watch service if not watched // watch service if not watched
if _, ok := c.watched[service]; !ok { _, ok := c.watched[service]
go c.run(service)
}
// unlock the read lock // unlock the read lock
c.RUnlock() c.RUnlock()
// check if its being watched
if !ok {
c.Lock()
// set to watched
c.watched[service] = true
// only kick it off if not running
if !c.running {
go c.run()
}
c.Unlock()
}
// get and return services // get and return services
return get(service, cp) return get(service, cp)
} }
@ -181,6 +196,11 @@ func (c *cache) update(res *registry.Result) {
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
// only save watched services
if _, ok := c.watched[res.Service.Name]; !ok {
return
}
services, ok := c.cache[res.Service.Name] services, ok := c.cache[res.Service.Name]
if !ok { if !ok {
// we're not going to cache anything // we're not going to cache anything
@ -283,16 +303,16 @@ func (c *cache) update(res *registry.Result) {
// run starts the cache watcher loop // run starts the cache watcher loop
// it creates a new watcher if there's a problem // it creates a new watcher if there's a problem
func (c *cache) run(service string) { func (c *cache) run() {
// set watcher
c.Lock() c.Lock()
c.watched[service] = true c.running = true
c.Unlock() c.Unlock()
// delete watcher on exit // reset watcher on exit
defer func() { defer func() {
c.Lock() c.Lock()
delete(c.watched, service) c.watched = make(map[string]bool)
c.running = false
c.Unlock() c.Unlock()
}() }()
@ -309,10 +329,7 @@ func (c *cache) run(service string) {
time.Sleep(time.Duration(j) * time.Millisecond) time.Sleep(time.Duration(j) * time.Millisecond)
// create new watcher // create new watcher
w, err := c.Registry.Watch( w, err := c.Registry.Watch()
registry.WatchService(service),
)
if err != nil { if err != nil {
if c.quit() { if c.quit() {
return return
@ -414,6 +431,9 @@ func (c *cache) GetService(service string) ([]*registry.Service, error) {
} }
func (c *cache) Stop() { func (c *cache) Stop() {
c.Lock()
defer c.Unlock()
select { select {
case <-c.exit: case <-c.exit:
return return

View File

@ -10,6 +10,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
"github.com/micro/mdns" "github.com/micro/mdns"
) )
@ -37,6 +38,14 @@ type mdnsRegistry struct {
sync.Mutex sync.Mutex
services map[string][]*mdnsEntry services map[string][]*mdnsEntry
mtx sync.RWMutex
// watchers
watchers map[string]*mdnsWatcher
// listener
listener chan *mdns.ServiceEntry
} }
func newRegistry(opts ...Option) Registry { func newRegistry(opts ...Option) Registry {
@ -61,6 +70,7 @@ func newRegistry(opts ...Option) Registry {
opts: options, opts: options,
domain: domain, domain: domain,
services: make(map[string][]*mdnsEntry), services: make(map[string][]*mdnsEntry),
watchers: make(map[string]*mdnsWatcher),
} }
} }
@ -346,15 +356,88 @@ func (m *mdnsRegistry) Watch(opts ...WatchOption) (Watcher, error) {
} }
md := &mdnsWatcher{ md := &mdnsWatcher{
wo: wo, id: uuid.New().String(),
ch: make(chan *mdns.ServiceEntry, 32), wo: wo,
exit: make(chan struct{}), ch: make(chan *mdns.ServiceEntry, 32),
domain: m.domain, exit: make(chan struct{}),
domain: m.domain,
registry: m,
} }
m.mtx.Lock()
defer m.mtx.Unlock()
// save the watcher
m.watchers[md.id] = md
// check of the listener exists
if m.listener != nil {
return md, nil
}
// start the listener
go func() { go func() {
if err := mdns.Listen(md.ch, md.exit); err != nil { // go to infinity
md.Stop() for {
m.mtx.Lock()
// just return if there are no watchers
if len(m.watchers) == 0 {
m.listener = nil
m.mtx.Unlock()
return
}
// check existing listener
if m.listener != nil {
m.mtx.Unlock()
return
}
// reset the listener
exit := make(chan struct{})
ch := make(chan *mdns.ServiceEntry, 32)
m.listener = ch
m.mtx.Unlock()
// send messages to the watchers
go func() {
send := func(w *mdnsWatcher, e *mdns.ServiceEntry) {
select {
case w.ch <- e:
default:
}
}
for {
select {
case <-exit:
return
case e, ok := <-ch:
if !ok {
return
}
m.mtx.RLock()
// send service entry to all watchers
for _, w := range m.watchers {
send(w, e)
}
m.mtx.RUnlock()
}
}
}()
// start listening, blocking call
mdns.Listen(ch, exit)
// mdns.Listen has unblocked
// kill the saved listener
m.mtx.Lock()
m.listener = nil
close(ch)
m.mtx.Unlock()
} }
}() }()

View File

@ -8,11 +8,14 @@ import (
) )
type mdnsWatcher struct { type mdnsWatcher struct {
id string
wo WatchOptions wo WatchOptions
ch chan *mdns.ServiceEntry ch chan *mdns.ServiceEntry
exit chan struct{} exit chan struct{}
// the mdns domain // the mdns domain
domain string domain string
// the registry
registry *mdnsRegistry
} }
func (m *mdnsWatcher) Next() (*Result, error) { func (m *mdnsWatcher) Next() (*Result, error) {
@ -76,5 +79,9 @@ func (m *mdnsWatcher) Stop() {
return return
default: default:
close(m.exit) close(m.exit)
// remove self from the registry
m.registry.mtx.Lock()
delete(m.registry.watchers, m.id)
m.registry.mtx.Unlock()
} }
} }