mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-24 10:07:04 +02:00
Rather than append to list of events just keep the last event for a route hash
This commit is contained in:
parent
8b77d62ed4
commit
2d7975a7ce
@ -347,9 +347,10 @@ func (r *router) advertiseTable() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// routeAdvert contains a list of route events to be advertised
|
// routeAdvert contains a route event to be advertised
|
||||||
type routeAdvert struct {
|
type routeAdvert struct {
|
||||||
events []*Event
|
// event received from routing table
|
||||||
|
event *Event
|
||||||
// lastUpdate records the time of the last advert update
|
// lastUpdate records the time of the last advert update
|
||||||
lastUpdate time.Time
|
lastUpdate time.Time
|
||||||
// penalty is current advert penalty
|
// penalty is current advert penalty
|
||||||
@ -398,9 +399,11 @@ func (r *router) advertiseEvents() error {
|
|||||||
// suppress/recover the event based on its penalty level
|
// suppress/recover the event based on its penalty level
|
||||||
switch {
|
switch {
|
||||||
case advert.penalty > AdvertSuppress && !advert.isSuppressed:
|
case advert.penalty > AdvertSuppress && !advert.isSuppressed:
|
||||||
|
log.Debugf("Router supressing advert %d for route %s", key, advert.event.Route.Address)
|
||||||
advert.isSuppressed = true
|
advert.isSuppressed = true
|
||||||
advert.suppressTime = time.Now()
|
advert.suppressTime = time.Now()
|
||||||
case advert.penalty < AdvertRecover && advert.isSuppressed:
|
case advert.penalty < AdvertRecover && advert.isSuppressed:
|
||||||
|
log.Debugf("Router recovering advert %d for route %s", key, advert.event.Route.Address)
|
||||||
advert.isSuppressed = false
|
advert.isSuppressed = false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -413,13 +416,11 @@ func (r *router) advertiseEvents() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !advert.isSuppressed {
|
if !advert.isSuppressed {
|
||||||
for _, event := range advert.events {
|
e := new(Event)
|
||||||
e := new(Event)
|
*e = *(advert.event)
|
||||||
*e = *event
|
events = append(events, e)
|
||||||
events = append(events, e)
|
// delete the advert from the advertMap
|
||||||
// delete the advert from the advertMap
|
delete(advertMap, key)
|
||||||
delete(advertMap, key)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -445,13 +446,11 @@ func (r *router) advertiseEvents() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check if we have already registered the route
|
// check if we have already registered the route
|
||||||
// we use the route hash as advertMap key
|
|
||||||
hash := e.Route.Hash()
|
hash := e.Route.Hash()
|
||||||
advert, ok := advertMap[hash]
|
advert, ok := advertMap[hash]
|
||||||
if !ok {
|
if !ok {
|
||||||
events := []*Event{e}
|
|
||||||
advert = &routeAdvert{
|
advert = &routeAdvert{
|
||||||
events: events,
|
event: e,
|
||||||
penalty: penalty,
|
penalty: penalty,
|
||||||
lastUpdate: time.Now(),
|
lastUpdate: time.Now(),
|
||||||
}
|
}
|
||||||
@ -459,18 +458,15 @@ func (r *router) advertiseEvents() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// attempt to squash last two events if possible
|
// override the route event only if the last event was different
|
||||||
lastEvent := advert.events[len(advert.events)-1]
|
if advert.event.Type != e.Type {
|
||||||
if lastEvent.Type == e.Type && lastEvent.Route.Hash() == hash {
|
advert.event = e
|
||||||
log.Debugf("Router squashing event %s with hash %d for service %s", e.Type, hash, e.Route.Address)
|
|
||||||
advert.events[len(advert.events)-1] = e
|
|
||||||
} else {
|
|
||||||
advert.events = append(advert.events, e)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// update event penalty and recorded timestamp
|
// update event penalty and timestamp
|
||||||
advert.lastUpdate = time.Now()
|
advert.lastUpdate = time.Now()
|
||||||
advert.penalty += penalty
|
advert.penalty += penalty
|
||||||
|
log.Debugf("Router advert %d for route %s event penalty: %f", hash, advert.event.Route.Address, advert.penalty)
|
||||||
case <-r.exit:
|
case <-r.exit:
|
||||||
// first wait for the advertiser to finish
|
// first wait for the advertiser to finish
|
||||||
r.advertWg.Wait()
|
r.advertWg.Wait()
|
||||||
|
Loading…
Reference in New Issue
Block a user