mirror of
https://github.com/go-micro/go-micro.git
synced 2024-11-24 08:02:32 +02:00
Merge pull request #591 from milosgajdos83/advert-damp
[WIP] Fixes advert route event dampening behaviour
This commit is contained in:
commit
96cf14ed53
@ -18,17 +18,24 @@ const (
|
||||
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table
|
||||
AdvertiseTableTick = 1 * time.Minute
|
||||
// AdvertSuppress is advert suppression threshold
|
||||
AdvertSuppress = 2000
|
||||
AdvertSuppress = 2000.0
|
||||
// AdvertRecover is advert recovery threshold
|
||||
AdvertRecover = 750
|
||||
AdvertRecover = 750.0
|
||||
// DefaultAdvertTTL is default advertisement TTL
|
||||
DefaultAdvertTTL = 1 * time.Minute
|
||||
// PenaltyDecay is the penalty decay
|
||||
PenaltyDecay = 1.15
|
||||
// Delete penalises route addition and deletion
|
||||
Delete = 1000
|
||||
// DeletePenalty penalises route deletion
|
||||
DeletePenalty = 1000.0
|
||||
// UpdatePenalty penalises route updates
|
||||
UpdatePenalty = 500
|
||||
UpdatePenalty = 500.0
|
||||
// PenaltyHalfLife is the time the advert penalty decays to half its value
|
||||
PenaltyHalfLife = 2.0
|
||||
// MaxSuppressTime defines time after which the suppressed advert is deleted
|
||||
MaxSuppressTime = 5 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
// PenaltyDecay is a coefficient which controls the speed the advert penalty decays
|
||||
PenaltyDecay = math.Log(2) / PenaltyHalfLife
|
||||
)
|
||||
|
||||
// router provides default router implementation
|
||||
@ -153,9 +160,9 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
|
||||
return nil
|
||||
}
|
||||
|
||||
// watchServices watches services in given registry and updates the routing table accordingly.
|
||||
// It returns error if the service registry watcher stops or if the routing table can't be updated.
|
||||
func (r *router) watchServices(w registry.Watcher) error {
|
||||
// watchRegistry watches registry and updates routing table based on the received events.
|
||||
// It returns error if either the registry watcher fails with error or if the routing table update fails.
|
||||
func (r *router) watchRegistry(w registry.Watcher) error {
|
||||
// wait in the background for the router to stop
|
||||
// when the router stops, stop the watcher and exit
|
||||
r.wg.Add(1)
|
||||
@ -266,43 +273,26 @@ func (r *router) advertiseTable() error {
|
||||
|
||||
// advertise all routes as Update events to subscribers
|
||||
if len(events) > 0 {
|
||||
go func() {
|
||||
r.advertWg.Add(1)
|
||||
r.advertiseEvents(Update, events)
|
||||
}()
|
||||
go r.advertiseEvents(Update, events)
|
||||
}
|
||||
case <-r.exit:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isFlapping detects if the event is flapping based on the current and previous event status.
|
||||
func isFlapping(curr, prev *table.Event) bool {
|
||||
if curr.Type == table.Update && prev.Type == table.Update {
|
||||
return true
|
||||
}
|
||||
|
||||
if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// advertEvent is a table event enriched with advertisement data
|
||||
type advertEvent struct {
|
||||
*table.Event
|
||||
// timestamp marks the time the event has been received
|
||||
timestamp time.Time
|
||||
// penalty is current event penalty
|
||||
// routeAdvert contains a list of route events to be advertised
|
||||
type routeAdvert struct {
|
||||
events []*table.Event
|
||||
// lastUpdate records the time of the last advert update
|
||||
lastUpdate time.Time
|
||||
// penalty is current advert penalty
|
||||
penalty float64
|
||||
// isSuppressed flags if the event should be considered for flap detection
|
||||
// isSuppressed flags the advert suppression
|
||||
isSuppressed bool
|
||||
// isFlapping marks the event as flapping event
|
||||
isFlapping bool
|
||||
// suppressTime records the time interval the advert has been suppressed for
|
||||
suppressTime time.Time
|
||||
}
|
||||
|
||||
// processEvents processes routing table events.
|
||||
@ -310,22 +300,44 @@ type advertEvent struct {
|
||||
func (r *router) processEvents() error {
|
||||
// ticker to periodically scan event for advertising
|
||||
ticker := time.NewTicker(AdvertiseEventsTick)
|
||||
// eventMap is a map of advert events
|
||||
eventMap := make(map[uint64]*advertEvent)
|
||||
// advertMap is a map of advert events
|
||||
advertMap := make(map[uint64]*routeAdvert)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
var events []*table.Event
|
||||
// collect all events which are not flapping
|
||||
// TODO: decay the events and update suppression
|
||||
for key, event := range eventMap {
|
||||
if !event.isFlapping && !event.isSuppressed {
|
||||
for key, advert := range advertMap {
|
||||
// decay the event penalty
|
||||
delta := time.Since(advert.lastUpdate).Seconds()
|
||||
advert.penalty = advert.penalty * math.Exp(-delta*PenaltyDecay)
|
||||
|
||||
// suppress/recover the event based on its penalty level
|
||||
switch {
|
||||
case advert.penalty > AdvertSuppress && !advert.isSuppressed:
|
||||
advert.isSuppressed = true
|
||||
advert.suppressTime = time.Now()
|
||||
case advert.penalty < AdvertRecover && advert.isSuppressed:
|
||||
advert.isSuppressed = false
|
||||
}
|
||||
|
||||
// max suppression time threshold has been reached, delete the advert
|
||||
if advert.isSuppressed {
|
||||
if time.Since(advert.suppressTime) > MaxSuppressTime {
|
||||
delete(advertMap, key)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if !advert.isSuppressed {
|
||||
for _, event := range advert.events {
|
||||
e := new(table.Event)
|
||||
*e = *event.Event
|
||||
*e = *event
|
||||
events = append(events, e)
|
||||
// this deletes the advertised event from the map
|
||||
delete(eventMap, key)
|
||||
// delete the advert from the advertMap
|
||||
delete(advertMap, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -335,8 +347,6 @@ func (r *router) processEvents() error {
|
||||
go r.advertiseEvents(Update, events)
|
||||
}
|
||||
case e := <-r.eventChan:
|
||||
// event timestamp
|
||||
now := time.Now()
|
||||
// if event is nil, continue
|
||||
if e == nil {
|
||||
continue
|
||||
@ -348,36 +358,36 @@ func (r *router) processEvents() error {
|
||||
case table.Update:
|
||||
penalty = UpdatePenalty
|
||||
case table.Delete:
|
||||
penalty = Delete
|
||||
penalty = DeletePenalty
|
||||
}
|
||||
// we use route hash as eventMap key
|
||||
|
||||
// check if we have already registered the route
|
||||
// we use the route hash as advertMap key
|
||||
hash := e.Route.Hash()
|
||||
event, ok := eventMap[hash]
|
||||
advert, ok := advertMap[hash]
|
||||
if !ok {
|
||||
event = &advertEvent{
|
||||
Event: e,
|
||||
events := []*table.Event{e}
|
||||
advert = &routeAdvert{
|
||||
events: events,
|
||||
penalty: penalty,
|
||||
timestamp: time.Now(),
|
||||
lastUpdate: time.Now(),
|
||||
}
|
||||
eventMap[hash] = event
|
||||
advertMap[hash] = advert
|
||||
continue
|
||||
}
|
||||
// update penalty for existing event: decay existing and add new penalty
|
||||
delta := time.Since(event.timestamp).Seconds()
|
||||
event.penalty = event.penalty*math.Exp(-delta) + penalty
|
||||
event.timestamp = now
|
||||
|
||||
// suppress or recover the event based on its current penalty
|
||||
if !event.isSuppressed && event.penalty > AdvertSuppress {
|
||||
event.isSuppressed = true
|
||||
} else if event.penalty < AdvertRecover {
|
||||
event.isSuppressed = false
|
||||
}
|
||||
// if not suppressed decide if if its flapping
|
||||
if !event.isSuppressed {
|
||||
// detect if its flapping by comparing current and previous event
|
||||
event.isFlapping = isFlapping(e, event.Event)
|
||||
// attempt to squash last two events if possible
|
||||
lastEvent := advert.events[len(advert.events)-1]
|
||||
if lastEvent.Type == e.Type {
|
||||
advert.events[len(advert.events)-1] = e
|
||||
} else {
|
||||
advert.events = append(advert.events, e)
|
||||
}
|
||||
|
||||
// update event penalty and recorded timestamp
|
||||
advert.lastUpdate = time.Now()
|
||||
advert.penalty += penalty
|
||||
|
||||
case <-r.exit:
|
||||
// first wait for the advertiser to finish
|
||||
r.advertWg.Wait()
|
||||
@ -386,10 +396,6 @@ func (r *router) processEvents() error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// we probably never reach this code path
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// watchErrors watches router errors and takes appropriate actions
|
||||
@ -484,8 +490,9 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed creating routing table watcher: %v", err)
|
||||
}
|
||||
// service registry watcher
|
||||
svcWatcher, err := r.opts.Registry.Watch()
|
||||
|
||||
// registry watcher
|
||||
regWatcher, err := r.opts.Registry.Watch()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed creating service registry watcher: %v", err)
|
||||
}
|
||||
@ -497,7 +504,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
|
||||
go func() {
|
||||
defer r.wg.Done()
|
||||
// watch local registry and register routes in routine table
|
||||
errChan <- r.watchServices(svcWatcher)
|
||||
errChan <- r.watchRegistry(regWatcher)
|
||||
}()
|
||||
|
||||
r.wg.Add(1)
|
||||
@ -594,5 +601,5 @@ func (r *router) Stop() error {
|
||||
|
||||
// String prints debugging information about router
|
||||
func (r *router) String() string {
|
||||
return "router"
|
||||
return "default"
|
||||
}
|
||||
|
@ -7,14 +7,14 @@ import (
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// TableOptions specify routing table options
|
||||
// Options specify routing table options
|
||||
// TODO: table options TBD in the future
|
||||
type TableOptions struct{}
|
||||
type Options struct{}
|
||||
|
||||
// table is an in memory routing table
|
||||
type table struct {
|
||||
// opts are table options
|
||||
opts TableOptions
|
||||
opts Options
|
||||
// m stores routing table map
|
||||
m map[string]map[uint64]Route
|
||||
// w is a list of table watchers
|
||||
@ -23,9 +23,9 @@ type table struct {
|
||||
}
|
||||
|
||||
// newTable creates a new routing table and returns it
|
||||
func newTable(opts ...TableOption) Table {
|
||||
func newTable(opts ...Option) Table {
|
||||
// default options
|
||||
var options TableOptions
|
||||
var options Options
|
||||
|
||||
// apply requested options
|
||||
for _, o := range opts {
|
||||
@ -40,7 +40,7 @@ func newTable(opts ...TableOption) Table {
|
||||
}
|
||||
|
||||
// Init initializes routing table with options
|
||||
func (t *table) Init(opts ...TableOption) error {
|
||||
func (t *table) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&t.opts)
|
||||
}
|
||||
@ -48,7 +48,7 @@ func (t *table) Init(opts ...TableOption) error {
|
||||
}
|
||||
|
||||
// Options returns routing table options
|
||||
func (t *table) Options() TableOptions {
|
||||
func (t *table) Options() Options {
|
||||
return t.opts
|
||||
}
|
||||
|
||||
@ -219,7 +219,7 @@ func (t *table) Size() int {
|
||||
defer t.RUnlock()
|
||||
|
||||
size := 0
|
||||
for dest, _ := range t.m {
|
||||
for dest := range t.m {
|
||||
size += len(t.m[dest])
|
||||
}
|
||||
|
||||
@ -227,6 +227,6 @@ func (t *table) Size() int {
|
||||
}
|
||||
|
||||
// String returns debug information
|
||||
func (t table) String() string {
|
||||
return "table"
|
||||
func (t *table) String() string {
|
||||
return "default"
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ func TestCreate(t *testing.T) {
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Errorf("error adding route: %s", err)
|
||||
}
|
||||
testTableSize += 1
|
||||
testTableSize++
|
||||
|
||||
// adds new route for the original destination
|
||||
route.Gateway = "dest.gw2"
|
||||
@ -31,7 +31,7 @@ func TestCreate(t *testing.T) {
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Errorf("error adding route: %s", err)
|
||||
}
|
||||
testTableSize += 1
|
||||
testTableSize++
|
||||
|
||||
if table.Size() != testTableSize {
|
||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
||||
@ -50,7 +50,7 @@ func TestDelete(t *testing.T) {
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Errorf("error adding route: %s", err)
|
||||
}
|
||||
testTableSize += 1
|
||||
testTableSize++
|
||||
|
||||
// should fail to delete non-existant route
|
||||
prevSvc := route.Service
|
||||
@ -66,7 +66,7 @@ func TestDelete(t *testing.T) {
|
||||
if err := table.Delete(route); err != nil {
|
||||
t.Errorf("error deleting route: %s", err)
|
||||
}
|
||||
testTableSize -= 1
|
||||
testTableSize--
|
||||
|
||||
if table.Size() != testTableSize {
|
||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
||||
@ -80,7 +80,7 @@ func TestUpdate(t *testing.T) {
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Errorf("error adding route: %s", err)
|
||||
}
|
||||
testTableSize += 1
|
||||
testTableSize++
|
||||
|
||||
// change the metric of the original route
|
||||
route.Metric = 200
|
||||
@ -100,7 +100,7 @@ func TestUpdate(t *testing.T) {
|
||||
if err := table.Update(route); err != nil {
|
||||
t.Errorf("error updating route: %s", err)
|
||||
}
|
||||
testTableSize += 1
|
||||
testTableSize++
|
||||
|
||||
if table.Size() != testTableSize {
|
||||
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
|
||||
|
@ -29,10 +29,10 @@ type Table interface {
|
||||
Size() int
|
||||
}
|
||||
|
||||
// TableOption used by the routing table
|
||||
type TableOption func(*TableOptions)
|
||||
// Option used by the routing table
|
||||
type Option func(*Options)
|
||||
|
||||
// NewTable creates new routing table and returns it
|
||||
func NewTable(opts ...TableOption) Table {
|
||||
func NewTable(opts ...Option) Table {
|
||||
return newTable(opts...)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user