1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

fix data races in gossip registry

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Vasiliy Tolstov 2019-05-09 22:32:21 +03:00
parent dc2e150a58
commit 1a151a3348

View File

@ -159,6 +159,10 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
// shutdown old member // shutdown old member
g.Stop() g.Stop()
// lock internals
g.Lock()
// new done chan // new done chan
g.done = make(chan bool) g.done = make(chan bool)
@ -250,16 +254,12 @@ func configure(g *gossipRegistry, opts ...registry.Option) error {
events: g.events, events: g.events,
} }
} }
// create the memberlist // create the memberlist
m, err := memberlist.Create(c) m, err := memberlist.Create(c)
if err != nil { if err != nil {
return err return err
} }
// set internals
g.Lock()
if len(curAddrs) > 0 { if len(curAddrs) > 0 {
for _, addr := range curAddrs { for _, addr := range curAddrs {
g.members[addr] = nodeActionUnknown g.members[addr] = nodeActionUnknown
@ -547,9 +547,13 @@ func (g *gossipRegistry) expiryLoop(updates *updates) {
ticker := time.NewTicker(ExpiryTick) ticker := time.NewTicker(ExpiryTick)
defer ticker.Stop() defer ticker.Stop()
g.RLock()
done := g.done
g.RUnlock()
for { for {
select { select {
case <-g.done: case <-done:
return return
case <-ticker.C: case <-ticker.C:
now := uint64(time.Now().UnixNano()) now := uint64(time.Now().UnixNano())
@ -576,10 +580,13 @@ func (g *gossipRegistry) expiryLoop(updates *updates) {
// process member events // process member events
func (g *gossipRegistry) eventLoop() { func (g *gossipRegistry) eventLoop() {
g.RLock()
done := g.done
g.RUnlock()
for { for {
select { select {
// return when done // return when done
case <-g.done: case <-done:
return return
case ev := <-g.events: case ev := <-g.events:
// TODO: nonblocking update // TODO: nonblocking update
@ -603,10 +610,12 @@ func (g *gossipRegistry) run() {
// event loop // event loop
go g.eventLoop() go g.eventLoop()
g.RLock()
// connect loop // connect loop
if g.connectRetry { if g.connectRetry {
go g.connectLoop() go g.connectLoop()
} }
g.RUnlock()
// process the updates // process the updates
for u := range g.updates { for u := range g.updates {
@ -808,7 +817,6 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
watchers: make(map[string]chan *registry.Result), watchers: make(map[string]chan *registry.Result),
members: make(map[string]int32), members: make(map[string]int32),
} }
// run the updater // run the updater
go g.run() go g.run()
@ -816,7 +824,6 @@ func NewRegistry(opts ...registry.Option) registry.Registry {
if err := configure(g, opts...); err != nil { if err := configure(g, opts...); err != nil {
log.Fatalf("[gossip] Error configuring registry: %v", err) log.Fatalf("[gossip] Error configuring registry: %v", err)
} }
// wait for setup // wait for setup
<-time.After(g.interval * 2) <-time.After(g.interval * 2)