From ea872f6900b957bb815b0bcf9d78ecce714a68d6 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Wed, 3 Jul 2019 19:50:07 +0100 Subject: [PATCH 01/14] Updated error statements; Update ships list of events. --- network/router/default_router.go | 62 ++++++++++++++++++++++---------- network/router/default_table.go | 1 + network/router/router.go | 46 +++++++++++++++++------- network/router/table_watcher.go | 15 ++++---- 4 files changed, 86 insertions(+), 38 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 9a7b933e..340c63f5 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -2,6 +2,7 @@ package router import ( "fmt" + "sort" "strings" "sync" "time" @@ -10,6 +11,15 @@ import ( "github.com/olekukonko/tablewriter" ) +const ( + // UpdateRoutePenalty penalises route updates + UpdateRoutePenalty = 500 + // DeleteRoutePenalty penalises route deletes + DeleteRoutePenalty = 1000 + // AdvertiseTick is time interval in which we advertise route updates + AdvertiseTick = 5 * time.Second +) + // router provides default router implementation type router struct { opts Options @@ -79,7 +89,7 @@ func (r *router) Network() string { func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { - return fmt.Errorf("failed to list services: %v", err) + return fmt.Errorf("failed listing services: %v", err) } // add each service node as a separate route @@ -148,12 +158,12 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { case "create": // only return error if the route is not duplicate, but something else has failed if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("failed to add route for service %v: %s", res.Service.Name, err) + return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) } case "delete": // only return error if the route is not in the table, but something else has failed if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed to delete route for service %v: %s", res.Service.Name, err) + return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) } } } @@ -175,7 +185,6 @@ func (r *router) watchTable(w Watcher) error { var watchErr error -exit: for { event, err := w.Next() if err != nil { @@ -188,12 +197,13 @@ exit: u := &Update{ ID: r.ID(), Timestamp: time.Now(), - Event: event, + Events: []*Event{event}, } select { case <-r.exit: - break exit + close(r.advertChan) + return watchErr case r.advertChan <- u: } } @@ -258,7 +268,7 @@ func (r *router) Advertise() (<-chan *Update, error) { Metric: DefaultLocalMetric, } if err := r.opts.Table.Add(route); err != nil { - return nil, fmt.Errorf("error to add default gateway route: %s", err) + return nil, fmt.Errorf("failed adding default gateway route: %s", err) } } @@ -271,12 +281,12 @@ func (r *router) Advertise() (<-chan *Update, error) { // routing table watcher which watches all routes i.e. to every destination tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) if err != nil { - return nil, fmt.Errorf("failed to create routing table watcher: %v", err) + return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } // registry watcher regWatcher, err := r.opts.Registry.Watch() if err != nil { - return nil, fmt.Errorf("failed to create registry watcher: %v", err) + return nil, fmt.Errorf("failed creating registry watcher: %v", err) } // error channel collecting goroutine errors @@ -311,18 +321,32 @@ func (r *router) Advertise() (<-chan *Update, error) { } // Update updates the routing table using the advertised values -func (r *router) Update(a *Update) error { - // we extract the route from advertisement and update the routing table - route := Route{ - Destination: a.Event.Route.Destination, - Gateway: a.Event.Route.Gateway, - Router: a.Event.Route.Router, - Network: a.Event.Route.Network, - Metric: a.Event.Route.Metric, - Policy: AddIfNotExists, +func (r *router) Update(u *Update) error { + // NOTE: event sorting might not be necessary + // copy update events intp new slices + events := make([]*Event, len(u.Events)) + copy(events, u.Events) + // sort events by timestamp + sort.Slice(events, func(i, j int) bool { + return events[i].Timestamp.Before(events[j].Timestamp) + }) + + for _, event := range events { + // we extract the route from advertisement and update the routing table + route := Route{ + Destination: event.Route.Destination, + Gateway: event.Route.Gateway, + Router: event.Route.Router, + Network: event.Route.Network, + Metric: event.Route.Metric, + Policy: AddIfNotExists, + } + if err := r.opts.Table.Update(route); err != nil { + return fmt.Errorf("failed updating routing table: %v", err) + } } - return r.opts.Table.Update(route) + return nil } // Status returns router status diff --git a/network/router/default_table.go b/network/router/default_table.go index 3297c028..bf4e9646 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -138,6 +138,7 @@ func (t *table) Update(r Route) error { return ErrRouteNotFound } + // check if destination has this particular router in the table if _, ok := t.m[destAddr][sum]; !ok && r.Policy == AddIfNotExists { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) diff --git a/network/router/router.go b/network/router/router.go index e45c0baa..e5ff9a18 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -34,14 +34,39 @@ type Router interface { String() string } +// Option used by the router +type Option func(*Options) + +// UpdateType is route advertisement update type +type UpdateType int + +const ( + // Announce is advertised when the router announces itself + Announce UpdateType = iota + // RouteEvent advertises route events + RouteEvent +) + +// String returns string representation of update event +func (ut UpdateType) String() string { + switch ut { + case Announce: + return "ANNOUNCE" + case RouteEvent: + return "ROUTE" + default: + return "UNKNOWN" + } +} + // Update is sent by the router to the network type Update struct { // ID is the router ID ID string - // Timestamp marks the time when update is sent + // Timestamp marks the time when the update is sent Timestamp time.Time - // Event defines advertisement even - Event *Event + // Events is a list of events to advertise + Events []*Event } // StatusCode defines router status @@ -58,12 +83,12 @@ type Status struct { const ( // Init means the rotuer has just been initialized Init StatusCode = iota - // Running means the router is running + // Running means the router is up and running Running - // Error means the router has crashed with error - Error - // Stopped means the router has stopped + // Stopped means the router has been stopped Stopped + // Error means the router has encountered error + Error ) // String returns human readable status code @@ -73,18 +98,15 @@ func (sc StatusCode) String() string { return "INITIALIZED" case Running: return "RUNNING" - case Error: - return "ERROR" case Stopped: return "STOPPED" + case Error: + return "ERROR" default: return "UNKNOWN" } } -// Option used by the router -type Option func(*Options) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 91411247..976fa8af 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -3,6 +3,7 @@ package router import ( "errors" "strings" + "time" "github.com/olekukonko/tablewriter" ) @@ -16,11 +17,11 @@ var ( type EventType int const ( - // CreateEvent is emitted when new route has been created + // CreateEvent is emitted when a new route has been created CreateEvent EventType = iota // DeleteEvent is emitted when an existing route has been deleted DeleteEvent - // UpdateEvent is emitted when a routing table has been updated + // UpdateEvent is emitted when an existing route has been updated UpdateEvent ) @@ -42,6 +43,8 @@ func (et EventType) String() string { type Event struct { // Type defines type of event Type EventType + // Timestamp is event timestamp + Timestamp time.Time // Route is table rout Route Route } @@ -81,18 +84,16 @@ type tableWatcher struct { } // Next returns the next noticed action taken on table -// TODO: this needs to be thought through properly; we only allow watching particular route destination for now +// TODO: this needs to be thought through properly; +// right now we only allow to watch destination func (w *tableWatcher) Next() (*Event, error) { for { select { case res := <-w.resChan: switch w.opts.Destination { - case "*", "": + case res.Route.Destination, "*": return res, nil default: - if w.opts.Destination == res.Route.Destination { - return res, nil - } continue } case <-w.done: From d6c07dfb166e7b1801bd8104bd4ecb215b1c3e10 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 4 Jul 2019 02:06:59 +0100 Subject: [PATCH 02/14] Update is now Advert --- network/router/default_router.go | 16 ++++++++-------- network/router/router.go | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 340c63f5..e1c97ccf 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -24,7 +24,7 @@ const ( type router struct { opts Options status Status - advertChan chan *Update + advertChan chan *Advert exit chan struct{} wg *sync.WaitGroup sync.RWMutex @@ -43,7 +43,7 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, status: Status{Error: nil, Code: Init}, - advertChan: make(chan *Update), + advertChan: make(chan *Advert), exit: make(chan struct{}), wg: &sync.WaitGroup{}, } @@ -194,7 +194,7 @@ func (r *router) watchTable(w Watcher) error { break } - u := &Update{ + u := &Advert{ ID: r.ID(), Timestamp: time.Now(), Events: []*Event{event}, @@ -248,7 +248,7 @@ func (r *router) watchError(errChan <-chan error) { // Advertise advertises the routes to the network. // It returns error if any of the launched goroutines fail with error. -func (r *router) Advertise() (<-chan *Update, error) { +func (r *router) Advertise() (<-chan *Advert, error) { r.Lock() defer r.Unlock() @@ -275,7 +275,7 @@ func (r *router) Advertise() (<-chan *Update, error) { // NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) - r.advertChan = make(chan *Update) + r.advertChan = make(chan *Advert) } // routing table watcher which watches all routes i.e. to every destination @@ -321,11 +321,11 @@ func (r *router) Advertise() (<-chan *Update, error) { } // Update updates the routing table using the advertised values -func (r *router) Update(u *Update) error { +func (r *router) Update(a *Advert) error { // NOTE: event sorting might not be necessary // copy update events intp new slices - events := make([]*Event, len(u.Events)) - copy(events, u.Events) + events := make([]*Event, len(a.Events)) + copy(events, a.Events) // sort events by timestamp sort.Slice(events, func(i, j int) bool { return events[i].Timestamp.Before(events[j].Timestamp) diff --git a/network/router/router.go b/network/router/router.go index e5ff9a18..20c05fa0 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -23,9 +23,9 @@ type Router interface { // Network returns the network address of the router Network() string // Advertise starts advertising routes to the network - Advertise() (<-chan *Update, error) + Advertise() (<-chan *Advert, error) // Update updates the routing table - Update(*Update) error + Update(*Advert) error // Status returns router status Status() Status // Stop stops the router @@ -43,8 +43,8 @@ type UpdateType int const ( // Announce is advertised when the router announces itself Announce UpdateType = iota - // RouteEvent advertises route events - RouteEvent + // Update advertises route updates + Update ) // String returns string representation of update event @@ -52,15 +52,15 @@ func (ut UpdateType) String() string { switch ut { case Announce: return "ANNOUNCE" - case RouteEvent: - return "ROUTE" + case Update: + return "UPDATE" default: return "UNKNOWN" } } -// Update is sent by the router to the network -type Update struct { +// Advert is sent by the router to the network +type Advert struct { // ID is the router ID ID string // Timestamp marks the time when the update is sent From 72ef0321626606d53291a6190004bcb263d96d55 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 5 Jul 2019 19:15:32 +0100 Subject: [PATCH 03/14] First shot at flapping detection and event advertising. This commit also adds Route hash function, lots of debug messages for now and String() methods for various API objects. --- network/router/default_router.go | 241 +++++++++++++++++++++++++++---- network/router/default_table.go | 6 + network/router/route.go | 12 +- network/router/table_watcher.go | 10 +- 4 files changed, 239 insertions(+), 30 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index e1c97ccf..7ad1197c 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -2,11 +2,13 @@ package router import ( "fmt" + "math" "sort" "strings" "sync" "time" + "github.com/micro/go-log" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" ) @@ -18,14 +20,21 @@ const ( DeleteRoutePenalty = 1000 // AdvertiseTick is time interval in which we advertise route updates AdvertiseTick = 5 * time.Second + // AdvertSuppress is advert suppression threshold + AdvertSuppress = 2000 + // AdvertRecover is advert suppression recovery threshold + AdvertRecover = 750 + // PenaltyDecay is the "half-life" of the penalty + PenaltyDecay = 1.15 ) // router provides default router implementation type router struct { opts Options status Status - advertChan chan *Advert exit chan struct{} + eventChan chan *Event + advertChan chan *Advert wg *sync.WaitGroup sync.RWMutex } @@ -43,8 +52,9 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, status: Status{Error: nil, Code: Init}, - advertChan: make(chan *Advert), exit: make(chan struct{}), + eventChan: make(chan *Event), + advertChan: make(chan *Advert), wg: &sync.WaitGroup{}, } } @@ -83,9 +93,9 @@ func (r *router) Network() string { } // addServiceRoutes adds all services in given registry to the routing table. -// NOTE: this is a one-off operation done when bootstrapping the routing table +// NOTE: this is a one-off operation done when bootstrapping the router // It returns error if either the services failed to be listed or -// if any of the the routes could not be added to the routing table. +// if any of the the routes failed to be added to the routing table. func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { @@ -124,9 +134,9 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric return nil } -// manageServiceRoutes watches services in given registry and updates the routing table accordingly. -// It returns error if the service registry watcher has stopped or if the routing table failed to be updated. -func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { +// watchServices watches services in given registry and updates the routing table accordingly. +// It returns error if the service registry watcher stops or if the routing table can't be updated. +func (r *router) watchServices(w registry.Watcher) error { // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -151,7 +161,7 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error { Destination: res.Service.Name, Router: r.opts.Address, Network: r.opts.Network, - Metric: metric, + Metric: DefaultLocalMetric, } switch res.Action { @@ -193,31 +203,173 @@ func (r *router) watchTable(w Watcher) error { } break } - - u := &Advert{ - ID: r.ID(), - Timestamp: time.Now(), - Events: []*Event{event}, - } - select { case <-r.exit: - close(r.advertChan) - return watchErr - case r.advertChan <- u: + close(r.eventChan) + return nil + case r.eventChan <- event: } } - // close the advertisement channel - close(r.advertChan) + // close event channel on error + close(r.eventChan) return watchErr } -// watchError watches router errors -func (r *router) watchError(errChan <-chan error) { +func eventFlap(curr, prev *Event) bool { + if curr.Type == UpdateEvent && prev.Type == UpdateEvent { + // update flap: this can be either metric or whatnot + log.Logf("eventFlap(): Update flap") + return true + } + + if curr.Type == CreateEvent && prev.Type == DeleteEvent || curr.Type == DeleteEvent && prev.Type == CreateEvent { + log.Logf("eventFlap(): Create/Delete flap") + return true + } + + return false +} + +// processEvents processes routing table events. +// It suppresses unhealthy flapping events and advertises healthy events upstream. +func (r *router) processEvents() error { + // ticker to periodically scan event for advertising + ticker := time.NewTicker(AdvertiseTick) + + // TODO: Need to flag already advertised events otherwise we'll keep on advertising them + // as they keep getting advertised unless deleted and are only deleted when received by upstream + + // advertEvent is a table event enriched with advert data + type advertEvent struct { + *Event + timestamp time.Time + penalty float64 + isSuppressed bool + isFlapping bool + } + + // eventMap is a map of advert events that might end up being advertised + eventMap := make(map[uint64]*advertEvent) + // lock to protect access to eventMap + mu := &sync.RWMutex{} + // waitgroup to manage advertisement goroutines + var wg sync.WaitGroup + +process: + for { + select { + case <-ticker.C: + var events []*Event + // decay the penalties of existing events + mu.RLock() + for _, event := range eventMap { + delta := time.Since(event.timestamp).Seconds() + event.penalty = event.penalty * math.Exp(delta) + // suppress or recover the event based on its current penalty + if !event.isSuppressed && event.penalty > AdvertSuppress { + event.isSuppressed = true + } else if event.penalty < AdvertRecover { + event.isSuppressed = false + event.isFlapping = false + } + if !event.isFlapping { + e := new(Event) + *e = *event.Event + events = append(events, e) + } + } + mu.RUnlock() + + if len(events) > 0 { + wg.Add(1) + go func(events []*Event) { + defer wg.Done() + + log.Logf("go advertise(): start") + + a := &Advert{ + ID: r.ID(), + Timestamp: time.Now(), + Events: events, + } + + select { + case r.advertChan <- a: + mu.Lock() + // once we've advertised the events, we need to delete them + for _, event := range a.Events { + delete(eventMap, event.Route.Hash()) + } + mu.Unlock() + case <-r.exit: + log.Logf("go advertise(): exit") + return + } + log.Logf("go advertise(): exit") + }(events) + } + case e := <-r.eventChan: + // if event is nil, break + if e == nil { + continue + } + log.Logf("r.processEvents(): event received:\n%s", e) + // determine the event penalty + var penalty float64 + switch e.Type { + case UpdateEvent: + penalty = UpdateRoutePenalty + case CreateEvent, DeleteEvent: + penalty = DeleteRoutePenalty + } + // we use route hash as eventMap key + hash := e.Route.Hash() + event, ok := eventMap[hash] + if !ok { + event = &advertEvent{ + Event: e, + penalty: penalty, + timestamp: time.Now(), + } + eventMap[hash] = event + continue + } + // update penalty for existing event: decay existing and add new penalty + delta := time.Since(event.timestamp).Seconds() + event.penalty = event.penalty*math.Exp(delta) + penalty + event.timestamp = time.Now() + // suppress or recover the event based on its current penalty + if !event.isSuppressed && event.penalty > AdvertSuppress { + event.isSuppressed = true + } else if event.penalty < AdvertRecover { + event.isSuppressed = false + } + // if not suppressed decide if if its flapping + if !event.isSuppressed { + // detect if its flapping + event.isFlapping = eventFlap(e, event.Event) + } + case <-r.exit: + break process + } + } + + wg.Wait() + close(r.advertChan) + + log.Logf("r.processEvents(): event processor stopped") + + return nil +} + +// manage watches router errors and takes appropriate actions +func (r *router) manage(errChan <-chan error) { defer r.wg.Done() + log.Logf("r.manage(): manage start") + var code StatusCode var err error @@ -228,6 +380,8 @@ func (r *router) watchError(errChan <-chan error) { code = Error } + log.Logf("r.manage(): manage exiting") + r.Lock() defer r.Unlock() status := Status{ @@ -236,6 +390,8 @@ func (r *router) watchError(errChan <-chan error) { } r.status = status + log.Logf("r.manage(): router status: %v", r.status) + // stop the router if some error happened if err != nil && code != Stopped { // this will stop watchers which will close r.advertChan @@ -243,7 +399,12 @@ func (r *router) watchError(errChan <-chan error) { // drain the advertise channel for range r.advertChan { } + // drain the event channel + for range r.eventChan { + } } + + log.Logf("r.manage(): manage exit") } // Advertise advertises the routes to the network. @@ -257,6 +418,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) } + log.Logf("Routing table:\n%s", r.opts.Table) // add default gateway into routing table if r.opts.Gateway != "" { // note, the only non-default value is the gateway @@ -273,8 +435,10 @@ func (r *router) Advertise() (<-chan *Advert, error) { } // NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped + // TODO: these channels most likely won't have to be the struct fields if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) + r.eventChan = make(chan *Event) r.advertChan = make(chan *Advert) } @@ -283,31 +447,44 @@ func (r *router) Advertise() (<-chan *Advert, error) { if err != nil { return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } - // registry watcher - regWatcher, err := r.opts.Registry.Watch() + // service registry watcher + svcWatcher, err := r.opts.Registry.Watch() if err != nil { - return nil, fmt.Errorf("failed creating registry watcher: %v", err) + return nil, fmt.Errorf("failed creating service registry watcher: %v", err) } // error channel collecting goroutine errors - errChan := make(chan error, 2) + errChan := make(chan error, 3) r.wg.Add(1) go func() { defer r.wg.Done() + log.Logf("r.Advertise(): r.watchServices() start") // watch local registry and register routes in routine table - errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric) + errChan <- r.watchServices(svcWatcher) + log.Logf("r.Advertise(): r.watchServices() exit") }() r.wg.Add(1) go func() { defer r.wg.Done() + log.Logf("r.Advertise(): r.watchTable() start") // watch local registry and register routes in routing table errChan <- r.watchTable(tableWatcher) + log.Logf("r.Advertise(): r.watchTable() exit") }() r.wg.Add(1) - go r.watchError(errChan) + go func() { + defer r.wg.Done() + log.Logf("r.Advertise(): r.processEvents() start") + // listen to routing table events and process them + errChan <- r.processEvents() + log.Logf("r.Advertise(): r.processEvents() exit") + }() + + r.wg.Add(1) + go r.manage(errChan) // mark router as running and set its Error to nil status := Status{ @@ -362,20 +539,28 @@ func (r *router) Status() Status { // Stop stops the router func (r *router) Stop() error { + log.Logf("r.Stop(): Stopping router") r.RLock() // only close the channel if the router is running if r.status.Code == Running { // notify all goroutines to finish close(r.exit) + log.Logf("r.Stop(): exit closed") // drain the advertise channel for range r.advertChan { } + log.Logf("r.Stop(): advert channel drained") + // drain the event channel + for range r.eventChan { + } + log.Logf("r.Stop(): event channel drained") } r.RUnlock() // wait for all goroutines to finish r.wg.Wait() + log.Logf("r.Stop(): Router stopped") return nil } diff --git a/network/router/default_table.go b/network/router/default_table.go index bf4e9646..28659cbd 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -8,6 +8,7 @@ import ( "sync" "github.com/google/uuid" + "github.com/micro/go-log" "github.com/olekukonko/tablewriter" ) @@ -19,6 +20,7 @@ type TableOptions struct{} type table struct { // opts are table options opts TableOptions + // TODO: we should stop key-ing on destination // m stores routing table map m map[string]map[uint64]Route // h hashes route entries @@ -242,12 +244,16 @@ func (t *table) sendEvent(r *Event) { t.RLock() defer t.RUnlock() + log.Logf("sending event to %d registered table watchers", len(t.w)) + for _, w := range t.w { select { case w.resChan <- r: case <-w.done: } } + + log.Logf("sending event done") } // Size returns the size of the routing table diff --git a/network/router/route.go b/network/router/route.go index 7ee3559c..5e3cd8e5 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -2,6 +2,7 @@ package router import ( "fmt" + "hash/fnv" "strings" "github.com/olekukonko/tablewriter" @@ -56,8 +57,17 @@ type Route struct { Policy RoutePolicy } +// Hash returns route hash sum. +func (r *Route) Hash() uint64 { + h := fnv.New64() + h.Reset() + h.Write([]byte(r.Destination + r.Gateway + r.Network)) + + return h.Sum64() +} + // String allows to print the route -func (r *Route) String() string { +func (r Route) String() string { // this will help us build routing table string sb := &strings.Builder{} diff --git a/network/router/table_watcher.go b/network/router/table_watcher.go index 976fa8af..2c5d8989 100644 --- a/network/router/table_watcher.go +++ b/network/router/table_watcher.go @@ -2,9 +2,11 @@ package router import ( "errors" + "fmt" "strings" "time" + "github.com/micro/go-log" "github.com/olekukonko/tablewriter" ) @@ -45,10 +47,15 @@ type Event struct { Type EventType // Timestamp is event timestamp Timestamp time.Time - // Route is table rout + // Route is table route Route Route } +// String prints human readable Event +func (e Event) String() string { + return fmt.Sprintf("[EVENT] Type: %s\nRoute:\n%s", e.Type, e.Route) +} + // WatchOption is used to define what routes to watch in the table type WatchOption func(*WatchOptions) @@ -94,6 +101,7 @@ func (w *tableWatcher) Next() (*Event, error) { case res.Route.Destination, "*": return res, nil default: + log.Logf("no table watcher available to receive the event") continue } case <-w.done: From b68f0e237f39fa9f810e331bbf19f57020d81fdf Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Sat, 6 Jul 2019 00:36:15 +0100 Subject: [PATCH 04/14] Removed event from eventMap once sent to be advertised --- network/router/default_router.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 7ad1197c..e5dff063 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -107,6 +107,7 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) if err != nil { + log.Logf("r.addServiceRoutes() GetService() error: %v", err) continue } @@ -157,6 +158,8 @@ func (r *router) watchServices(w registry.Watcher) error { break } + log.Logf("r.watchServices() new service event: %s", res.Service.Name) + route := Route{ Destination: res.Service.Name, Router: r.opts.Address, @@ -238,9 +241,6 @@ func (r *router) processEvents() error { // ticker to periodically scan event for advertising ticker := time.NewTicker(AdvertiseTick) - // TODO: Need to flag already advertised events otherwise we'll keep on advertising them - // as they keep getting advertised unless deleted and are only deleted when received by upstream - // advertEvent is a table event enriched with advert data type advertEvent struct { *Event @@ -263,8 +263,8 @@ process: case <-ticker.C: var events []*Event // decay the penalties of existing events - mu.RLock() - for _, event := range eventMap { + mu.Lock() + for advert, event := range eventMap { delta := time.Since(event.timestamp).Seconds() event.penalty = event.penalty * math.Exp(delta) // suppress or recover the event based on its current penalty @@ -278,9 +278,11 @@ process: e := new(Event) *e = *event.Event events = append(events, e) + // this deletes the advertised event from the map + delete(eventMap, advert) } } - mu.RUnlock() + mu.Unlock() if len(events) > 0 { wg.Add(1) @@ -356,7 +358,9 @@ process: } } + // first wait for the advertiser to finish wg.Wait() + // close the advert channel close(r.advertChan) log.Logf("r.processEvents(): event processor stopped") @@ -390,8 +394,6 @@ func (r *router) manage(errChan <-chan error) { } r.status = status - log.Logf("r.manage(): router status: %v", r.status) - // stop the router if some error happened if err != nil && code != Stopped { // this will stop watchers which will close r.advertChan From 30d05e34a94efcc1f183184c001fc6fec08116a0 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Sat, 6 Jul 2019 01:31:59 +0100 Subject: [PATCH 05/14] Read and remove routes based on registry event deltas --- network/router/default_router.go | 43 +++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index e5dff063..872dfc93 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -158,25 +158,40 @@ func (r *router) watchServices(w registry.Watcher) error { break } - log.Logf("r.watchServices() new service event: %s", res.Service.Name) - - route := Route{ - Destination: res.Service.Name, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: DefaultLocalMetric, - } + log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) switch res.Action { case "create": - // only return error if the route is not duplicate, but something else has failed - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) + // range over the flat slice of nodes + for _, node := range res.Service.Nodes { + gateway := node.Address + if node.Port > 0 { + gateway = fmt.Sprintf("%s:%d", node.Address, node.Port) + } + route := Route{ + Destination: res.Service.Name, + Gateway: gateway, + Router: r.opts.Address, + Network: r.opts.Network, + Metric: DefaultLocalMetric, + } + if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { + return fmt.Errorf("error adding route for service %s: %s", res.Service.Name, err) + } } case "delete": - // only return error if the route is not in the table, but something else has failed - if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) + for _, node := range res.Service.Nodes { + route := Route{ + Destination: res.Service.Name, + Gateway: node.Address, + Router: r.opts.Address, + Network: r.opts.Network, + Metric: DefaultLocalMetric, + } + // only return error if the route is not in the table, but something else has failed + if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { + return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) + } } } } From 0c1a28a9b615f4ce0ca780865d2322a636295f43 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 8 Jul 2019 16:16:50 +0100 Subject: [PATCH 06/14] Router routing table management. Table route hashes. Status codes changed. We now manage routing table actions using dedicated functions run on either registry or services in the registry. Routing table now uses Route.Hash() instead of maintaining its own hash struct filed which previously performed these operations. Various names of variables have been changed to make them more concise. --- network/router/default_router.go | 105 +++++++++++---------------- network/router/default_table.go | 54 +++++--------- network/router/default_table_test.go | 23 +++--- network/router/options.go | 2 +- network/router/route.go | 36 ++++----- network/router/router.go | 12 +-- 6 files changed, 96 insertions(+), 136 deletions(-) diff --git a/network/router/default_router.go b/network/router/default_router.go index 872dfc93..a9517f9d 100644 --- a/network/router/default_router.go +++ b/network/router/default_router.go @@ -39,7 +39,7 @@ type router struct { sync.RWMutex } -// newRouter creates new router and returns it +// newRouter creates a new router and returns it func newRouter(opts ...Option) Router { // get default options options := DefaultOptions() @@ -51,7 +51,7 @@ func newRouter(opts ...Option) Router { return &router{ opts: options, - status: Status{Error: nil, Code: Init}, + status: Status{Error: nil, Code: Stopped}, exit: make(chan struct{}), eventChan: make(chan *Event), advertChan: make(chan *Advert), @@ -92,11 +92,39 @@ func (r *router) Network() string { return r.opts.Network } -// addServiceRoutes adds all services in given registry to the routing table. -// NOTE: this is a one-off operation done when bootstrapping the router -// It returns error if either the services failed to be listed or -// if any of the the routes failed to be added to the routing table. -func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { +// manageServiceRoutes manages the routes for a given service. +// It returns error of the routing table action fails with error. +func (r *router) manageServiceRoutes(service *registry.Service, action string, metric int) error { + // action is the routing table action + action = strings.ToLower(action) + // take route action on each service node + for _, node := range service.Nodes { + route := Route{ + Destination: service.Name, + Gateway: node.Address, + Router: r.opts.Address, + Network: r.opts.Network, + Metric: metric, + } + switch action { + case "insert", "create": + if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { + return fmt.Errorf("failed adding route for service %s: %s", service.Name, err) + } + case "delete": + if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { + return fmt.Errorf("failed deleting route for service %v: %s", service.Name, err) + } + default: + return fmt.Errorf("failed to manage route for service %v. Unknown action: %s", service.Name, action) + } + } + return nil +} + +// manageRegistryRoutes manages routes for each service found in the registry. +// It returns error if either the services failed to be listed or if the routing table action fails wirh error +func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metric int) error { services, err := reg.ListServices() if err != nil { return fmt.Errorf("failed listing services: %v", err) @@ -107,27 +135,13 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) if err != nil { - log.Logf("r.addServiceRoutes() GetService() error: %v", err) + log.Logf("r.manageRegistryRoutes() GetService() error: %v", err) continue } - - // create a flat slide of nodes - var nodes []*registry.Node + // manage the routes for all return services for _, s := range srvs { - nodes = append(nodes, s.Nodes...) - } - - // range over the flat slice of nodes - for _, node := range nodes { - route := Route{ - Destination: service.Name, - Gateway: node.Address, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: metric, - } - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("error adding route for service %s: %s", service.Name, err) + if err := r.manageServiceRoutes(s, action, metric); err != nil { + return err } } } @@ -160,39 +174,8 @@ func (r *router) watchServices(w registry.Watcher) error { log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) - switch res.Action { - case "create": - // range over the flat slice of nodes - for _, node := range res.Service.Nodes { - gateway := node.Address - if node.Port > 0 { - gateway = fmt.Sprintf("%s:%d", node.Address, node.Port) - } - route := Route{ - Destination: res.Service.Name, - Gateway: gateway, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: DefaultLocalMetric, - } - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { - return fmt.Errorf("error adding route for service %s: %s", res.Service.Name, err) - } - } - case "delete": - for _, node := range res.Service.Nodes { - route := Route{ - Destination: res.Service.Name, - Gateway: node.Address, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: DefaultLocalMetric, - } - // only return error if the route is not in the table, but something else has failed - if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err) - } - } + if err := r.manageServiceRoutes(res.Service, res.Action, DefaultLocalMetric); err != nil { + return err } } @@ -431,8 +414,8 @@ func (r *router) Advertise() (<-chan *Advert, error) { defer r.Unlock() if r.status.Code != Running { - // add local service routes into the routing table - if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil { + // add all local service routes into the routing table + if err := r.manageRegistryRoutes(r.opts.Registry, "insert", DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) } log.Logf("Routing table:\n%s", r.opts.Table) @@ -533,7 +516,7 @@ func (r *router) Update(a *Advert) error { Router: event.Route.Router, Network: event.Route.Network, Metric: event.Route.Metric, - Policy: AddIfNotExists, + Policy: Insert, } if err := r.opts.Table.Update(route); err != nil { return fmt.Errorf("failed updating routing table: %v", err) diff --git a/network/router/default_table.go b/network/router/default_table.go index 28659cbd..9ae35e41 100644 --- a/network/router/default_table.go +++ b/network/router/default_table.go @@ -2,8 +2,6 @@ package router import ( "fmt" - "hash" - "hash/fnv" "strings" "sync" @@ -12,25 +10,22 @@ import ( "github.com/olekukonko/tablewriter" ) -// TableOptions are routing table options +// TableOptions specify routing table options // TODO: table options TBD in the future type TableOptions struct{} -// table is in memory routing table +// table is an in memory routing table type table struct { // opts are table options opts TableOptions - // TODO: we should stop key-ing on destination // m stores routing table map m map[string]map[uint64]Route - // h hashes route entries - h hash.Hash64 // w is a list of table watchers w map[string]*tableWatcher sync.RWMutex } -// newTable creates in memory routing table and returns it +// newTable creates a new routing table and returns it func newTable(opts ...TableOption) Table { // default options var options TableOptions @@ -40,14 +35,10 @@ func newTable(opts ...TableOption) Table { o(&options) } - h := fnv.New64() - h.Reset() - return &table{ opts: options, m: make(map[string]map[uint64]Route), w: make(map[string]*tableWatcher), - h: h, } } @@ -67,12 +58,12 @@ func (t *table) Options() TableOptions { // Add adds a route to the routing table func (t *table) Add(r Route) error { destAddr := r.Destination - sum := t.hash(r) + sum := r.Hash() t.Lock() defer t.Unlock() - // check if the destination has any routes in the table + // check if there are any routes in the table for the route destination if _, ok := t.m[destAddr]; !ok { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r @@ -80,7 +71,7 @@ func (t *table) Add(r Route) error { return nil } - // add new route to the table for the given destination + // add new route to the table for the route destination if _, ok := t.m[destAddr][sum]; !ok { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) @@ -88,15 +79,15 @@ func (t *table) Add(r Route) error { } // only add the route if the route override is explicitly requested - if _, ok := t.m[destAddr][sum]; ok && r.Policy == OverrideIfExists { + if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) return nil } - // if we reached this point without already returning the route already exists + // if we reached this point the route must already exist // we return nil only if explicitly requested by the client - if r.Policy == IgnoreIfExists { + if r.Policy == Skip { return nil } @@ -105,12 +96,12 @@ func (t *table) Add(r Route) error { // Delete deletes the route from the routing table func (t *table) Delete(r Route) error { + destAddr := r.Destination + sum := r.Hash() + t.Lock() defer t.Unlock() - destAddr := r.Destination - sum := t.hash(r) - if _, ok := t.m[destAddr]; !ok { return ErrRouteNotFound } @@ -121,17 +112,17 @@ func (t *table) Delete(r Route) error { return nil } -// Update updates routing table with new route +// Update updates routing table with the new route func (t *table) Update(r Route) error { destAddr := r.Destination - sum := t.hash(r) + sum := r.Hash() t.Lock() defer t.Unlock() - // check if the destAddr has ANY routes in the table + // check if the route destination has any routes in the table if _, ok := t.m[destAddr]; !ok { - if r.Policy == AddIfNotExists { + if r.Policy == Insert { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) @@ -140,8 +131,9 @@ func (t *table) Update(r Route) error { return ErrRouteNotFound } - // check if destination has this particular router in the table - if _, ok := t.m[destAddr][sum]; !ok && r.Policy == AddIfNotExists { + // check if the route for the route destination already exists + // NOTE: We only insert the route if explicitly requested by the client + if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert { t.m[destAddr][sum] = r go t.sendEvent(&Event{Type: CreateEvent, Route: r}) return nil @@ -299,11 +291,3 @@ func (t *table) String() string { return sb.String() } - -// hash hashes the route using router gateway and network address -func (t *table) hash(r Route) uint64 { - t.h.Reset() - t.h.Write([]byte(r.Destination + r.Gateway + r.Network)) - - return t.h.Sum64() -} diff --git a/network/router/default_table_test.go b/network/router/default_table_test.go index 9098e5ad..46b8b034 100644 --- a/network/router/default_table_test.go +++ b/network/router/default_table_test.go @@ -2,7 +2,6 @@ package router import "testing" -// creates routing table and test route func testSetup() (Table, Route) { table := NewTable() @@ -35,32 +34,32 @@ func TestAdd(t *testing.T) { testTableSize += 1 // overrides an existing route - // NOTE: the size of the table should not change route.Metric = 100 - route.Policy = OverrideIfExists + route.Policy = Override if err := table.Add(route); err != nil { t.Errorf("error adding route: %s", err) } + // the size of the table should not change when Override policy is used if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } // dont add new route if it already exists - // NOTE: The size of the table should not change - route.Policy = IgnoreIfExists + route.Policy = Skip if err := table.Add(route); err != nil { t.Errorf("error adding route: %s", err) } + // the size of the table should not change if Skip policy is used if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // adding the same route under AddIfNotExists policy must error - route.Policy = AddIfNotExists + // adding the same route under Insert policy must error + route.Policy = Insert if err := table.Add(route); err != ErrDuplicateRoute { t.Errorf("error adding route. Expected error: %s, Given: %s", ErrDuplicateRoute, err) @@ -107,18 +106,17 @@ func TestUpdate(t *testing.T) { testTableSize += 1 // change the metric of the original route - // NOTE: this should NOT change the size of the table route.Metric = 200 if err := table.Update(route); err != nil { t.Errorf("error updating route: %s", err) } + // the size of the table should not change as we're only updating the metric of an existing route if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // NOTE: routing table routes on // this should add a new route route.Destination = "new.dest" @@ -127,12 +125,11 @@ func TestUpdate(t *testing.T) { } testTableSize += 1 - // NOTE: default policy is AddIfNotExists so the new route will be added here + // Default policy is Insert so the new route will be added here since the route does not exist if table.Size() != testTableSize { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // NOTE: we are hashing routes on // this should add a new route route.Gateway = "new.gw" @@ -145,9 +142,9 @@ func TestUpdate(t *testing.T) { t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) } - // this should NOT add a new route as we are setting the policy to IgnoreIfExists + // this should NOT add a new route as we are setting the policy to Skip route.Destination = "rand.dest" - route.Policy = IgnoreIfExists + route.Policy = Skip if err := table.Update(route); err != ErrRouteNotFound { t.Errorf("error updating route. Expected error: %s, given: %s", ErrRouteNotFound, err) diff --git a/network/router/options.go b/network/router/options.go index 2426fa32..f6413a53 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -9,7 +9,7 @@ var ( // DefaultAddress is default router address DefaultAddress = ":9093" // DefaultNetwork is default micro network - DefaultNetwork = "local" + DefaultNetwork = "micro.mu" ) // Options are router options diff --git a/network/router/route.go b/network/router/route.go index 5e3cd8e5..f995d9d4 100644 --- a/network/router/route.go +++ b/network/router/route.go @@ -9,33 +9,33 @@ import ( ) var ( - // DefaultLocalMetric is default route cost for local network + // DefaultLocalMetric is default route cost metric for the local network DefaultLocalMetric = 1 - // DefaultNetworkMetric is default route cost for micro network + // DefaultNetworkMetric is default route cost metric for the micro network DefaultNetworkMetric = 10 ) -// RoutePolicy defines routing table addition policy +// RoutePolicy defines routing table policy type RoutePolicy int const ( - // AddIfNotExist adds the route if it does not exist - AddIfNotExists RoutePolicy = iota - // OverrideIfExists overrides route if it already exists - OverrideIfExists - // IgnoreIfExists instructs to not modify existing route - IgnoreIfExists + // Insert inserts a new route if it does not already exist + Insert RoutePolicy = iota + // Override overrides the route if it already exists + Override + // Skip skips modifying the route if it already exists + Skip ) // String returns human reprensentation of policy func (p RoutePolicy) String() string { switch p { - case AddIfNotExists: - return "ADD_IF_NOT_EXISTS" - case OverrideIfExists: - return "OVERRIDE_IF_EXISTS" - case IgnoreIfExists: - return "IGNORE_IF_EXISTS" + case Insert: + return "INSERT" + case Override: + return "OVERRIDE" + case Skip: + return "SKIP" default: return "UNKNOWN" } @@ -47,9 +47,9 @@ type Route struct { Destination string // Gateway is route gateway Gateway string - // Router is the network router address + // Router is the router address Router string - // Network is micro network address + // Network is network address Network string // Metric is the route cost metric Metric int @@ -66,7 +66,7 @@ func (r *Route) Hash() uint64 { return h.Sum64() } -// String allows to print the route +// String returns human readable route func (r Route) String() string { // this will help us build routing table string sb := &strings.Builder{} diff --git a/network/router/router.go b/network/router/router.go index 20c05fa0..ea91538c 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -14,7 +14,7 @@ type Router interface { Init(...Option) error // Options returns the router options Options() Options - // ID returns the id of the router + // ID returns the ID of the router ID() string // Table returns the routing table Table() Table @@ -22,7 +22,7 @@ type Router interface { Address() string // Network returns the network address of the router Network() string - // Advertise starts advertising routes to the network + // Advertise advertises routes to the network Advertise() (<-chan *Advert, error) // Update updates the routing table Update(*Advert) error @@ -59,7 +59,7 @@ func (ut UpdateType) String() string { } } -// Advert is sent by the router to the network +// Advert contains a list of events advertised by the router to the network type Advert struct { // ID is the router ID ID string @@ -81,10 +81,8 @@ type Status struct { } const ( - // Init means the rotuer has just been initialized - Init StatusCode = iota // Running means the router is up and running - Running + Running StatusCode = iota // Stopped means the router has been stopped Stopped // Error means the router has encountered error @@ -94,8 +92,6 @@ const ( // String returns human readable status code func (sc StatusCode) String() string { switch sc { - case Init: - return "INITIALIZED" case Running: return "RUNNING" case Stopped: From cc590f5f2c8a63a55876604d6f19103eb61b839c Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 8 Jul 2019 16:51:55 +0100 Subject: [PATCH 07/14] Table now has a dedicated package inside router package. --- .../router/{default_router.go => default.go} | 68 ++++++++++--------- network/router/options.go | 7 +- network/router/router.go | 12 ++-- .../{default_table.go => table/default.go} | 16 ++--- .../default_test.go} | 2 +- network/router/{ => table}/query.go | 2 +- network/router/{ => table}/route.go | 2 +- network/router/{ => table}/table.go | 2 +- .../{table_watcher.go => table/watcher.go} | 22 +++--- 9 files changed, 71 insertions(+), 62 deletions(-) rename network/router/{default_router.go => default.go} (89%) rename network/router/{default_table.go => table/default.go} (93%) rename network/router/{default_table_test.go => table/default_test.go} (99%) rename network/router/{ => table}/query.go (99%) rename network/router/{ => table}/route.go (99%) rename network/router/{ => table}/table.go (98%) rename network/router/{table_watcher.go => table/watcher.go} (87%) diff --git a/network/router/default_router.go b/network/router/default.go similarity index 89% rename from network/router/default_router.go rename to network/router/default.go index a9517f9d..44e07108 100644 --- a/network/router/default_router.go +++ b/network/router/default.go @@ -9,6 +9,7 @@ import ( "time" "github.com/micro/go-log" + "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" ) @@ -33,7 +34,7 @@ type router struct { opts Options status Status exit chan struct{} - eventChan chan *Event + eventChan chan *table.Event advertChan chan *Advert wg *sync.WaitGroup sync.RWMutex @@ -53,7 +54,7 @@ func newRouter(opts ...Option) Router { opts: options, status: Status{Error: nil, Code: Stopped}, exit: make(chan struct{}), - eventChan: make(chan *Event), + eventChan: make(chan *table.Event), advertChan: make(chan *Advert), wg: &sync.WaitGroup{}, } @@ -78,7 +79,7 @@ func (r *router) ID() string { } // Table returns routing table -func (r *router) Table() Table { +func (r *router) Table() table.Table { return r.opts.Table } @@ -99,7 +100,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m action = strings.ToLower(action) // take route action on each service node for _, node := range service.Nodes { - route := Route{ + route := table.Route{ Destination: service.Name, Gateway: node.Address, Router: r.opts.Address, @@ -108,11 +109,11 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m } switch action { case "insert", "create": - if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { + if err := r.opts.Table.Add(route); err != nil && err != table.ErrDuplicateRoute { return fmt.Errorf("failed adding route for service %s: %s", service.Name, err) } case "delete": - if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { + if err := r.opts.Table.Delete(route); err != nil && err != table.ErrRouteNotFound { return fmt.Errorf("failed deleting route for service %v: %s", service.Name, err) } default: @@ -138,7 +139,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metr log.Logf("r.manageRegistryRoutes() GetService() error: %v", err) continue } - // manage the routes for all return services + // manage the routes for all returned services for _, s := range srvs { if err := r.manageServiceRoutes(s, action, metric); err != nil { return err @@ -174,7 +175,7 @@ func (r *router) watchServices(w registry.Watcher) error { log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) - if err := r.manageServiceRoutes(res.Service, res.Action, DefaultLocalMetric); err != nil { + if err := r.manageServiceRoutes(res.Service, res.Action, table.DefaultLocalMetric); err != nil { return err } } @@ -184,7 +185,7 @@ func (r *router) watchServices(w registry.Watcher) error { // watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry // It returns error if the locally registered services either fails to be added/deleted to/from network registry. -func (r *router) watchTable(w Watcher) error { +func (r *router) watchTable(w table.Watcher) error { // wait in the background for the router to stop // when the router stops, stop the watcher and exit r.wg.Add(1) @@ -199,11 +200,14 @@ func (r *router) watchTable(w Watcher) error { for { event, err := w.Next() if err != nil { - if err != ErrWatcherStopped { + if err != table.ErrWatcherStopped { watchErr = err } break } + + log.Logf("r.watchTable() new table event: %s", event) + select { case <-r.exit: close(r.eventChan) @@ -218,14 +222,14 @@ func (r *router) watchTable(w Watcher) error { return watchErr } -func eventFlap(curr, prev *Event) bool { - if curr.Type == UpdateEvent && prev.Type == UpdateEvent { +func eventFlap(curr, prev *table.Event) bool { + if curr.Type == table.Update && prev.Type == table.Update { // update flap: this can be either metric or whatnot log.Logf("eventFlap(): Update flap") return true } - if curr.Type == CreateEvent && prev.Type == DeleteEvent || curr.Type == DeleteEvent && prev.Type == CreateEvent { + if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create { log.Logf("eventFlap(): Create/Delete flap") return true } @@ -241,7 +245,7 @@ func (r *router) processEvents() error { // advertEvent is a table event enriched with advert data type advertEvent struct { - *Event + *table.Event timestamp time.Time penalty float64 isSuppressed bool @@ -259,7 +263,7 @@ process: for { select { case <-ticker.C: - var events []*Event + var events []*table.Event // decay the penalties of existing events mu.Lock() for advert, event := range eventMap { @@ -273,7 +277,7 @@ process: event.isFlapping = false } if !event.isFlapping { - e := new(Event) + e := new(table.Event) *e = *event.Event events = append(events, e) // this deletes the advertised event from the map @@ -284,7 +288,7 @@ process: if len(events) > 0 { wg.Add(1) - go func(events []*Event) { + go func(events []*table.Event) { defer wg.Done() log.Logf("go advertise(): start") @@ -319,9 +323,9 @@ process: // determine the event penalty var penalty float64 switch e.Type { - case UpdateEvent: + case table.Update: penalty = UpdateRoutePenalty - case CreateEvent, DeleteEvent: + case table.Create, table.Delete: penalty = DeleteRoutePenalty } // we use route hash as eventMap key @@ -366,8 +370,8 @@ process: return nil } -// manage watches router errors and takes appropriate actions -func (r *router) manage(errChan <-chan error) { +// watchErrors watches router errors and takes appropriate actions +func (r *router) watchErrors(errChan <-chan error) { defer r.wg.Done() log.Logf("r.manage(): manage start") @@ -382,7 +386,7 @@ func (r *router) manage(errChan <-chan error) { code = Error } - log.Logf("r.manage(): manage exiting") + log.Logf("r.watchErrors(): watchErrors exiting") r.Lock() defer r.Unlock() @@ -404,7 +408,7 @@ func (r *router) manage(errChan <-chan error) { } } - log.Logf("r.manage(): manage exit") + log.Logf("r.watchErrors(): watchErrors exit") } // Advertise advertises the routes to the network. @@ -415,19 +419,19 @@ func (r *router) Advertise() (<-chan *Advert, error) { if r.status.Code != Running { // add all local service routes into the routing table - if err := r.manageRegistryRoutes(r.opts.Registry, "insert", DefaultLocalMetric); err != nil { + if err := r.manageRegistryRoutes(r.opts.Registry, "insert", table.DefaultLocalMetric); err != nil { return nil, fmt.Errorf("failed adding routes: %v", err) } log.Logf("Routing table:\n%s", r.opts.Table) // add default gateway into routing table if r.opts.Gateway != "" { // note, the only non-default value is the gateway - route := Route{ + route := table.Route{ Destination: "*", Gateway: r.opts.Gateway, Router: "*", Network: "*", - Metric: DefaultLocalMetric, + Metric: table.DefaultLocalMetric, } if err := r.opts.Table.Add(route); err != nil { return nil, fmt.Errorf("failed adding default gateway route: %s", err) @@ -438,12 +442,12 @@ func (r *router) Advertise() (<-chan *Advert, error) { // TODO: these channels most likely won't have to be the struct fields if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) - r.eventChan = make(chan *Event) + r.eventChan = make(chan *table.Event) r.advertChan = make(chan *Advert) } // routing table watcher which watches all routes i.e. to every destination - tableWatcher, err := r.opts.Table.Watch(WatchDestination("*")) + tableWatcher, err := r.opts.Table.Watch(table.WatchDestination("*")) if err != nil { return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } @@ -484,7 +488,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { }() r.wg.Add(1) - go r.manage(errChan) + go r.watchErrors(errChan) // mark router as running and set its Error to nil status := Status{ @@ -501,7 +505,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { func (r *router) Update(a *Advert) error { // NOTE: event sorting might not be necessary // copy update events intp new slices - events := make([]*Event, len(a.Events)) + events := make([]*table.Event, len(a.Events)) copy(events, a.Events) // sort events by timestamp sort.Slice(events, func(i, j int) bool { @@ -510,13 +514,13 @@ func (r *router) Update(a *Advert) error { for _, event := range events { // we extract the route from advertisement and update the routing table - route := Route{ + route := table.Route{ Destination: event.Route.Destination, Gateway: event.Route.Gateway, Router: event.Route.Router, Network: event.Route.Network, Metric: event.Route.Metric, - Policy: Insert, + Policy: table.Insert, } if err := r.opts.Table.Update(route); err != nil { return fmt.Errorf("failed updating routing table: %v", err) diff --git a/network/router/options.go b/network/router/options.go index f6413a53..eb287075 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -2,6 +2,7 @@ package router import ( "github.com/google/uuid" + "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" ) @@ -25,7 +26,7 @@ type Options struct { // Registry is the local registry Registry registry.Registry // Table is routing table - Table Table + Table table.Table } // ID sets Router ID @@ -57,7 +58,7 @@ func Gateway(g string) Option { } // RoutingTable sets the routing table -func RoutingTable(t Table) Option { +func RoutingTable(t table.Table) Option { return func(o *Options) { o.Table = t } @@ -77,6 +78,6 @@ func DefaultOptions() Options { Address: DefaultAddress, Network: DefaultNetwork, Registry: registry.DefaultRegistry, - Table: NewTable(), + Table: table.NewTable(), } } diff --git a/network/router/router.go b/network/router/router.go index ea91538c..e0f7d676 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -1,7 +1,11 @@ // Package router provides a network routing control plane package router -import "time" +import ( + "time" + + "github.com/micro/go-micro/network/router/table" +) var ( // DefaultRouter is default network router @@ -17,7 +21,7 @@ type Router interface { // ID returns the ID of the router ID() string // Table returns the routing table - Table() Table + Table() table.Table // Address returns the router adddress Address() string // Network returns the network address of the router @@ -65,8 +69,8 @@ type Advert struct { ID string // Timestamp marks the time when the update is sent Timestamp time.Time - // Events is a list of events to advertise - Events []*Event + // Events is a list of routing table events to advertise + Events []*table.Event } // StatusCode defines router status diff --git a/network/router/default_table.go b/network/router/table/default.go similarity index 93% rename from network/router/default_table.go rename to network/router/table/default.go index 9ae35e41..3480183e 100644 --- a/network/router/default_table.go +++ b/network/router/table/default.go @@ -1,4 +1,4 @@ -package router +package table import ( "fmt" @@ -67,21 +67,21 @@ func (t *table) Add(r Route) error { if _, ok := t.m[destAddr]; !ok { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } // add new route to the table for the route destination if _, ok := t.m[destAddr][sum]; !ok { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } // only add the route if the route override is explicitly requested if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) + go t.sendEvent(&Event{Type: Update, Route: r}) return nil } @@ -107,7 +107,7 @@ func (t *table) Delete(r Route) error { } delete(t.m[destAddr], sum) - go t.sendEvent(&Event{Type: DeleteEvent, Route: r}) + go t.sendEvent(&Event{Type: Delete, Route: r}) return nil } @@ -125,7 +125,7 @@ func (t *table) Update(r Route) error { if r.Policy == Insert { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } return ErrRouteNotFound @@ -135,14 +135,14 @@ func (t *table) Update(r Route) error { // NOTE: We only insert the route if explicitly requested by the client if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: CreateEvent, Route: r}) + go t.sendEvent(&Event{Type: Create, Route: r}) return nil } // if the route has been found update it if _, ok := t.m[destAddr][sum]; ok { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: UpdateEvent, Route: r}) + go t.sendEvent(&Event{Type: Update, Route: r}) return nil } diff --git a/network/router/default_table_test.go b/network/router/table/default_test.go similarity index 99% rename from network/router/default_table_test.go rename to network/router/table/default_test.go index 46b8b034..a0c364b7 100644 --- a/network/router/default_table_test.go +++ b/network/router/table/default_test.go @@ -1,4 +1,4 @@ -package router +package table import "testing" diff --git a/network/router/query.go b/network/router/table/query.go similarity index 99% rename from network/router/query.go rename to network/router/table/query.go index befdd1d9..ccb7d396 100644 --- a/network/router/query.go +++ b/network/router/table/query.go @@ -1,4 +1,4 @@ -package router +package table import ( "fmt" diff --git a/network/router/route.go b/network/router/table/route.go similarity index 99% rename from network/router/route.go rename to network/router/table/route.go index f995d9d4..3d9c3bcb 100644 --- a/network/router/route.go +++ b/network/router/table/route.go @@ -1,4 +1,4 @@ -package router +package table import ( "fmt" diff --git a/network/router/table.go b/network/router/table/table.go similarity index 98% rename from network/router/table.go rename to network/router/table/table.go index c00484cd..9d353a54 100644 --- a/network/router/table.go +++ b/network/router/table/table.go @@ -1,4 +1,4 @@ -package router +package table import ( "errors" diff --git a/network/router/table_watcher.go b/network/router/table/watcher.go similarity index 87% rename from network/router/table_watcher.go rename to network/router/table/watcher.go index 2c5d8989..850a9089 100644 --- a/network/router/table_watcher.go +++ b/network/router/table/watcher.go @@ -1,4 +1,4 @@ -package router +package table import ( "errors" @@ -19,22 +19,22 @@ var ( type EventType int const ( - // CreateEvent is emitted when a new route has been created - CreateEvent EventType = iota - // DeleteEvent is emitted when an existing route has been deleted - DeleteEvent - // UpdateEvent is emitted when an existing route has been updated - UpdateEvent + // Create is emitted when a new route has been created + Create EventType = iota + // Delete is emitted when an existing route has been deleted + Delete + // Update is emitted when an existing route has been updated + Update ) // String returns string representation of the event func (et EventType) String() string { switch et { - case CreateEvent: + case Create: return "CREATE" - case DeleteEvent: + case Delete: return "DELETE" - case UpdateEvent: + case Update: return "UPDATE" default: return "UNKNOWN" @@ -53,7 +53,7 @@ type Event struct { // String prints human readable Event func (e Event) String() string { - return fmt.Sprintf("[EVENT] Type: %s\nRoute:\n%s", e.Type, e.Route) + return fmt.Sprintf("[EVENT] %s:\nRoute:\n%s", e.Type, e.Route) } // WatchOption is used to define what routes to watch in the table From b82245429eb158eee541223bc32f257d5d714e0f Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Mon, 8 Jul 2019 21:03:54 +0100 Subject: [PATCH 08/14] Simplified table logic. Lookup tests. mucp/cient update --- client/selector/router/router.go | 11 +- network/proxy/mucp/mucp.go | 11 +- network/router/default.go | 111 +++++++++--------- network/router/router.go | 7 +- network/router/table/default.go | 31 +---- network/router/table/default_test.go | 163 +++++++++++++++++---------- network/router/table/query.go | 1 + network/router/table/route.go | 32 +----- network/router/table/watcher.go | 10 +- 9 files changed, 183 insertions(+), 194 deletions(-) diff --git a/client/selector/router/router.go b/client/selector/router/router.go index 4bcdecd1..65e69702 100644 --- a/client/selector/router/router.go +++ b/client/selector/router/router.go @@ -11,6 +11,7 @@ import ( "github.com/micro/go-micro/client/selector" "github.com/micro/go-micro/network/router" pb "github.com/micro/go-micro/network/router/proto" + "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" ) @@ -40,11 +41,11 @@ type clientKey struct{} type routerKey struct{} // getRoutes returns the routes whether they are remote or local -func (r *routerSelector) getRoutes(service string) ([]router.Route, error) { +func (r *routerSelector) getRoutes(service string) ([]table.Route, error) { if !r.remote { // lookup router for routes for the service - return r.r.Table().Lookup(router.NewQuery( - router.QueryDestination(service), + return r.r.Table().Lookup(table.NewQuery( + table.QueryDestination(service), )) } @@ -101,11 +102,11 @@ func (r *routerSelector) getRoutes(service string) ([]router.Route, error) { return nil, selector.ErrNoneAvailable } - var routes []router.Route + var routes []table.Route // convert from pb to []*router.Route for _, r := range pbRoutes.Routes { - routes = append(routes, router.Route{ + routes = append(routes, table.Route{ Destination: r.Destination, Gateway: r.Gateway, Router: r.Router, diff --git a/network/proxy/mucp/mucp.go b/network/proxy/mucp/mucp.go index 1abe976b..d0ecb81f 100644 --- a/network/proxy/mucp/mucp.go +++ b/network/proxy/mucp/mucp.go @@ -18,6 +18,7 @@ import ( "github.com/micro/go-micro/server" pb "github.com/micro/go-micro/network/router/proto" + "github.com/micro/go-micro/network/router/table" ) // Proxy will transparently proxy requests to an endpoint. @@ -40,7 +41,7 @@ type Proxy struct { // A fib of routes service:address sync.RWMutex - Routes map[string][]router.Route + Routes map[string][]table.Route } // read client request and write to server @@ -80,7 +81,7 @@ func readLoop(r server.Request, s client.Stream) error { func (p *Proxy) getRoute(service string) ([]string, error) { // converts routes to just addresses - toNodes := func(routes []router.Route) []string { + toNodes := func(routes []table.Route) []string { var nodes []string for _, node := range routes { nodes = append(nodes, node.Gateway) @@ -106,7 +107,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { if p.Router != nil { // lookup the router routes, err := p.Router.Table().Lookup( - router.NewQuery(router.QueryDestination(service)), + table.NewQuery(table.QueryDestination(service)), ) if err != nil { return nil, err @@ -114,7 +115,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { p.Lock() if p.Routes == nil { - p.Routes = make(map[string][]router.Route) + p.Routes = make(map[string][]table.Route) } p.Routes[service] = routes p.Unlock() @@ -203,7 +204,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { // convert from pb to []*router.Route for _, r := range pbRoutes.Routes { - routes = append(routes, router.Route{ + routes = append(routes, table.Route{ Destination: r.Destination, Gateway: r.Gateway, Router: r.Router, diff --git a/network/router/default.go b/network/router/default.go index 44e07108..b77b40fe 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -15,18 +15,20 @@ import ( ) const ( - // UpdateRoutePenalty penalises route updates - UpdateRoutePenalty = 500 - // DeleteRoutePenalty penalises route deletes - DeleteRoutePenalty = 1000 // AdvertiseTick is time interval in which we advertise route updates AdvertiseTick = 5 * time.Second // AdvertSuppress is advert suppression threshold AdvertSuppress = 2000 - // AdvertRecover is advert suppression recovery threshold + // AdvertRecover is advert recovery threshold AdvertRecover = 750 - // PenaltyDecay is the "half-life" of the penalty + // DefaultAdvertTTL is default advertisement TTL + DefaultAdvertTTL = time.Minute + // PenaltyDecay is the penalty decay PenaltyDecay = 1.15 + // Delete penalises route addition and deletion + Delete = 1000 + // UpdatePenalty penalises route updates + UpdatePenalty = 500 ) // router provides default router implementation @@ -93,8 +95,8 @@ func (r *router) Network() string { return r.opts.Network } -// manageServiceRoutes manages the routes for a given service. -// It returns error of the routing table action fails with error. +// manageServiceRoutes manages routes for a given service. +// It returns error of the routing table action fails. func (r *router) manageServiceRoutes(service *registry.Service, action string, metric int) error { // action is the routing table action action = strings.ToLower(action) @@ -124,7 +126,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m } // manageRegistryRoutes manages routes for each service found in the registry. -// It returns error if either the services failed to be listed or if the routing table action fails wirh error +// It returns error if either the services failed to be listed or the routing table action fails. func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metric int) error { services, err := reg.ListServices() if err != nil { @@ -222,66 +224,60 @@ func (r *router) watchTable(w table.Watcher) error { return watchErr } -func eventFlap(curr, prev *table.Event) bool { +// isFlapping detects if the event is flapping based on the current and previous event status. +func isFlapping(curr, prev *table.Event) bool { if curr.Type == table.Update && prev.Type == table.Update { - // update flap: this can be either metric or whatnot - log.Logf("eventFlap(): Update flap") + log.Logf("isFlapping(): Update flap") return true } - if curr.Type == table.Create && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Create { - log.Logf("eventFlap(): Create/Delete flap") + if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert { + log.Logf("isFlapping(): Create/Delete flap") return true } return false } +// updateEvent is a table event enriched with advertisement data +type updateEvent struct { + *table.Event + // timestamp marks the time the event has been received + timestamp time.Time + // penalty is current event penalty + penalty float64 + // isSuppressed flags if the event should be considered for flap detection + isSuppressed bool + // isFlapping marks the event as flapping event + isFlapping bool +} + // processEvents processes routing table events. // It suppresses unhealthy flapping events and advertises healthy events upstream. func (r *router) processEvents() error { // ticker to periodically scan event for advertising ticker := time.NewTicker(AdvertiseTick) - - // advertEvent is a table event enriched with advert data - type advertEvent struct { - *table.Event - timestamp time.Time - penalty float64 - isSuppressed bool - isFlapping bool - } - - // eventMap is a map of advert events that might end up being advertised - eventMap := make(map[uint64]*advertEvent) + // eventMap is a map of advert events + eventMap := make(map[uint64]*updateEvent) // lock to protect access to eventMap mu := &sync.RWMutex{} // waitgroup to manage advertisement goroutines var wg sync.WaitGroup -process: +processLoop: for { select { case <-ticker.C: var events []*table.Event - // decay the penalties of existing events + // collect all events which are not flapping mu.Lock() - for advert, event := range eventMap { - delta := time.Since(event.timestamp).Seconds() - event.penalty = event.penalty * math.Exp(delta) - // suppress or recover the event based on its current penalty - if !event.isSuppressed && event.penalty > AdvertSuppress { - event.isSuppressed = true - } else if event.penalty < AdvertRecover { - event.isSuppressed = false - event.isFlapping = false - } - if !event.isFlapping { + for key, event := range eventMap { + if !event.isFlapping && !event.isSuppressed { e := new(table.Event) *e = *event.Event events = append(events, e) // this deletes the advertised event from the map - delete(eventMap, advert) + delete(eventMap, key) } } mu.Unlock() @@ -301,12 +297,6 @@ process: select { case r.advertChan <- a: - mu.Lock() - // once we've advertised the events, we need to delete them - for _, event := range a.Events { - delete(eventMap, event.Route.Hash()) - } - mu.Unlock() case <-r.exit: log.Logf("go advertise(): exit") return @@ -315,7 +305,9 @@ process: }(events) } case e := <-r.eventChan: - // if event is nil, break + // event timestamp + now := time.Now() + // if event is nil, continue if e == nil { continue } @@ -324,15 +316,15 @@ process: var penalty float64 switch e.Type { case table.Update: - penalty = UpdateRoutePenalty - case table.Create, table.Delete: - penalty = DeleteRoutePenalty + penalty = UpdatePenalty + case table.Delete: + penalty = Delete } // we use route hash as eventMap key hash := e.Route.Hash() event, ok := eventMap[hash] if !ok { - event = &advertEvent{ + event = &updateEvent{ Event: e, penalty: penalty, timestamp: time.Now(), @@ -342,8 +334,8 @@ process: } // update penalty for existing event: decay existing and add new penalty delta := time.Since(event.timestamp).Seconds() - event.penalty = event.penalty*math.Exp(delta) + penalty - event.timestamp = time.Now() + event.penalty = event.penalty*math.Exp(-delta) + penalty + event.timestamp = now // suppress or recover the event based on its current penalty if !event.isSuppressed && event.penalty > AdvertSuppress { event.isSuppressed = true @@ -352,11 +344,11 @@ process: } // if not suppressed decide if if its flapping if !event.isSuppressed { - // detect if its flapping - event.isFlapping = eventFlap(e, event.Event) + // detect if its flapping by comparing current and previous event + event.isFlapping = isFlapping(e, event.Event) } case <-r.exit: - break process + break processLoop } } @@ -438,8 +430,7 @@ func (r *router) Advertise() (<-chan *Advert, error) { } } - // NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped - // TODO: these channels most likely won't have to be the struct fields + // NOTE: we only need to recreate these if the router errored or was stopped if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) r.eventChan = make(chan *table.Event) @@ -490,6 +481,9 @@ func (r *router) Advertise() (<-chan *Advert, error) { r.wg.Add(1) go r.watchErrors(errChan) + // TODO: send router announcement update comes here + // the announcement update contains routes from routing table + // mark router as running and set its Error to nil status := Status{ Code: Running, @@ -520,7 +514,6 @@ func (r *router) Update(a *Advert) error { Router: event.Route.Router, Network: event.Route.Network, Metric: event.Route.Metric, - Policy: table.Insert, } if err := r.opts.Table.Update(route); err != nil { return fmt.Errorf("failed updating routing table: %v", err) diff --git a/network/router/router.go b/network/router/router.go index e0f7d676..4fd53c07 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -20,12 +20,12 @@ type Router interface { Options() Options // ID returns the ID of the router ID() string - // Table returns the routing table - Table() table.Table // Address returns the router adddress Address() string // Network returns the network address of the router Network() string + // Table returns the routing table + Table() table.Table // Advertise advertises routes to the network Advertise() (<-chan *Advert, error) // Update updates the routing table @@ -69,6 +69,9 @@ type Advert struct { ID string // Timestamp marks the time when the update is sent Timestamp time.Time + // TTL is Advert TTL + // TODO: not used + TTL time.Time // Events is a list of routing table events to advertise Events []*table.Event } diff --git a/network/router/table/default.go b/network/router/table/default.go index 3480183e..7a95716c 100644 --- a/network/router/table/default.go +++ b/network/router/table/default.go @@ -67,27 +67,14 @@ func (t *table) Add(r Route) error { if _, ok := t.m[destAddr]; !ok { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) + go t.sendEvent(&Event{Type: Insert, Route: r}) return nil } // add new route to the table for the route destination if _, ok := t.m[destAddr][sum]; !ok { t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) - return nil - } - - // only add the route if the route override is explicitly requested - if _, ok := t.m[destAddr][sum]; ok && r.Policy == Override { - t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Update, Route: r}) - return nil - } - - // if we reached this point the route must already exist - // we return nil only if explicitly requested by the client - if r.Policy == Skip { + go t.sendEvent(&Event{Type: Insert, Route: r}) return nil } @@ -122,23 +109,9 @@ func (t *table) Update(r Route) error { // check if the route destination has any routes in the table if _, ok := t.m[destAddr]; !ok { - if r.Policy == Insert { - t.m[destAddr] = make(map[uint64]Route) - t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) - return nil - } return ErrRouteNotFound } - // check if the route for the route destination already exists - // NOTE: We only insert the route if explicitly requested by the client - if _, ok := t.m[destAddr][sum]; !ok && r.Policy == Insert { - t.m[destAddr][sum] = r - go t.sendEvent(&Event{Type: Create, Route: r}) - return nil - } - // if the route has been found update it if _, ok := t.m[destAddr][sum]; ok { t.m[destAddr][sum] = r diff --git a/network/router/table/default_test.go b/network/router/table/default_test.go index a0c364b7..6447cf03 100644 --- a/network/router/table/default_test.go +++ b/network/router/table/default_test.go @@ -33,36 +33,13 @@ func TestAdd(t *testing.T) { } testTableSize += 1 - // overrides an existing route - route.Metric = 100 - route.Policy = Override - - if err := table.Add(route); err != nil { - t.Errorf("error adding route: %s", err) - } - - // the size of the table should not change when Override policy is used if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) - } - - // dont add new route if it already exists - route.Policy = Skip - - if err := table.Add(route); err != nil { - t.Errorf("error adding route: %s", err) - } - - // the size of the table should not change if Skip policy is used - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } // adding the same route under Insert policy must error - route.Policy = Insert - if err := table.Add(route); err != ErrDuplicateRoute { - t.Errorf("error adding route. Expected error: %s, Given: %s", ErrDuplicateRoute, err) + t.Errorf("error adding route. Expected error: %s, found: %s", ErrDuplicateRoute, err) } } @@ -80,7 +57,7 @@ func TestDelete(t *testing.T) { route.Destination = "randDest" if err := table.Delete(route); err != ErrRouteNotFound { - t.Errorf("error deleting route. Expected error: %s, given: %s", ErrRouteNotFound, err) + t.Errorf("error deleting route. Expected error: %s, found: %s", ErrRouteNotFound, err) } // we should be able to delete the existing route @@ -92,7 +69,7 @@ func TestDelete(t *testing.T) { testTableSize -= 1 if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } } @@ -114,44 +91,18 @@ func TestUpdate(t *testing.T) { // the size of the table should not change as we're only updating the metric of an existing route if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } - // this should add a new route - route.Destination = "new.dest" - - if err := table.Update(route); err != nil { - t.Errorf("error updating route: %s", err) - } - testTableSize += 1 - - // Default policy is Insert so the new route will be added here since the route does not exist - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) - } - - // this should add a new route - route.Gateway = "new.gw" - - if err := table.Update(route); err != nil { - t.Errorf("error updating route: %s", err) - } - testTableSize += 1 - - if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) - } - - // this should NOT add a new route as we are setting the policy to Skip + // this should error as the destination does not exist route.Destination = "rand.dest" - route.Policy = Skip if err := table.Update(route); err != ErrRouteNotFound { - t.Errorf("error updating route. Expected error: %s, given: %s", ErrRouteNotFound, err) + t.Errorf("error updating route. Expected error: %s, found: %s", ErrRouteNotFound, err) } - if table.Size() != 3 { - t.Errorf("invalid number of routes. expected: %d, given: %d", testTableSize, table.Size()) + if table.Size() != testTableSize { + t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) } } @@ -173,10 +124,104 @@ func TestList(t *testing.T) { } if len(routes) != len(dest) { - t.Errorf("incorrect number of routes listed. Expected: %d, Given: %d", len(dest), len(routes)) + t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(dest), len(routes)) } if len(routes) != table.Size() { t.Errorf("mismatch number of routes and table size. Routes: %d, Size: %d", len(routes), table.Size()) } } + +func TestLookup(t *testing.T) { + table, route := testSetup() + + dest := []string{"svc1", "svc2", "svc3"} + net := []string{"net1", "net2", "net1"} + rtr := []string{"router1", "router2", "router3"} + + for i := 0; i < len(dest); i++ { + route.Destination = dest[i] + route.Network = net[i] + route.Router = rtr[i] + if err := table.Add(route); err != nil { + t.Errorf("error adding route: %s", err) + } + } + + // return all routes + query := NewQuery() + + routes, err := table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != table.Size() { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", table.Size(), len(routes)) + } + + // query particular net + query = NewQuery(QueryNetwork("net1")) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 2 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 2, len(routes)) + } + + // query particular router + router := "router1" + query = NewQuery(QueryRouter(router)) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 1 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes)) + } + + if routes[0].Router != router { + t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + } + + // query particular route + network := "net1" + query = NewQuery( + QueryRouter(router), + QueryNetwork(network), + ) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 1 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes)) + } + + if routes[0].Router != router { + t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + } + + if routes[0].Network != network { + t.Errorf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network) + } + + // bullshit route query + query = NewQuery(QueryDestination("foobar")) + + routes, err = table.Lookup(query) + if err != nil { + t.Errorf("error looking up routes: %s", err) + } + + if len(routes) != 0 { + t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 0, len(routes)) + } +} diff --git a/network/router/table/query.go b/network/router/table/query.go index ccb7d396..42b44db6 100644 --- a/network/router/table/query.go +++ b/network/router/table/query.go @@ -90,6 +90,7 @@ func NewQuery(opts ...QueryOption) Query { // NOTE: by default we use DefaultNetworkMetric qopts := QueryOptions{ Destination: "*", + Router: "*", Network: "*", Policy: DiscardIfNone, } diff --git a/network/router/table/route.go b/network/router/table/route.go index 3d9c3bcb..cbb32bf1 100644 --- a/network/router/table/route.go +++ b/network/router/table/route.go @@ -15,46 +15,18 @@ var ( DefaultNetworkMetric = 10 ) -// RoutePolicy defines routing table policy -type RoutePolicy int - -const ( - // Insert inserts a new route if it does not already exist - Insert RoutePolicy = iota - // Override overrides the route if it already exists - Override - // Skip skips modifying the route if it already exists - Skip -) - -// String returns human reprensentation of policy -func (p RoutePolicy) String() string { - switch p { - case Insert: - return "INSERT" - case Override: - return "OVERRIDE" - case Skip: - return "SKIP" - default: - return "UNKNOWN" - } -} - // Route is network route type Route struct { // Destination is destination address Destination string // Gateway is route gateway Gateway string - // Router is the router address - Router string // Network is network address Network string + // Router is the router address + Router string // Metric is the route cost metric Metric int - // Policy defines route policy - Policy RoutePolicy } // Hash returns route hash sum. diff --git a/network/router/table/watcher.go b/network/router/table/watcher.go index 850a9089..90e46360 100644 --- a/network/router/table/watcher.go +++ b/network/router/table/watcher.go @@ -19,8 +19,8 @@ var ( type EventType int const ( - // Create is emitted when a new route has been created - Create EventType = iota + // Insert is emitted when a new route has been inserted + Insert EventType = iota // Delete is emitted when an existing route has been deleted Delete // Update is emitted when an existing route has been updated @@ -30,8 +30,8 @@ const ( // String returns string representation of the event func (et EventType) String() string { switch et { - case Create: - return "CREATE" + case Insert: + return "INSERT" case Delete: return "DELETE" case Update: @@ -126,7 +126,7 @@ func (w *tableWatcher) Stop() { } // String prints debug information -func (w *tableWatcher) String() string { +func (w tableWatcher) String() string { sb := &strings.Builder{} table := tablewriter.NewWriter(sb) From 265271008e75677fb522efd44d1fb35fd918778e Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 9 Jul 2019 12:46:15 +0100 Subject: [PATCH 09/14] Simplified processEvents loop; Added router Announcement. --- network/router/default.go | 110 +++++++++++++++++++++++--------------- network/router/router.go | 12 +++-- 2 files changed, 73 insertions(+), 49 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index b77b40fe..34462e6d 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -38,6 +38,7 @@ type router struct { exit chan struct{} eventChan chan *table.Event advertChan chan *Advert + advertWg *sync.WaitGroup wg *sync.WaitGroup sync.RWMutex } @@ -58,6 +59,7 @@ func newRouter(opts ...Option) Router { exit: make(chan struct{}), eventChan: make(chan *table.Event), advertChan: make(chan *Advert), + advertWg: &sync.WaitGroup{}, wg: &sync.WaitGroup{}, } } @@ -97,7 +99,7 @@ func (r *router) Network() string { // manageServiceRoutes manages routes for a given service. // It returns error of the routing table action fails. -func (r *router) manageServiceRoutes(service *registry.Service, action string, metric int) error { +func (r *router) manageServiceRoutes(service *registry.Service, action string) error { // action is the routing table action action = strings.ToLower(action) // take route action on each service node @@ -107,7 +109,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m Gateway: node.Address, Router: r.opts.Address, Network: r.opts.Network, - Metric: metric, + Metric: table.DefaultLocalMetric, } switch action { case "insert", "create": @@ -127,7 +129,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string, m // manageRegistryRoutes manages routes for each service found in the registry. // It returns error if either the services failed to be listed or the routing table action fails. -func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metric int) error { +func (r *router) manageRegistryRoutes(reg registry.Registry, action string) error { services, err := reg.ListServices() if err != nil { return fmt.Errorf("failed listing services: %v", err) @@ -143,7 +145,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string, metr } // manage the routes for all returned services for _, s := range srvs { - if err := r.manageServiceRoutes(s, action, metric); err != nil { + if err := r.manageServiceRoutes(s, action); err != nil { return err } } @@ -177,7 +179,7 @@ func (r *router) watchServices(w registry.Watcher) error { log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) - if err := r.manageServiceRoutes(res.Service, res.Action, table.DefaultLocalMetric); err != nil { + if err := r.manageServiceRoutes(res.Service, res.Action); err != nil { return err } } @@ -224,6 +226,29 @@ func (r *router) watchTable(w table.Watcher) error { return watchErr } +func (r *router) advertEvents(advType AdvertType, events []*table.Event) { + defer r.advertWg.Done() + + log.Logf("r.advertEvents(): start event: %s", advType) + + a := &Advert{ + ID: r.ID(), + Type: advType, + Timestamp: time.Now(), + Events: events, + } + + select { + case r.advertChan <- a: + log.Logf("r.advertEvents(): advertised event: %s", advType) + case <-r.exit: + log.Logf("r.advertEvents(): DONE exit") + return + } + + log.Logf("r.advertEvents(): REGULAR exit") +} + // isFlapping detects if the event is flapping based on the current and previous event status. func isFlapping(curr, prev *table.Event) bool { if curr.Type == table.Update && prev.Type == table.Update { @@ -259,18 +284,12 @@ func (r *router) processEvents() error { ticker := time.NewTicker(AdvertiseTick) // eventMap is a map of advert events eventMap := make(map[uint64]*updateEvent) - // lock to protect access to eventMap - mu := &sync.RWMutex{} - // waitgroup to manage advertisement goroutines - var wg sync.WaitGroup -processLoop: for { select { case <-ticker.C: var events []*table.Event // collect all events which are not flapping - mu.Lock() for key, event := range eventMap { if !event.isFlapping && !event.isSuppressed { e := new(table.Event) @@ -280,29 +299,10 @@ processLoop: delete(eventMap, key) } } - mu.Unlock() if len(events) > 0 { - wg.Add(1) - go func(events []*table.Event) { - defer wg.Done() - - log.Logf("go advertise(): start") - - a := &Advert{ - ID: r.ID(), - Timestamp: time.Now(), - Events: events, - } - - select { - case r.advertChan <- a: - case <-r.exit: - log.Logf("go advertise(): exit") - return - } - log.Logf("go advertise(): exit") - }(events) + r.advertWg.Add(1) + go r.advertEvents(Update, events) } case e := <-r.eventChan: // event timestamp @@ -348,15 +348,16 @@ processLoop: event.isFlapping = isFlapping(e, event.Event) } case <-r.exit: - break processLoop + // first wait for the advertiser to finish + r.advertWg.Wait() + // close the advert channel + close(r.advertChan) + log.Logf("r.processEvents(): event processor stopped") + return nil } } - // first wait for the advertiser to finish - wg.Wait() - // close the advert channel - close(r.advertChan) - + // we probably never reach this place log.Logf("r.processEvents(): event processor stopped") return nil @@ -395,9 +396,11 @@ func (r *router) watchErrors(errChan <-chan error) { // drain the advertise channel for range r.advertChan { } + log.Logf("r.watchErrors(): advert channel drained") // drain the event channel for range r.eventChan { } + log.Logf("r.watchErrors(): event channel drained") } log.Logf("r.watchErrors(): watchErrors exit") @@ -411,10 +414,15 @@ func (r *router) Advertise() (<-chan *Advert, error) { if r.status.Code != Running { // add all local service routes into the routing table - if err := r.manageRegistryRoutes(r.opts.Registry, "insert", table.DefaultLocalMetric); err != nil { - return nil, fmt.Errorf("failed adding routes: %v", err) + if err := r.manageRegistryRoutes(r.opts.Registry, "insert"); err != nil { + return nil, fmt.Errorf("failed adding routes: %s", err) } log.Logf("Routing table:\n%s", r.opts.Table) + // list routing table routes to announce + routes, err := r.opts.Table.List() + if err != nil { + return nil, fmt.Errorf("failed listing routes: %s", err) + } // add default gateway into routing table if r.opts.Gateway != "" { // note, the only non-default value is the gateway @@ -431,14 +439,15 @@ func (r *router) Advertise() (<-chan *Advert, error) { } // NOTE: we only need to recreate these if the router errored or was stopped + // TODO: These probably dont need to be struct members if r.status.Code == Error || r.status.Code == Stopped { r.exit = make(chan struct{}) r.eventChan = make(chan *table.Event) r.advertChan = make(chan *Advert) } - // routing table watcher which watches all routes i.e. to every destination - tableWatcher, err := r.opts.Table.Watch(table.WatchDestination("*")) + // routing table watcher + tableWatcher, err := r.opts.Table.Watch() if err != nil { return nil, fmt.Errorf("failed creating routing table watcher: %v", err) } @@ -478,11 +487,24 @@ func (r *router) Advertise() (<-chan *Advert, error) { log.Logf("r.Advertise(): r.processEvents() exit") }() + // watch for errors and cleanup r.wg.Add(1) go r.watchErrors(errChan) - // TODO: send router announcement update comes here - // the announcement update contains routes from routing table + // announce yourself with all the existing routes + events := make([]*table.Event, len(routes)) + for i, route := range routes { + event := &table.Event{ + Type: table.Insert, + Timestamp: time.Now(), + Route: route, + } + events[i] = event + } + + // advertise your presence + r.advertWg.Add(1) + go r.advertEvents(Announce, events) // mark router as running and set its Error to nil status := Status{ diff --git a/network/router/router.go b/network/router/router.go index 4fd53c07..715bcc56 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -41,19 +41,19 @@ type Router interface { // Option used by the router type Option func(*Options) -// UpdateType is route advertisement update type -type UpdateType int +// AdvertType is route advertisement type +type AdvertType int const ( // Announce is advertised when the router announces itself - Announce UpdateType = iota + Announce AdvertType = iota // Update advertises route updates Update ) // String returns string representation of update event -func (ut UpdateType) String() string { - switch ut { +func (at AdvertType) String() string { + switch at { case Announce: return "ANNOUNCE" case Update: @@ -67,6 +67,8 @@ func (ut UpdateType) String() string { type Advert struct { // ID is the router ID ID string + // Type is type of advert + Type AdvertType // Timestamp marks the time when the update is sent Timestamp time.Time // TTL is Advert TTL From 449aa0a3395b27f7d2de3b1a834e10e097ef0fb9 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 9 Jul 2019 15:01:52 +0100 Subject: [PATCH 10/14] Collect ANNOUNCE mesage events before adding default gateway. --- network/router/default.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index 34462e6d..bcdca8e3 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -418,11 +418,23 @@ func (r *router) Advertise() (<-chan *Advert, error) { return nil, fmt.Errorf("failed adding routes: %s", err) } log.Logf("Routing table:\n%s", r.opts.Table) + // list routing table routes to announce routes, err := r.opts.Table.List() if err != nil { return nil, fmt.Errorf("failed listing routes: %s", err) } + // collect all the added routes before we attempt to add default gateway + events := make([]*table.Event, len(routes)) + for i, route := range routes { + event := &table.Event{ + Type: table.Insert, + Timestamp: time.Now(), + Route: route, + } + events[i] = event + } + // add default gateway into routing table if r.opts.Gateway != "" { // note, the only non-default value is the gateway @@ -491,17 +503,6 @@ func (r *router) Advertise() (<-chan *Advert, error) { r.wg.Add(1) go r.watchErrors(errChan) - // announce yourself with all the existing routes - events := make([]*table.Event, len(routes)) - for i, route := range routes { - event := &table.Event{ - Type: table.Insert, - Timestamp: time.Now(), - Route: route, - } - events[i] = event - } - // advertise your presence r.advertWg.Add(1) go r.advertEvents(Announce, events) From 70665e5a7de1b7c8b9a1aac81466928239843180 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 9 Jul 2019 15:45:42 +0100 Subject: [PATCH 11/14] Route has changed to accomodate Link, Service and Address --- network/router/default.go | 31 +++++------ network/router/options.go | 20 +++---- network/router/table/default.go | 45 ++++++++-------- network/router/table/default_test.go | 78 ++++++++++++++-------------- network/router/table/query.go | 46 ++++++++-------- network/router/table/route.go | 21 +++++--- network/router/table/watcher.go | 22 ++++---- 7 files changed, 132 insertions(+), 131 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index bcdca8e3..82e24aba 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -105,11 +105,12 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e // take route action on each service node for _, node := range service.Nodes { route := table.Route{ - Destination: service.Name, - Gateway: node.Address, - Router: r.opts.Address, - Network: r.opts.Network, - Metric: table.DefaultLocalMetric, + Service: service.Name, + Address: node.Address, + Gateway: "", + Network: r.opts.Network, + Link: table.DefaultLink, + Metric: table.DefaultLocalMetric, } switch action { case "insert", "create": @@ -439,11 +440,11 @@ func (r *router) Advertise() (<-chan *Advert, error) { if r.opts.Gateway != "" { // note, the only non-default value is the gateway route := table.Route{ - Destination: "*", - Gateway: r.opts.Gateway, - Router: "*", - Network: "*", - Metric: table.DefaultLocalMetric, + Service: "*", + Address: "*", + Gateway: r.opts.Gateway, + Network: "*", + Metric: table.DefaultLocalMetric, } if err := r.opts.Table.Add(route); err != nil { return nil, fmt.Errorf("failed adding default gateway route: %s", err) @@ -530,14 +531,8 @@ func (r *router) Update(a *Advert) error { }) for _, event := range events { - // we extract the route from advertisement and update the routing table - route := table.Route{ - Destination: event.Route.Destination, - Gateway: event.Route.Gateway, - Router: event.Route.Router, - Network: event.Route.Network, - Metric: event.Route.Metric, - } + // create a copy of the route + route := event.Route if err := r.opts.Table.Update(route); err != nil { return fmt.Errorf("failed updating routing table: %v", err) } diff --git a/network/router/options.go b/network/router/options.go index eb287075..96d956f6 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -10,7 +10,7 @@ var ( // DefaultAddress is default router address DefaultAddress = ":9093" // DefaultNetwork is default micro network - DefaultNetwork = "micro.mu" + DefaultNetwork = "go.micro" ) // Options are router options @@ -19,10 +19,10 @@ type Options struct { ID string // Address is router address Address string - // Network is micro network - Network string // Gateway is micro network gateway Gateway string + // Network is micro network + Network string // Registry is the local registry Registry registry.Registry // Table is routing table @@ -43,13 +43,6 @@ func Address(a string) Option { } } -// Network sets router network -func Network(n string) Option { - return func(o *Options) { - o.Network = n - } -} - // Gateway sets network gateway func Gateway(g string) Option { return func(o *Options) { @@ -57,6 +50,13 @@ func Gateway(g string) Option { } } +// Network sets router network +func Network(n string) Option { + return func(o *Options) { + o.Network = n + } +} + // RoutingTable sets the routing table func RoutingTable(t table.Table) Option { return func(o *Options) { diff --git a/network/router/table/default.go b/network/router/table/default.go index 7a95716c..54228404 100644 --- a/network/router/table/default.go +++ b/network/router/table/default.go @@ -57,23 +57,23 @@ func (t *table) Options() TableOptions { // Add adds a route to the routing table func (t *table) Add(r Route) error { - destAddr := r.Destination + service := r.Service sum := r.Hash() t.Lock() defer t.Unlock() // check if there are any routes in the table for the route destination - if _, ok := t.m[destAddr]; !ok { - t.m[destAddr] = make(map[uint64]Route) - t.m[destAddr][sum] = r + if _, ok := t.m[service]; !ok { + t.m[service] = make(map[uint64]Route) + t.m[service][sum] = r go t.sendEvent(&Event{Type: Insert, Route: r}) return nil } // add new route to the table for the route destination - if _, ok := t.m[destAddr][sum]; !ok { - t.m[destAddr][sum] = r + if _, ok := t.m[service][sum]; !ok { + t.m[service][sum] = r go t.sendEvent(&Event{Type: Insert, Route: r}) return nil } @@ -83,17 +83,17 @@ func (t *table) Add(r Route) error { // Delete deletes the route from the routing table func (t *table) Delete(r Route) error { - destAddr := r.Destination + service := r.Service sum := r.Hash() t.Lock() defer t.Unlock() - if _, ok := t.m[destAddr]; !ok { + if _, ok := t.m[service]; !ok { return ErrRouteNotFound } - delete(t.m[destAddr], sum) + delete(t.m[service], sum) go t.sendEvent(&Event{Type: Delete, Route: r}) return nil @@ -101,20 +101,20 @@ func (t *table) Delete(r Route) error { // Update updates routing table with the new route func (t *table) Update(r Route) error { - destAddr := r.Destination + service := r.Service sum := r.Hash() t.Lock() defer t.Unlock() // check if the route destination has any routes in the table - if _, ok := t.m[destAddr]; !ok { + if _, ok := t.m[service]; !ok { return ErrRouteNotFound } // if the route has been found update it - if _, ok := t.m[destAddr][sum]; ok { - t.m[destAddr][sum] = r + if _, ok := t.m[service][sum]; ok { + t.m[service][sum] = r go t.sendEvent(&Event{Type: Update, Route: r}) return nil } @@ -140,7 +140,7 @@ func (t *table) List() ([]Route, error) { // isMatch checks if the route matches given network and router func isMatch(route Route, network, router string) bool { if network == "*" || network == route.Network { - if router == "*" || router == route.Router { + if router == "*" || router == route.Gateway { return true } } @@ -163,18 +163,18 @@ func (t *table) Lookup(q Query) ([]Route, error) { t.RLock() defer t.RUnlock() - if q.Options().Destination != "*" { + if q.Options().Service != "*" { // no routes found for the destination and query policy is not a DiscardIfNone - if _, ok := t.m[q.Options().Destination]; !ok && q.Options().Policy != DiscardIfNone { + if _, ok := t.m[q.Options().Service]; !ok && q.Options().Policy != DiscardIfNone { return nil, ErrRouteNotFound } - return findRoutes(t.m[q.Options().Destination], q.Options().Network, q.Options().Router), nil + return findRoutes(t.m[q.Options().Service], q.Options().Network, q.Options().Gateway), nil } var results []Route // search through all destinations for _, routes := range t.m { - results = append(results, findRoutes(routes, q.Options().Network, q.Options().Router)...) + results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...) } return results, nil @@ -184,7 +184,7 @@ func (t *table) Lookup(q Query) ([]Route, error) { func (t *table) Watch(opts ...WatchOption) (Watcher, error) { // by default watch everything wopts := WatchOptions{ - Destination: "*", + Service: "*", } for _, o := range opts { @@ -244,15 +244,16 @@ func (t *table) String() string { // create nice table printing structure table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"}) + table.SetHeader([]string{"Service", "Address", "Gateway", "Network", "Link", "Metric"}) for _, destRoute := range t.m { for _, route := range destRoute { strRoute := []string{ - route.Destination, + route.Service, + route.Address, route.Gateway, - route.Router, route.Network, + route.Link, fmt.Sprintf("%d", route.Metric), } table.Append(strRoute) diff --git a/network/router/table/default_test.go b/network/router/table/default_test.go index 6447cf03..3f7acdb6 100644 --- a/network/router/table/default_test.go +++ b/network/router/table/default_test.go @@ -6,11 +6,11 @@ func testSetup() (Table, Route) { table := NewTable() route := Route{ - Destination: "dest.svc", - Gateway: "dest.gw", - Router: "dest.router", - Network: "dest.network", - Metric: 10, + Service: "dest.svc", + Gateway: "dest.gw", + Network: "dest.network", + Link: "det.link", + Metric: 10, } return table, route @@ -34,7 +34,7 @@ func TestAdd(t *testing.T) { testTableSize += 1 if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) } // adding the same route under Insert policy must error @@ -53,15 +53,15 @@ func TestDelete(t *testing.T) { testTableSize += 1 // should fail to delete non-existant route - prevDest := route.Destination - route.Destination = "randDest" + prevSvc := route.Service + route.Service = "randDest" if err := table.Delete(route); err != ErrRouteNotFound { - t.Errorf("error deleting route. Expected error: %s, found: %s", ErrRouteNotFound, err) + t.Errorf("error deleting route. Expected: %s, found: %s", ErrRouteNotFound, err) } // we should be able to delete the existing route - route.Destination = prevDest + route.Service = prevSvc if err := table.Delete(route); err != nil { t.Errorf("error deleting route: %s", err) @@ -69,7 +69,7 @@ func TestDelete(t *testing.T) { testTableSize -= 1 if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) } } @@ -91,28 +91,28 @@ func TestUpdate(t *testing.T) { // the size of the table should not change as we're only updating the metric of an existing route if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) } // this should error as the destination does not exist - route.Destination = "rand.dest" + route.Service = "rand.dest" if err := table.Update(route); err != ErrRouteNotFound { t.Errorf("error updating route. Expected error: %s, found: %s", ErrRouteNotFound, err) } if table.Size() != testTableSize { - t.Errorf("invalid number of routes. expected: %d, found: %d", testTableSize, table.Size()) + t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size()) } } func TestList(t *testing.T) { table, route := testSetup() - dest := []string{"one.svc", "two.svc", "three.svc"} + svc := []string{"one.svc", "two.svc", "three.svc"} - for i := 0; i < len(dest); i++ { - route.Destination = dest[i] + for i := 0; i < len(svc); i++ { + route.Service = svc[i] if err := table.Add(route); err != nil { t.Errorf("error adding route: %s", err) } @@ -123,26 +123,26 @@ func TestList(t *testing.T) { t.Errorf("error listing routes: %s", err) } - if len(routes) != len(dest) { - t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(dest), len(routes)) + if len(routes) != len(svc) { + t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes)) } if len(routes) != table.Size() { - t.Errorf("mismatch number of routes and table size. Routes: %d, Size: %d", len(routes), table.Size()) + t.Errorf("mismatch number of routes and table size. Expected: %d, found: %d", len(routes), table.Size()) } } func TestLookup(t *testing.T) { table, route := testSetup() - dest := []string{"svc1", "svc2", "svc3"} + svc := []string{"svc1", "svc2", "svc3"} net := []string{"net1", "net2", "net1"} - rtr := []string{"router1", "router2", "router3"} + gw := []string{"gw1", "gw2", "gw3"} - for i := 0; i < len(dest); i++ { - route.Destination = dest[i] + for i := 0; i < len(svc); i++ { + route.Service = svc[i] route.Network = net[i] - route.Router = rtr[i] + route.Gateway = gw[i] if err := table.Add(route); err != nil { t.Errorf("error adding route: %s", err) } @@ -157,7 +157,7 @@ func TestLookup(t *testing.T) { } if len(routes) != table.Size() { - t.Errorf("incorrect number of routes returned. expected: %d, found: %d", table.Size(), len(routes)) + t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", table.Size(), len(routes)) } // query particular net @@ -169,12 +169,12 @@ func TestLookup(t *testing.T) { } if len(routes) != 2 { - t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 2, len(routes)) + t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes)) } - // query particular router - router := "router1" - query = NewQuery(QueryRouter(router)) + // query particular gateway + gateway := "gw1" + query = NewQuery(QueryGateway(gateway)) routes, err = table.Lookup(query) if err != nil { @@ -182,17 +182,17 @@ func TestLookup(t *testing.T) { } if len(routes) != 1 { - t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes)) + t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) } - if routes[0].Router != router { - t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + if routes[0].Gateway != gateway { + t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway) } // query particular route network := "net1" query = NewQuery( - QueryRouter(router), + QueryGateway(gateway), QueryNetwork(network), ) @@ -202,11 +202,11 @@ func TestLookup(t *testing.T) { } if len(routes) != 1 { - t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 1, len(routes)) + t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes)) } - if routes[0].Router != router { - t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router) + if routes[0].Gateway != gateway { + t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway) } if routes[0].Network != network { @@ -214,7 +214,7 @@ func TestLookup(t *testing.T) { } // bullshit route query - query = NewQuery(QueryDestination("foobar")) + query = NewQuery(QueryService("foobar")) routes, err = table.Lookup(query) if err != nil { @@ -222,6 +222,6 @@ func TestLookup(t *testing.T) { } if len(routes) != 0 { - t.Errorf("incorrect number of routes returned. expected: %d, found: %d", 0, len(routes)) + t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 0, len(routes)) } } diff --git a/network/router/table/query.go b/network/router/table/query.go index 42b44db6..82afd5ff 100644 --- a/network/router/table/query.go +++ b/network/router/table/query.go @@ -34,34 +34,34 @@ type QueryOption func(*QueryOptions) // QueryOptions are routing table query options type QueryOptions struct { - // Destination is destination address - Destination string + // Service is destination service name + Service string + // Gateway is route gateway + Gateway string // Network is network address Network string - // Router is router address - Router string // Policy is query lookup policy Policy LookupPolicy } -// QueryDestination sets destination address -func QueryDestination(d string) QueryOption { +// QueryService sets destination address +func QueryService(s string) QueryOption { return func(o *QueryOptions) { - o.Destination = d + o.Service = s + } +} + +// QueryGateway sets route gateway +func QueryGateway(g string) QueryOption { + return func(o *QueryOptions) { + o.Gateway = g } } // QueryNetwork sets route network address -func QueryNetwork(a string) QueryOption { +func QueryNetwork(n string) QueryOption { return func(o *QueryOptions) { - o.Network = a - } -} - -// QueryRouter sets route router address -func QueryRouter(r string) QueryOption { - return func(o *QueryOptions) { - o.Router = r + o.Network = n } } @@ -89,10 +89,10 @@ func NewQuery(opts ...QueryOption) Query { // default options // NOTE: by default we use DefaultNetworkMetric qopts := QueryOptions{ - Destination: "*", - Router: "*", - Network: "*", - Policy: DiscardIfNone, + Service: "*", + Gateway: "*", + Network: "*", + Policy: DiscardIfNone, } for _, o := range opts { @@ -116,12 +116,12 @@ func (q query) String() string { // create nice table printing structure table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Destination", "Network", "Router", "Policy"}) + table.SetHeader([]string{"Service", "Gateway", "Network", "Policy"}) strQuery := []string{ - q.opts.Destination, + q.opts.Service, + q.opts.Gateway, q.opts.Network, - q.opts.Router, fmt.Sprintf("%s", q.opts.Policy), } table.Append(strQuery) diff --git a/network/router/table/route.go b/network/router/table/route.go index cbb32bf1..0054b3b9 100644 --- a/network/router/table/route.go +++ b/network/router/table/route.go @@ -9,6 +9,8 @@ import ( ) var ( + // DefaultLink is default network link + DefaultLink = "local" // DefaultLocalMetric is default route cost metric for the local network DefaultLocalMetric = 1 // DefaultNetworkMetric is default route cost metric for the micro network @@ -17,14 +19,16 @@ var ( // Route is network route type Route struct { - // Destination is destination address - Destination string + // Service is destination service name + Service string + // Address is service node address + Address string // Gateway is route gateway Gateway string // Network is network address Network string - // Router is the router address - Router string + // Link is network link + Link string // Metric is the route cost metric Metric int } @@ -33,7 +37,7 @@ type Route struct { func (r *Route) Hash() uint64 { h := fnv.New64() h.Reset() - h.Write([]byte(r.Destination + r.Gateway + r.Network)) + h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Link)) return h.Sum64() } @@ -45,13 +49,14 @@ func (r Route) String() string { // create nice table printing structure table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Destination", "Gateway", "Router", "Network", "Metric"}) + table.SetHeader([]string{"Service", "Address", "Gateway", "Network", "Link", "Metric"}) strRoute := []string{ - r.Destination, + r.Service, + r.Address, r.Gateway, - r.Router, r.Network, + r.Link, fmt.Sprintf("%d", r.Metric), } table.Append(strRoute) diff --git a/network/router/table/watcher.go b/network/router/table/watcher.go index 90e46360..391c0a0c 100644 --- a/network/router/table/watcher.go +++ b/network/router/table/watcher.go @@ -53,7 +53,7 @@ type Event struct { // String prints human readable Event func (e Event) String() string { - return fmt.Sprintf("[EVENT] %s:\nRoute:\n%s", e.Type, e.Route) + return fmt.Sprintf("[EVENT] time: %s type: %s", e.Timestamp, e.Type) } // WatchOption is used to define what routes to watch in the table @@ -72,15 +72,15 @@ type Watcher interface { // WatchOptions are table watcher options type WatchOptions struct { - // Specify destination address to watch - Destination string + // Service allows to watch specific service routes + Service string } -// WatchDestination sets what destination to watch -// Destination is usually microservice name -func WatchDestination(d string) WatchOption { +// WatchService sets what service routes to watch +// Service is the microservice name +func WatchService(s string) WatchOption { return func(o *WatchOptions) { - o.Destination = d + o.Service = s } } @@ -97,8 +97,8 @@ func (w *tableWatcher) Next() (*Event, error) { for { select { case res := <-w.resChan: - switch w.opts.Destination { - case res.Route.Destination, "*": + switch w.opts.Service { + case res.Route.Service, "*": return res, nil default: log.Logf("no table watcher available to receive the event") @@ -130,10 +130,10 @@ func (w tableWatcher) String() string { sb := &strings.Builder{} table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Destination"}) + table.SetHeader([]string{"Service"}) data := []string{ - w.opts.Destination, + w.opts.Service, } table.Append(data) From c5fb409760dc152f31881c5cfbca3223dc4c1f79 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 9 Jul 2019 15:55:39 +0100 Subject: [PATCH 12/14] Removed debug logs --- network/router/default.go | 37 ++------------------------------- network/router/table/default.go | 5 ----- network/router/table/watcher.go | 2 -- 3 files changed, 2 insertions(+), 42 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index 82e24aba..d8ad3a60 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/micro/go-log" "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" "github.com/olekukonko/tablewriter" @@ -141,7 +140,6 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro // get the service to retrieve all its info srvs, err := reg.GetService(service.Name) if err != nil { - log.Logf("r.manageRegistryRoutes() GetService() error: %v", err) continue } // manage the routes for all returned services @@ -178,8 +176,6 @@ func (r *router) watchServices(w registry.Watcher) error { break } - log.Logf("r.watchServices() new service event: Action: %s Service: %v", res.Action, res.Service) - if err := r.manageServiceRoutes(res.Service, res.Action); err != nil { return err } @@ -211,8 +207,6 @@ func (r *router) watchTable(w table.Watcher) error { break } - log.Logf("r.watchTable() new table event: %s", event) - select { case <-r.exit: close(r.eventChan) @@ -230,8 +224,6 @@ func (r *router) watchTable(w table.Watcher) error { func (r *router) advertEvents(advType AdvertType, events []*table.Event) { defer r.advertWg.Done() - log.Logf("r.advertEvents(): start event: %s", advType) - a := &Advert{ ID: r.ID(), Type: advType, @@ -241,24 +233,19 @@ func (r *router) advertEvents(advType AdvertType, events []*table.Event) { select { case r.advertChan <- a: - log.Logf("r.advertEvents(): advertised event: %s", advType) case <-r.exit: - log.Logf("r.advertEvents(): DONE exit") return } - log.Logf("r.advertEvents(): REGULAR exit") } // isFlapping detects if the event is flapping based on the current and previous event status. func isFlapping(curr, prev *table.Event) bool { if curr.Type == table.Update && prev.Type == table.Update { - log.Logf("isFlapping(): Update flap") return true } if curr.Type == table.Insert && prev.Type == table.Delete || curr.Type == table.Delete && prev.Type == table.Insert { - log.Logf("isFlapping(): Create/Delete flap") return true } @@ -312,7 +299,7 @@ func (r *router) processEvents() error { if e == nil { continue } - log.Logf("r.processEvents(): event received:\n%s", e) + // determine the event penalty var penalty float64 switch e.Type { @@ -337,6 +324,7 @@ func (r *router) processEvents() error { delta := time.Since(event.timestamp).Seconds() event.penalty = event.penalty*math.Exp(-delta) + penalty event.timestamp = now + // suppress or recover the event based on its current penalty if !event.isSuppressed && event.penalty > AdvertSuppress { event.isSuppressed = true @@ -353,13 +341,11 @@ func (r *router) processEvents() error { r.advertWg.Wait() // close the advert channel close(r.advertChan) - log.Logf("r.processEvents(): event processor stopped") return nil } } // we probably never reach this place - log.Logf("r.processEvents(): event processor stopped") return nil } @@ -368,8 +354,6 @@ func (r *router) processEvents() error { func (r *router) watchErrors(errChan <-chan error) { defer r.wg.Done() - log.Logf("r.manage(): manage start") - var code StatusCode var err error @@ -380,8 +364,6 @@ func (r *router) watchErrors(errChan <-chan error) { code = Error } - log.Logf("r.watchErrors(): watchErrors exiting") - r.Lock() defer r.Unlock() status := Status{ @@ -397,14 +379,11 @@ func (r *router) watchErrors(errChan <-chan error) { // drain the advertise channel for range r.advertChan { } - log.Logf("r.watchErrors(): advert channel drained") // drain the event channel for range r.eventChan { } - log.Logf("r.watchErrors(): event channel drained") } - log.Logf("r.watchErrors(): watchErrors exit") } // Advertise advertises the routes to the network. @@ -418,7 +397,6 @@ func (r *router) Advertise() (<-chan *Advert, error) { if err := r.manageRegistryRoutes(r.opts.Registry, "insert"); err != nil { return nil, fmt.Errorf("failed adding routes: %s", err) } - log.Logf("Routing table:\n%s", r.opts.Table) // list routing table routes to announce routes, err := r.opts.Table.List() @@ -476,28 +454,22 @@ func (r *router) Advertise() (<-chan *Advert, error) { r.wg.Add(1) go func() { defer r.wg.Done() - log.Logf("r.Advertise(): r.watchServices() start") // watch local registry and register routes in routine table errChan <- r.watchServices(svcWatcher) - log.Logf("r.Advertise(): r.watchServices() exit") }() r.wg.Add(1) go func() { defer r.wg.Done() - log.Logf("r.Advertise(): r.watchTable() start") // watch local registry and register routes in routing table errChan <- r.watchTable(tableWatcher) - log.Logf("r.Advertise(): r.watchTable() exit") }() r.wg.Add(1) go func() { defer r.wg.Done() - log.Logf("r.Advertise(): r.processEvents() start") // listen to routing table events and process them errChan <- r.processEvents() - log.Logf("r.Advertise(): r.processEvents() exit") }() // watch for errors and cleanup @@ -554,28 +526,23 @@ func (r *router) Status() Status { // Stop stops the router func (r *router) Stop() error { - log.Logf("r.Stop(): Stopping router") r.RLock() // only close the channel if the router is running if r.status.Code == Running { // notify all goroutines to finish close(r.exit) - log.Logf("r.Stop(): exit closed") // drain the advertise channel for range r.advertChan { } - log.Logf("r.Stop(): advert channel drained") // drain the event channel for range r.eventChan { } - log.Logf("r.Stop(): event channel drained") } r.RUnlock() // wait for all goroutines to finish r.wg.Wait() - log.Logf("r.Stop(): Router stopped") return nil } diff --git a/network/router/table/default.go b/network/router/table/default.go index 54228404..79416dc1 100644 --- a/network/router/table/default.go +++ b/network/router/table/default.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/google/uuid" - "github.com/micro/go-log" "github.com/olekukonko/tablewriter" ) @@ -209,16 +208,12 @@ func (t *table) sendEvent(r *Event) { t.RLock() defer t.RUnlock() - log.Logf("sending event to %d registered table watchers", len(t.w)) - for _, w := range t.w { select { case w.resChan <- r: case <-w.done: } } - - log.Logf("sending event done") } // Size returns the size of the routing table diff --git a/network/router/table/watcher.go b/network/router/table/watcher.go index 391c0a0c..773593e4 100644 --- a/network/router/table/watcher.go +++ b/network/router/table/watcher.go @@ -6,7 +6,6 @@ import ( "strings" "time" - "github.com/micro/go-log" "github.com/olekukonko/tablewriter" ) @@ -101,7 +100,6 @@ func (w *tableWatcher) Next() (*Event, error) { case res.Route.Service, "*": return res, nil default: - log.Logf("no table watcher available to receive the event") continue } case <-w.done: From 23cb811f60e9a84199e03f65422e14a37e3f96e0 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 9 Jul 2019 16:17:18 +0100 Subject: [PATCH 13/14] Removed fmt.Stringer artistry from all roouter and table structs --- network/router/default.go | 22 ++------------------- network/router/table/default.go | 34 ++------------------------------- network/router/table/query.go | 27 +------------------------- network/router/table/route.go | 26 +------------------------ network/router/table/watcher.go | 18 +---------------- 5 files changed, 7 insertions(+), 120 deletions(-) diff --git a/network/router/default.go b/network/router/default.go index d8ad3a60..c37fd49d 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -10,7 +10,6 @@ import ( "github.com/micro/go-micro/network/router/table" "github.com/micro/go-micro/registry" - "github.com/olekukonko/tablewriter" ) const ( @@ -547,23 +546,6 @@ func (r *router) Stop() error { } // String prints debugging information about router -func (r *router) String() string { - sb := &strings.Builder{} - - table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"ID", "Address", "Network", "Table", "Status"}) - - data := []string{ - r.opts.ID, - r.opts.Address, - r.opts.Network, - fmt.Sprintf("%d", r.opts.Table.Size()), - r.status.Code.String(), - } - table.Append(data) - - // render table into sb - table.Render() - - return sb.String() +func (r router) String() string { + return "router" } diff --git a/network/router/table/default.go b/network/router/table/default.go index 79416dc1..76de3f97 100644 --- a/network/router/table/default.go +++ b/network/router/table/default.go @@ -1,12 +1,9 @@ package table import ( - "fmt" - "strings" "sync" "github.com/google/uuid" - "github.com/olekukonko/tablewriter" ) // TableOptions specify routing table options @@ -230,33 +227,6 @@ func (t *table) Size() int { } // String returns debug information -func (t *table) String() string { - t.RLock() - defer t.RUnlock() - - // this will help us build routing table string - sb := &strings.Builder{} - - // create nice table printing structure - table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Service", "Address", "Gateway", "Network", "Link", "Metric"}) - - for _, destRoute := range t.m { - for _, route := range destRoute { - strRoute := []string{ - route.Service, - route.Address, - route.Gateway, - route.Network, - route.Link, - fmt.Sprintf("%d", route.Metric), - } - table.Append(strRoute) - } - } - - // render table into sb - table.Render() - - return sb.String() +func (t table) String() string { + return "table" } diff --git a/network/router/table/query.go b/network/router/table/query.go index 82afd5ff..7703e3b3 100644 --- a/network/router/table/query.go +++ b/network/router/table/query.go @@ -1,12 +1,5 @@ package table -import ( - "fmt" - "strings" - - "github.com/olekukonko/tablewriter" -) - // LookupPolicy defines query policy type LookupPolicy int @@ -111,23 +104,5 @@ func (q *query) Options() QueryOptions { // String prints routing table query in human readable form func (q query) String() string { - // this will help us build routing table string - sb := &strings.Builder{} - - // create nice table printing structure - table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Service", "Gateway", "Network", "Policy"}) - - strQuery := []string{ - q.opts.Service, - q.opts.Gateway, - q.opts.Network, - fmt.Sprintf("%s", q.opts.Policy), - } - table.Append(strQuery) - - // render table into sb - table.Render() - - return sb.String() + return "query" } diff --git a/network/router/table/route.go b/network/router/table/route.go index 0054b3b9..652abd28 100644 --- a/network/router/table/route.go +++ b/network/router/table/route.go @@ -1,11 +1,7 @@ package table import ( - "fmt" "hash/fnv" - "strings" - - "github.com/olekukonko/tablewriter" ) var ( @@ -44,25 +40,5 @@ func (r *Route) Hash() uint64 { // String returns human readable route func (r Route) String() string { - // this will help us build routing table string - sb := &strings.Builder{} - - // create nice table printing structure - table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Service", "Address", "Gateway", "Network", "Link", "Metric"}) - - strRoute := []string{ - r.Service, - r.Address, - r.Gateway, - r.Network, - r.Link, - fmt.Sprintf("%d", r.Metric), - } - table.Append(strRoute) - - // render table into sb - table.Render() - - return sb.String() + return "route" } diff --git a/network/router/table/watcher.go b/network/router/table/watcher.go index 773593e4..c089ddfc 100644 --- a/network/router/table/watcher.go +++ b/network/router/table/watcher.go @@ -3,10 +3,7 @@ package table import ( "errors" "fmt" - "strings" "time" - - "github.com/olekukonko/tablewriter" ) var ( @@ -125,18 +122,5 @@ func (w *tableWatcher) Stop() { // String prints debug information func (w tableWatcher) String() string { - sb := &strings.Builder{} - - table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Service"}) - - data := []string{ - w.opts.Service, - } - table.Append(data) - - // render table into sb - table.Render() - - return sb.String() + return "watcher" } From 6cf8bde6128ada66986effa8887c245568a8480d Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Tue, 9 Jul 2019 16:45:31 +0100 Subject: [PATCH 14/14] Router selector and proxy modifications due to Route struct changes. --- client/selector/router/router.go | 15 +-- network/proxy/mucp/mucp.go | 15 +-- network/router/proto/router.micro.go | 2 +- network/router/proto/router.pb.go | 164 ++++++++------------------- network/router/proto/router.proto | 16 +-- 5 files changed, 74 insertions(+), 138 deletions(-) diff --git a/client/selector/router/router.go b/client/selector/router/router.go index 65e69702..14146fba 100644 --- a/client/selector/router/router.go +++ b/client/selector/router/router.go @@ -45,7 +45,7 @@ func (r *routerSelector) getRoutes(service string) ([]table.Route, error) { if !r.remote { // lookup router for routes for the service return r.r.Table().Lookup(table.NewQuery( - table.QueryDestination(service), + table.QueryService(service), )) } @@ -83,7 +83,7 @@ func (r *routerSelector) getRoutes(service string) ([]table.Route, error) { // call the router pbRoutes, err = r.rs.Lookup(context.Background(), &pb.LookupRequest{ Query: &pb.Query{ - Destination: service, + Service: service, }, }, client.WithAddress(addr)) if err != nil { @@ -107,11 +107,12 @@ func (r *routerSelector) getRoutes(service string) ([]table.Route, error) { // convert from pb to []*router.Route for _, r := range pbRoutes.Routes { routes = append(routes, table.Route{ - Destination: r.Destination, - Gateway: r.Gateway, - Router: r.Router, - Network: r.Network, - Metric: int(r.Metric), + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int(r.Metric), }) } diff --git a/network/proxy/mucp/mucp.go b/network/proxy/mucp/mucp.go index d0ecb81f..05880fc9 100644 --- a/network/proxy/mucp/mucp.go +++ b/network/proxy/mucp/mucp.go @@ -107,7 +107,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { if p.Router != nil { // lookup the router routes, err := p.Router.Table().Lookup( - table.NewQuery(table.QueryDestination(service)), + table.NewQuery(table.QueryService(service)), ) if err != nil { return nil, err @@ -180,7 +180,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) { // call the router proutes, err := p.RouterService.Lookup(context.Background(), &pb.LookupRequest{ Query: &pb.Query{ - Destination: service, + Service: service, }, }, client.WithAddress(addr)) if err != nil { @@ -205,11 +205,12 @@ func (p *Proxy) getRoute(service string) ([]string, error) { // convert from pb to []*router.Route for _, r := range pbRoutes.Routes { routes = append(routes, table.Route{ - Destination: r.Destination, - Gateway: r.Gateway, - Router: r.Router, - Network: r.Network, - Metric: int(r.Metric), + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int(r.Metric), }) } diff --git a/network/router/proto/router.micro.go b/network/router/proto/router.micro.go index 06736f82..9c4fdb1e 100644 --- a/network/router/proto/router.micro.go +++ b/network/router/proto/router.micro.go @@ -1,5 +1,5 @@ // Code generated by protoc-gen-micro. DO NOT EDIT. -// source: go-micro/network/router/proto/router.proto +// source: router.proto package router diff --git a/network/router/proto/router.pb.go b/network/router/proto/router.pb.go index d0aff457..d7d7a520 100644 --- a/network/router/proto/router.pb.go +++ b/network/router/proto/router.pb.go @@ -1,13 +1,11 @@ // Code generated by protoc-gen-go. DO NOT EDIT. -// source: go-micro/network/router/proto/router.proto +// source: router.proto package router import ( - context "context" fmt "fmt" proto "github.com/golang/protobuf/proto" - grpc "google.golang.org/grpc" math "math" ) @@ -34,7 +32,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} } func (m *LookupRequest) String() string { return proto.CompactTextString(m) } func (*LookupRequest) ProtoMessage() {} func (*LookupRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_fc08514fc6dadd29, []int{0} + return fileDescriptor_367072455c71aedc, []int{0} } func (m *LookupRequest) XXX_Unmarshal(b []byte) error { @@ -74,7 +72,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} } func (m *LookupResponse) String() string { return proto.CompactTextString(m) } func (*LookupResponse) ProtoMessage() {} func (*LookupResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_fc08514fc6dadd29, []int{1} + return fileDescriptor_367072455c71aedc, []int{1} } func (m *LookupResponse) XXX_Unmarshal(b []byte) error { @@ -104,8 +102,8 @@ func (m *LookupResponse) GetRoutes() []*Route { // Query is passed in a LookupRequest type Query struct { - // destination to lookup - Destination string `protobuf:"bytes,1,opt,name=destination,proto3" json:"destination,omitempty"` + // service to lookup + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -115,7 +113,7 @@ func (m *Query) Reset() { *m = Query{} } func (m *Query) String() string { return proto.CompactTextString(m) } func (*Query) ProtoMessage() {} func (*Query) Descriptor() ([]byte, []int) { - return fileDescriptor_fc08514fc6dadd29, []int{2} + return fileDescriptor_367072455c71aedc, []int{2} } func (m *Query) XXX_Unmarshal(b []byte) error { @@ -136,9 +134,9 @@ func (m *Query) XXX_DiscardUnknown() { var xxx_messageInfo_Query proto.InternalMessageInfo -func (m *Query) GetDestination() string { +func (m *Query) GetService() string { if m != nil { - return m.Destination + return m.Service } return "" } @@ -146,15 +144,17 @@ func (m *Query) GetDestination() string { // Route is a service route type Route struct { // service for the route - Destination string `protobuf:"bytes,1,opt,name=destination,proto3" json:"destination,omitempty"` + Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` + // the address that advertise this route + Address string `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` // gateway as the next hop - Gateway string `protobuf:"bytes,2,opt,name=gateway,proto3" json:"gateway,omitempty"` - // the router that advertise this route - Router string `protobuf:"bytes,3,opt,name=router,proto3" json:"router,omitempty"` + Gateway string `protobuf:"bytes,3,opt,name=gateway,proto3" json:"gateway,omitempty"` // the network for this destination Network string `protobuf:"bytes,4,opt,name=network,proto3" json:"network,omitempty"` + // the network link + Link string `protobuf:"bytes,5,opt,name=link,proto3" json:"link,omitempty"` // the metric / score of this route - Metric int64 `protobuf:"varint,5,opt,name=metric,proto3" json:"metric,omitempty"` + Metric int64 `protobuf:"varint,6,opt,name=metric,proto3" json:"metric,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -164,7 +164,7 @@ func (m *Route) Reset() { *m = Route{} } func (m *Route) String() string { return proto.CompactTextString(m) } func (*Route) ProtoMessage() {} func (*Route) Descriptor() ([]byte, []int) { - return fileDescriptor_fc08514fc6dadd29, []int{3} + return fileDescriptor_367072455c71aedc, []int{3} } func (m *Route) XXX_Unmarshal(b []byte) error { @@ -185,9 +185,16 @@ func (m *Route) XXX_DiscardUnknown() { var xxx_messageInfo_Route proto.InternalMessageInfo -func (m *Route) GetDestination() string { +func (m *Route) GetService() string { if m != nil { - return m.Destination + return m.Service + } + return "" +} + +func (m *Route) GetAddress() string { + if m != nil { + return m.Address } return "" } @@ -199,16 +206,16 @@ func (m *Route) GetGateway() string { return "" } -func (m *Route) GetRouter() string { +func (m *Route) GetNetwork() string { if m != nil { - return m.Router + return m.Network } return "" } -func (m *Route) GetNetwork() string { +func (m *Route) GetLink() string { if m != nil { - return m.Network + return m.Link } return "" } @@ -227,98 +234,23 @@ func init() { proto.RegisterType((*Route)(nil), "Route") } -func init() { - proto.RegisterFile("go-micro/network/router/proto/router.proto", fileDescriptor_fc08514fc6dadd29) -} - -var fileDescriptor_fc08514fc6dadd29 = []byte{ - // 242 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x90, 0xc1, 0x4a, 0xc3, 0x40, - 0x10, 0x86, 0x5d, 0x63, 0x56, 0x9c, 0x62, 0x85, 0x3d, 0xc8, 0x22, 0x22, 0x61, 0x4f, 0x55, 0x69, - 0x22, 0x15, 0xdf, 0xc2, 0x8b, 0xfb, 0x06, 0xb1, 0x0e, 0x25, 0x94, 0x66, 0xd2, 0xdd, 0x09, 0xa5, - 0x0f, 0xe1, 0x3b, 0x4b, 0x26, 0x5b, 0x30, 0xa7, 0x1e, 0xbf, 0x99, 0xf9, 0x7e, 0x76, 0x7f, 0x78, - 0xd9, 0xd0, 0x72, 0xd7, 0xac, 0x03, 0x55, 0x2d, 0xf2, 0x81, 0xc2, 0xb6, 0x0a, 0xd4, 0x33, 0x86, - 0xaa, 0x0b, 0xc4, 0x94, 0xa0, 0x14, 0x70, 0x4b, 0xb8, 0xfd, 0x24, 0xda, 0xf6, 0x9d, 0xc7, 0x7d, - 0x8f, 0x91, 0xcd, 0x23, 0xe4, 0xfb, 0x1e, 0xc3, 0xd1, 0xaa, 0x42, 0x2d, 0x66, 0x2b, 0x5d, 0x7e, - 0x0d, 0xe4, 0xc7, 0xa1, 0x7b, 0x83, 0xf9, 0xe9, 0x3c, 0x76, 0xd4, 0x46, 0x34, 0x4f, 0xa0, 0x25, - 0x30, 0x5a, 0x55, 0x64, 0x22, 0xf8, 0x01, 0x7d, 0x9a, 0xba, 0x67, 0xc8, 0x25, 0xc1, 0x14, 0x30, - 0xfb, 0xc1, 0xc8, 0x4d, 0x5b, 0x73, 0x43, 0xad, 0xc4, 0xdf, 0xf8, 0xff, 0x23, 0xf7, 0xab, 0x20, - 0x17, 0xf9, 0xfc, 0xad, 0xb1, 0x70, 0xbd, 0xa9, 0x19, 0x0f, 0xf5, 0xd1, 0x5e, 0xca, 0xf6, 0x84, - 0xe6, 0x3e, 0x3d, 0x28, 0xd8, 0x4c, 0x16, 0x89, 0x06, 0x23, 0xd5, 0x61, 0xaf, 0x46, 0x23, 0xe1, - 0x60, 0xec, 0x90, 0x43, 0xb3, 0xb6, 0x79, 0xa1, 0x16, 0x99, 0x4f, 0xb4, 0xfa, 0x00, 0xed, 0x47, - 0xf7, 0x15, 0xf4, 0xf8, 0x6d, 0x33, 0x2f, 0x27, 0x75, 0x3d, 0xdc, 0x95, 0xd3, 0x3e, 0xdc, 0xc5, - 0xb7, 0x96, 0x66, 0xdf, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x4d, 0x73, 0x18, 0x9e, 0x87, 0x01, - 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// RouterClient is the client API for Router service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type RouterClient interface { - Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) -} - -type routerClient struct { - cc *grpc.ClientConn -} - -func NewRouterClient(cc *grpc.ClientConn) RouterClient { - return &routerClient{cc} -} - -func (c *routerClient) Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) { - out := new(LookupResponse) - err := c.cc.Invoke(ctx, "/Router/Lookup", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// RouterServer is the server API for Router service. -type RouterServer interface { - Lookup(context.Context, *LookupRequest) (*LookupResponse, error) -} - -func RegisterRouterServer(s *grpc.Server, srv RouterServer) { - s.RegisterService(&_Router_serviceDesc, srv) -} - -func _Router_Lookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(LookupRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(RouterServer).Lookup(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/Router/Lookup", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(RouterServer).Lookup(ctx, req.(*LookupRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _Router_serviceDesc = grpc.ServiceDesc{ - ServiceName: "Router", - HandlerType: (*RouterServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Lookup", - Handler: _Router_Lookup_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "go-micro/network/router/proto/router.proto", +func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) } + +var fileDescriptor_367072455c71aedc = []byte{ + // 238 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x90, 0xc1, 0x4a, 0xc4, 0x30, + 0x10, 0x86, 0x8d, 0xdd, 0x46, 0x1c, 0x75, 0x85, 0x1c, 0x24, 0x88, 0x48, 0xcd, 0x69, 0x41, 0x2c, + 0xb2, 0xe2, 0x5b, 0x78, 0x31, 0x6f, 0x50, 0x77, 0x07, 0x29, 0xd5, 0xa6, 0x3b, 0x49, 0x5c, 0xf6, + 0x59, 0x7c, 0x59, 0xc9, 0x24, 0x7b, 0xe8, 0xc1, 0x5b, 0xbf, 0xf9, 0x66, 0x7e, 0x9a, 0x1f, 0x2e, + 0xc9, 0xc5, 0x80, 0xd4, 0x4e, 0xe4, 0x82, 0x33, 0x4f, 0x70, 0xf5, 0xe6, 0xdc, 0x10, 0x27, 0x8b, + 0xbb, 0x88, 0x3e, 0xa8, 0x3b, 0xa8, 0x77, 0x11, 0xe9, 0xa0, 0x45, 0x23, 0x56, 0x17, 0x6b, 0xd9, + 0xbe, 0x27, 0xb2, 0x79, 0x68, 0x9e, 0x61, 0x79, 0x5c, 0xf7, 0x93, 0x1b, 0x3d, 0xaa, 0x7b, 0x90, + 0x1c, 0xe8, 0xb5, 0x68, 0x2a, 0x3e, 0xb0, 0x09, 0x6d, 0x99, 0x9a, 0x07, 0xa8, 0x39, 0x41, 0x69, + 0x38, 0xf3, 0x48, 0x3f, 0xfd, 0x06, 0x39, 0xfa, 0xdc, 0x1e, 0xd1, 0xfc, 0x0a, 0xa8, 0xf9, 0xe8, + 0xff, 0x9d, 0x64, 0xba, 0xed, 0x96, 0xd0, 0x7b, 0x7d, 0x9a, 0x4d, 0xc1, 0x64, 0x3e, 0xbb, 0x80, + 0xfb, 0xee, 0xa0, 0xab, 0x6c, 0x0a, 0x26, 0x33, 0x62, 0xd8, 0x3b, 0x1a, 0xf4, 0x22, 0x9b, 0x82, + 0x4a, 0xc1, 0xe2, 0xab, 0x1f, 0x07, 0x5d, 0xf3, 0x98, 0xbf, 0xd5, 0x0d, 0xc8, 0x6f, 0x0c, 0xd4, + 0x6f, 0xb4, 0x6c, 0xc4, 0xaa, 0xb2, 0x85, 0xd6, 0xaf, 0x20, 0xf9, 0xe7, 0x48, 0x3d, 0x82, 0xcc, + 0x8f, 0x57, 0xcb, 0x76, 0x56, 0xda, 0xed, 0x75, 0x3b, 0x6f, 0xc5, 0x9c, 0x7c, 0x48, 0xee, 0xf7, + 0xe5, 0x2f, 0x00, 0x00, 0xff, 0xff, 0xd3, 0x99, 0xfb, 0x2d, 0x6f, 0x01, 0x00, 0x00, } diff --git a/network/router/proto/router.proto b/network/router/proto/router.proto index 6f9e7323..4d278d0f 100644 --- a/network/router/proto/router.proto +++ b/network/router/proto/router.proto @@ -17,20 +17,22 @@ message LookupResponse { // Query is passed in a LookupRequest message Query { - // destination to lookup - string destination = 1; + // service to lookup + string service = 1; } // Route is a service route message Route { // service for the route - string destination = 1; + string service = 1; + // the address that advertise this route + string address = 2; // gateway as the next hop - string gateway = 2; - // the router that advertise this route - string router = 3; + string gateway = 3; // the network for this destination string network = 4; + // the network link + string link = 5; // the metric / score of this route - int64 metric = 5; + int64 metric = 6; }