From b68f0e237f39fa9f810e331bbf19f57020d81fdf Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Sat, 6 Jul 2019 00:36:15 +0100 Subject: [PATCH] Removed event from eventMap once sent to be advertised --- network/router/default_router.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 7ad1197c..e5dff063 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -107,6 +107,7 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) if err != nil { + log.Logf("r.addServiceRoutes() GetService() error: %v", err) continue } @@ -157,6 +158,8 @@ func (r *router) watchServices(w registry.Watcher) error { break } + log.Logf("r.watchServices() new service event: %s", res.Service.Name) + route := Route{ Destination: res.Service.Name, Router: r.opts.Address, @@ -238,9 +241,6 @@ func (r *router) processEvents() error { // ticker to periodically scan event for advertising ticker := time.NewTicker(AdvertiseTick) - // TODO: Need to flag already advertised events otherwise we'll keep on advertising them - // as they keep getting advertised unless deleted and are only deleted when received by upstream - // advertEvent is a table event enriched with advert data type advertEvent struct { *Event @@ -263,8 +263,8 @@ process: case <-ticker.C: var events []*Event // decay the penalties of existing events - mu.RLock() - for _, event := range eventMap { + mu.Lock() + for advert, event := range eventMap { delta := time.Since(event.timestamp).Seconds() event.penalty = event.penalty * math.Exp(delta) // suppress or recover the event based on its current penalty @@ -278,9 +278,11 @@ process: e := new(Event) *e = *event.Event events = append(events, e) + // this deletes the advertised event from the map + delete(eventMap, advert) } } - mu.RUnlock() + mu.Unlock() if len(events) > 0 { wg.Add(1) @@ -356,7 +358,9 @@ process: } } + // first wait for the advertiser to finish wg.Wait() + // close the advert channel close(r.advertChan) log.Logf("r.processEvents(): event processor stopped") @@ -390,8 +394,6 @@ func (r *router) manage(errChan <-chan error) { } r.status = status - log.Logf("r.manage(): router status: %v", r.status) - // stop the router if some error happened if err != nil && code != Stopped { // this will stop watchers which will close r.advertChan