1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

Strip down router code (#1627)

This commit is contained in:
Asim Aslam 2020-05-13 16:13:36 +01:00 committed by GitHub
parent ba64518ebd
commit 290595f88e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,7 +3,6 @@ package router
import (
"errors"
"fmt"
"math"
"sort"
"strings"
"sync"
@ -19,18 +18,6 @@ var (
AdvertiseEventsTick = 10 * time.Second
// DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = 2 * time.Minute
// AdvertSuppress is advert suppression threshold
AdvertSuppress = 200.0
// AdvertRecover is advert recovery threshold
AdvertRecover = 20.0
// Penalty for routes processed multiple times
Penalty = 100.0
// PenaltyHalfLife is the time the advert penalty decays to half its value
PenaltyHalfLife = 30.0
// MaxSuppressTime defines time after which the suppressed advert is deleted
MaxSuppressTime = 90 * time.Second
// PenaltyDecay is a coefficient which controls the speed the advert penalty decays
PenaltyDecay = math.Log(2) / PenaltyHalfLife
)
// router implements default router
@ -269,68 +256,8 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) {
r.sub.RUnlock()
}
// advert contains a route event to be advertised
type advert struct {
// event received from routing table
event *Event
// lastSeen records the time of the last advert update
lastSeen time.Time
// penalty is current advert penalty
penalty float64
// isSuppressed flags the advert suppression
isSuppressed bool
// suppressTime records the time interval the advert has been suppressed for
suppressTime time.Time
}
// adverts maintains a map of router adverts
type adverts map[uint64]*advert
// process processes advert
// It updates advert timestamp, increments its penalty and
// marks upresses or recovers it if it reaches configured thresholds
func (m adverts) process(a *advert) error {
// lookup advert in adverts
hash := a.event.Route.Hash()
a, ok := m[hash]
if !ok {
return fmt.Errorf("advert not found")
}
// decay the event penalty
delta := time.Since(a.lastSeen).Seconds()
// decay advert penalty
a.penalty = a.penalty * math.Exp(-delta*PenaltyDecay)
service := a.event.Route.Service
address := a.event.Route.Address
// suppress/recover the event based on its penalty level
switch {
case a.penalty > AdvertSuppress && !a.isSuppressed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router suppressing advert %d %.2f for route %s %s", hash, a.penalty, service, address)
}
a.isSuppressed = true
a.suppressTime = time.Now()
case a.penalty < AdvertRecover && a.isSuppressed:
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router recovering advert %d %.2f for route %s %s", hash, a.penalty, service, address)
}
a.isSuppressed = false
}
// if suppressed, checked how long has it been suppressed for
if a.isSuppressed {
// max suppression time threshold has been reached, delete the advert
if time.Since(a.suppressTime) > MaxSuppressTime {
delete(m, hash)
return nil
}
}
return nil
}
type adverts map[uint64]*Event
// advertiseEvents advertises routing table events
// It suppresses unhealthy flapping events and advertises healthy events upstream.
@ -396,21 +323,9 @@ func (r *router) advertiseEvents() error {
var events []*Event
// collect all events which are not flapping
for key, advert := range adverts {
// process the advert
if err := adverts.process(advert); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router failed processing advert %d: %v", key, err)
}
continue
}
// if suppressed go to the next advert
if advert.isSuppressed {
continue
}
for key, event := range adverts {
// if we only advertise local routes skip processing anything not link local
if r.options.Advertise == AdvertiseLocal && advert.event.Route.Link != "local" {
if r.options.Advertise == AdvertiseLocal && event.Route.Link != "local" {
continue
}
@ -418,7 +333,7 @@ func (r *router) advertiseEvents() error {
e := new(Event)
// this is ok, because router.Event only contains builtin types
// and no references so this creates a deep copy of struct Event
*e = *(advert.event)
*e = *event
events = append(events, e)
// delete the advert from adverts
delete(adverts, key)
@ -447,44 +362,22 @@ func (r *router) advertiseEvents() error {
continue
}
now := time.Now()
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router processing table event %s for service %s %s", e.Type, e.Route.Service, e.Route.Address)
}
// check if we have already registered the route
hash := e.Route.Hash()
a, ok := adverts[hash]
ev, ok := adverts[hash]
if !ok {
a = &advert{
event: e,
penalty: Penalty,
lastSeen: now,
}
adverts[hash] = a
ev = e
adverts[hash] = e
continue
}
// override the route event only if the previous event was different
if a.event.Type != e.Type {
a.event = e
}
// process the advert
if err := adverts.process(a); err != nil {
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router error processing advert %d: %v", hash, err)
}
continue
}
// update event penalty and timestamp
a.lastSeen = now
// increment the penalty
a.penalty += Penalty
if logger.V(logger.DebugLevel, logger.DefaultLogger) {
logger.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty)
if ev.Type != e.Type {
ev = e
}
case <-r.exit:
if w != nil {