diff --git a/network/router/default.go b/network/router/default.go index 0d153920..ca336619 100644 --- a/network/router/default.go +++ b/network/router/default.go @@ -17,6 +17,8 @@ const ( AdvertiseEventsTick = 5 * time.Second // AdvertiseTableTick is time interval in which router advertises all routes found in routing table AdvertiseTableTick = 1 * time.Minute + // AdvertiseFlushTick is time the yet unconsumed advertisements are flushed, i.e. discarded + AdvertiseFlushTick = 15 * time.Second // AdvertSuppress is advert suppression threshold AdvertSuppress = 2000.0 // AdvertRecover is advert recovery threshold @@ -38,13 +40,14 @@ var ( PenaltyDecay = math.Log(2) / PenaltyHalfLife ) -// router provides default router implementation +// router implements default router type router struct { // embed the table table.Table opts Options status Status exit chan struct{} + errChan chan error eventChan chan *table.Event advertChan chan *Advert advertWg *sync.WaitGroup @@ -52,7 +55,7 @@ type router struct { sync.RWMutex } -// newRouter creates a new router and returns it +// newRouter creates new router and returns it func newRouter(opts ...Option) Router { // get default options options := DefaultOptions() @@ -62,16 +65,17 @@ func newRouter(opts ...Option) Router { o(&options) } - return &router{ - Table: options.Table, - opts: options, - status: Status{Error: nil, Code: Stopped}, - exit: make(chan struct{}), - eventChan: make(chan *table.Event), - advertChan: make(chan *Advert), - advertWg: &sync.WaitGroup{}, - wg: &sync.WaitGroup{}, + r := &router{ + Table: options.Table, + opts: options, + status: Status{Code: Stopped, Error: nil}, + advertWg: &sync.WaitGroup{}, + wg: &sync.WaitGroup{}, } + + go r.run() + + return r } // Init initializes router with given options @@ -87,7 +91,7 @@ func (r *router) Options() Options { return r.opts } -// manageRoute applies route action on the routing table +// manageRoute applies action on a given route func (r *router) manageRoute(route 
table.Route, action string) error { switch action { case "create": @@ -109,8 +113,8 @@ func (r *router) manageRoute(route table.Route, action string) error { return nil } -// manageServiceRoutes manages routes for a given service. -// It returns error of the routing table action fails. +// manageServiceRoutes applies action on all routes of given service. +// It returns error if the action fails. func (r *router) manageServiceRoutes(service *registry.Service, action string) error { // action is the routing table action action = strings.ToLower(action) @@ -134,7 +138,7 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e return nil } -// manageRegistryRoutes manages routes for each service found in the registry. +// manageRegistryRoutes applies action on all routes of each service found in the registry. // It returns error if either the services failed to be listed or the routing table action fails. func (r *router) manageRegistryRoutes(reg registry.Registry, action string) error { services, err := reg.ListServices() @@ -228,8 +232,9 @@ func (r *router) watchTable(w table.Watcher) error { return watchErr } -// advertiseEvents advertises events to event subscribers -func (r *router) advertiseEvents(advType AdvertType, events []*table.Event) { +// publishAdvert publishes router advert to advert channel +// NOTE: this might cease to be a dedicated method in the future +func (r *router) publishAdvert(advType AdvertType, events []*table.Event) { defer r.advertWg.Done() a := &Advert{ @@ -274,7 +279,7 @@ func (r *router) advertiseTable() error { // advertise all routes as Update events to subscribers if len(events) > 0 { r.advertWg.Add(1) - go r.advertiseEvents(Update, events) + go r.publishAdvert(Update, events) } case <-r.exit: return nil @@ -295,14 +300,29 @@ type routeAdvert struct { suppressTime time.Time } -// processEvents processes routing table events. 
+// advertiseEvents advertises routing table events // It suppresses unhealthy flapping events and advertises healthy events upstream. -func (r *router) processEvents() error { +func (r *router) advertiseEvents() error { // ticker to periodically scan event for advertising ticker := time.NewTicker(AdvertiseEventsTick) // advertMap is a map of advert events advertMap := make(map[uint64]*routeAdvert) + // routing table watcher + tableWatcher, err := r.Watch() + if err != nil { + return fmt.Errorf("failed creating routing table watcher: %v", err) + } + + r.wg.Add(1) + go func() { + defer r.wg.Done() + select { + case r.errChan <- r.watchTable(tableWatcher): + case <-r.exit: + } + }() + for { select { case <-ticker.C: @@ -344,7 +364,7 @@ func (r *router) processEvents() error { // advertise all Update events to subscribers if len(events) > 0 { r.advertWg.Add(1) - go r.advertiseEvents(Update, events) + go r.publishAdvert(Update, events) } case e := <-r.eventChan: // if event is nil, continue @@ -399,30 +419,18 @@ func (r *router) processEvents() error { } // watchErrors watches router errors and takes appropriate actions -func (r *router) watchErrors(errChan <-chan error) { - defer r.wg.Done() - - var code StatusCode +func (r *router) watchErrors() { var err error select { case <-r.exit: - code = Stopped - case err = <-errChan: - code = Error + case err = <-r.errChan: } r.Lock() defer r.Unlock() - status := Status{ - Code: code, - Error: err, - } - r.status = status - - // stop the router if some error happened - if err != nil && code != Stopped { - // this will stop watchers which will close r.advertChan + if r.status.Code != Stopped { + // notify all goroutines to finish close(r.exit) // drain the advertise channel for range r.advertChan { @@ -432,20 +440,88 @@ func (r *router) watchErrors(errChan <-chan error) { } } + if err != nil { + r.status = Status{Code: Error, Error: err} + } } -// Advertise advertises the routes to the network. 
-// It returns error if any of the launched goroutines fail with error. +// run runs the router. +// It does nothing unless the router is stopped or errored; startup failures are recorded in the router status. +func (r *router) run() { + r.Lock() + defer r.Unlock() + + switch r.status.Code { + case Stopped, Error: + // add all local service routes into the routing table + if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil { + r.status = Status{Code: Error, Error: fmt.Errorf("failed adding registry routes: %s", err)} + return + } + + // add default gateway into routing table + if r.opts.Gateway != "" { + // note, the only non-default value is the gateway + route := table.Route{ + Service: "*", + Address: "*", + Gateway: r.opts.Gateway, + Network: "*", + Metric: table.DefaultLocalMetric, + } + if err := r.Create(route); err != nil { + r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)} + return + } + } + + // create error and exit channels + r.errChan = make(chan error, 1) + r.exit = make(chan struct{}) + + // registry watcher + regWatcher, err := r.opts.Registry.Watch() + if err != nil { + r.status = Status{Code: Error, Error: fmt.Errorf("failed creating registry watcher: %v", err)} + return + } + + r.wg.Add(1) + go func() { + defer r.wg.Done() + select { + case r.errChan <- r.watchRegistry(regWatcher): + case <-r.exit: + } + }() + + // watch for errors and cleanup + r.wg.Add(1) + go func() { + defer r.wg.Done() + r.watchErrors() + }() + + // mark router as Running and set its Error to nil + r.status = Status{Code: Running, Error: nil} + + return + } + + return +} + +// Advertise starts advertising the routes to the network and returns the advertisements channel to consume from. +// If the router is already advertising it returns the channel to consume from. +// It returns error if either the router is not running or if the routing table fails to list the routes to advertise. 
func (r *router) Advertise() (<-chan *Advert, error) { r.Lock() defer r.Unlock() - if r.status.Code != Running { - // add all local service routes into the routing table - if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil { - return nil, fmt.Errorf("failed adding routes: %s", err) - } - + switch r.status.Code { + case Advertising: + return r.advertChan, nil + case Running: // list routing table routes to announce routes, err := r.List() if err != nil { @@ -462,85 +538,42 @@ func (r *router) Advertise() (<-chan *Advert, error) { events[i] = event } - // add default gateway into routing table - if r.opts.Gateway != "" { - // note, the only non-default value is the gateway - route := table.Route{ - Service: "*", - Address: "*", - Gateway: r.opts.Gateway, - Network: "*", - Metric: table.DefaultLocalMetric, + // create advertise and event channels + r.advertChan = make(chan *Advert) + r.eventChan = make(chan *table.Event) + + // advertise your presence + r.advertWg.Add(1) + go r.publishAdvert(Announce, events) + + r.wg.Add(1) + go func() { + defer r.wg.Done() + select { + case r.errChan <- r.advertiseEvents(): + case <-r.exit: } - if err := r.Create(route); err != nil { - return nil, fmt.Errorf("failed adding default gateway route: %s", err) - } - } - - // NOTE: we only need to recreate these if the router errored or was stopped - // TODO: These probably dont need to be struct members - if r.status.Code == Error || r.status.Code == Stopped { - r.exit = make(chan struct{}) - r.eventChan = make(chan *table.Event) - r.advertChan = make(chan *Advert) - } - - // routing table watcher - tableWatcher, err := r.Watch() - if err != nil { - return nil, fmt.Errorf("failed creating routing table watcher: %v", err) - } - - // registry watcher - regWatcher, err := r.opts.Registry.Watch() - if err != nil { - return nil, fmt.Errorf("failed creating service registry watcher: %v", err) - } - - // error channel collecting goroutine errors - errChan := make(chan 
error, 4) - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and register routes in routine table - errChan <- r.watchRegistry(regWatcher) - }() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch local registry and register routes in routing table - errChan <- r.watchTable(tableWatcher) - }() - - r.wg.Add(1) - go func() { - defer r.wg.Done() - // watch routing table events and process them - errChan <- r.processEvents() }() r.advertWg.Add(1) go func() { defer r.advertWg.Done() // advertise the whole routing table - errChan <- r.advertiseTable() + select { + case r.errChan <- r.advertiseTable(): + case <-r.exit: + } }() - // advertise your presence - r.advertWg.Add(1) - go r.advertiseEvents(Announce, events) + // mark router as Advertising and set its Error to nil + r.status = Status{Code: Advertising, Error: nil} - // watch for errors and cleanup - r.wg.Add(1) - go r.watchErrors(errChan) - - // mark router as running and set its Error to nil - r.status = Status{Code: Running, Error: nil} + return r.advertChan, nil + case Stopped: + return nil, fmt.Errorf("not running") } - return r.advertChan, nil + return nil, fmt.Errorf("error: %s", r.status.Error) } // Process updates the routing table using the advertised values @@ -579,9 +612,9 @@ func (r *router) Status() Status { // Stop stops the router func (r *router) Stop() error { - r.RLock() - // only close the channel if the router is running - if r.status.Code == Running { + r.Lock() + // only close the channel if the router is running and/or advertising + if r.status.Code == Running || r.status.Code == Advertising { // notify all goroutines to finish close(r.exit) // drain the advertise channel @@ -590,8 +623,11 @@ func (r *router) Stop() error { // drain the event channel for range r.eventChan { } + + // mark the router as Stopped and set its Error to nil + r.status = Status{Code: Stopped, Error: nil} } - r.RUnlock() + r.Unlock() // wait for all goroutines to finish r.wg.Wait() diff 
--git a/network/router/options.go b/network/router/options.go index 1405ed6e..d64aca01 100644 --- a/network/router/options.go +++ b/network/router/options.go @@ -19,9 +19,9 @@ type Options struct { Id string // Address is router address Address string - // Gateway is micro network gateway + // Gateway is network gateway Gateway string - // Network is micro network + // Network is network address Network string // Registry is the local registry Registry registry.Registry @@ -57,13 +57,6 @@ func Network(n string) Option { } } -// Table sets the routing table -func Table(t table.Table) Option { - return func(o *Options) { - o.Table = t - } -} - // Registry sets the local registry func Registry(r registry.Registry) Option { return func(o *Options) { @@ -71,6 +64,13 @@ func Registry(r registry.Registry) Option { } } +// Table sets the routing table +func Table(t table.Table) Option { + return func(o *Options) { + o.Table = t + } +} + // DefaultOptions returns router default options func DefaultOptions() Options { return Options{ diff --git a/network/router/router.go b/network/router/router.go index 724f38be..8ce84fb1 100644 --- a/network/router/router.go +++ b/network/router/router.go @@ -7,20 +7,9 @@ import ( "github.com/micro/go-micro/network/router/table" ) -const ( - // Status codes - // Running means the router is up and running - Running StatusCode = iota - // Stopped means the router has been stopped - Stopped - // Error means the router has encountered error - Error - - // Advert types - // Announce is advertised when the router announces itself - Announce AdvertType = iota - // Update advertises route updates - Update +var ( + // DefaultRouter is default network router + DefaultRouter = NewRouter() ) // Router is an interface for a routing control plane @@ -46,9 +35,38 @@ type Router interface { // Option used by the router type Option func(*Options) +// StatusCode defines router status +type StatusCode int + +const ( + // Running means the router is up and 
running + Running StatusCode = iota + // Advertising means the router is advertising + Advertising + // Stopped means the router has been stopped + Stopped + // Error means the router has encountered error + Error +) + +// Status is router status +type Status struct { + // Error is router error + Error error + // Code defines router status + Code StatusCode +} + // AdvertType is route advertisement type type AdvertType int +const ( + // Announce is advertised when the router announces itself + Announce AdvertType = iota + // Update advertises route updates + Update +) + // Advert contains a list of events advertised by the router to the network type Advert struct { // Id is the router Id @@ -63,22 +81,6 @@ type Advert struct { Events []*table.Event } -// StatusCode defines router status -type StatusCode int - -// Status is router status -type Status struct { - // Error is router error - Error error - // Code defines router status - Code StatusCode -} - -var ( - // DefaultRouter is default network router - DefaultRouter = NewRouter() -) - // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...)