1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-05 10:20:53 +02:00
go-micro/router/default.go

734 lines
17 KiB
Go

package router
import (
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/registry"
)
const (
// AdvertiseEventsTick is time interval in which the router advertises route updates
AdvertiseEventsTick = 5 * time.Second
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table
AdvertiseTableTick = 1 * time.Minute
// AdvertiseFlushTick is time the yet unconsumed advertisements are flush i.e. discarded
AdvertiseFlushTick = 15 * time.Second
// AdvertSuppress is advert suppression threshold
AdvertSuppress = 2000.0
// AdvertRecover is advert recovery threshold
AdvertRecover = 750.0
// DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = 1 * time.Minute
// DeletePenalty penalises route deletion
DeletePenalty = 1000.0
// UpdatePenalty penalises route updates
UpdatePenalty = 500.0
// PenaltyHalfLife is the time the advert penalty decays to half its value
PenaltyHalfLife = 2.0
// MaxSuppressTime defines time after which the suppressed advert is deleted
MaxSuppressTime = 5 * time.Minute
)
var (
// PenaltyDecay is a coefficient which controls the speed the advert penalty decays
PenaltyDecay = math.Log(2) / PenaltyHalfLife
)
// router implements default router
type router struct {
sync.RWMutex
options Options
status Status
table *table
exit chan struct{}
errChan chan error
eventChan chan *Event
advertWg *sync.WaitGroup
wg *sync.WaitGroup
// advert subscribers
subscribers map[string]chan *Advert
}
// newRouter creates new router and returns it
func newRouter(opts ...Option) Router {
// get default options
options := DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
// set initial status to Stopped
status := Status{Code: Stopped, Error: nil}
return &router{
options: options,
status: status,
table: newTable(),
advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
subscribers: make(map[string]chan *Advert),
}
}
// Init initializes router with given options
func (r *router) Init(opts ...Option) error {
r.Lock()
defer r.Unlock()
for _, o := range opts {
o(&r.options)
}
return nil
}
// Options returns router options
func (r *router) Options() Options {
r.Lock()
options := r.options
r.Unlock()
return options
}
// Table returns routing table
func (r *router) Table() Table {
return r.table
}
// manageRoute applies action on a given route
func (r *router) manageRoute(route Route, action string) error {
switch action {
case "create":
if err := r.table.Create(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
}
case "update":
if err := r.table.Update(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
}
case "delete":
if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
}
default:
return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action)
}
return nil
}
// manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error.
func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
// action is the routing table action
action = strings.ToLower(action)
// take route action on each service node
for _, node := range service.Nodes {
route := Route{
Service: service.Name,
Address: node.Address,
Gateway: "",
Network: r.options.Network,
Router: r.options.Id,
Link: DefaultLink,
Metric: DefaultLocalMetric,
}
if err := r.manageRoute(route, action); err != nil {
return err
}
}
return nil
}
// manageRegistryRoutes applies action to all routes of each service found in the registry.
// It returns error if either the services failed to be listed or the routing table action fails.
func (r *router) manageRegistryRoutes(reg registry.Registry, action string) error {
services, err := reg.ListServices()
if err != nil {
return fmt.Errorf("failed listing services: %v", err)
}
// add each service node as a separate route
for _, service := range services {
// get the service to retrieve all its info
srvs, err := reg.GetService(service.Name)
if err != nil {
continue
}
// manage the routes for all returned services
for _, srv := range srvs {
if err := r.manageServiceRoutes(srv, action); err != nil {
return err
}
}
}
return nil
}
// watchRegistry watches registry and updates routing table based on the received events.
// It returns error if either the registry watcher fails with error or if the routing table update fails.
func (r *router) watchRegistry(w registry.Watcher) error {
exit := make(chan bool)
defer func() {
// close the exit channel when the go routine finishes
close(exit)
r.wg.Done()
}()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer w.Stop()
select {
case <-r.exit:
return
case <-exit:
return
}
}()
var watchErr error
for {
res, err := w.Next()
if err != nil {
if err != registry.ErrWatcherStopped {
watchErr = err
}
break
}
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil {
return err
}
}
return watchErr
}
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w Watcher) error {
exit := make(chan bool)
defer func() {
// close the exit channel when the go routine finishes
close(exit)
r.wg.Done()
}()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer w.Stop()
select {
case <-r.exit:
return
case <-exit:
return
}
}()
var watchErr error
for {
event, err := w.Next()
if err != nil {
if err != ErrWatcherStopped {
watchErr = err
}
break
}
select {
case <-r.exit:
close(r.eventChan)
return nil
case r.eventChan <- event:
}
}
// close event channel on error
close(r.eventChan)
return watchErr
}
// publishAdvert publishes router advert to advert channel
// NOTE: this might cease to be a dedicated method in the future
func (r *router) publishAdvert(advType AdvertType, events []*Event) {
defer r.advertWg.Done()
a := &Advert{
Id: r.options.Id,
Type: advType,
TTL: DefaultAdvertTTL,
Timestamp: time.Now(),
Events: events,
}
r.RLock()
for _, sub := range r.subscribers {
// check the exit chan first
select {
case <-r.exit:
r.RUnlock()
return
default:
}
// now send the message
select {
case sub <- a:
default:
}
}
r.RUnlock()
}
// advertiseTable advertises the whole routing table to the network
func (r *router) advertiseTable() error {
// create table advertisement ticker
ticker := time.NewTicker(AdvertiseTableTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// list routing table routes to announce
routes, err := r.table.List()
if err != nil {
return fmt.Errorf("failed listing routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*Event, len(routes))
for i, route := range routes {
event := &Event{
Type: Update,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
}
// advertise all routes as Update events to subscribers
if len(events) > 0 {
r.advertWg.Add(1)
go r.publishAdvert(RouteUpdate, events)
}
case <-r.exit:
return nil
}
}
}
// routeAdvert contains a list of route events to be advertised
type routeAdvert struct {
events []*Event
// lastUpdate records the time of the last advert update
lastUpdate time.Time
// penalty is current advert penalty
penalty float64
// isSuppressed flags the advert suppression
isSuppressed bool
// suppressTime records the time interval the advert has been suppressed for
suppressTime time.Time
}
// advertiseEvents advertises routing table events
// It suppresses unhealthy flapping events and advertises healthy events upstream.
func (r *router) advertiseEvents() error {
// ticker to periodically scan event for advertising
ticker := time.NewTicker(AdvertiseEventsTick)
defer ticker.Stop()
// advertMap is a map of advert events
advertMap := make(map[uint64]*routeAdvert)
// routing table watcher
tableWatcher, err := r.Watch()
if err != nil {
return fmt.Errorf("failed creating routing table watcher: %v", err)
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchTable(tableWatcher):
case <-r.exit:
}
}()
for {
select {
case <-ticker.C:
var events []*Event
// collect all events which are not flapping
for key, advert := range advertMap {
// decay the event penalty
delta := time.Since(advert.lastUpdate).Seconds()
advert.penalty = advert.penalty * math.Exp(-delta*PenaltyDecay)
// suppress/recover the event based on its penalty level
switch {
case advert.penalty > AdvertSuppress && !advert.isSuppressed:
advert.isSuppressed = true
advert.suppressTime = time.Now()
case advert.penalty < AdvertRecover && advert.isSuppressed:
advert.isSuppressed = false
}
// max suppression time threshold has been reached, delete the advert
if advert.isSuppressed {
if time.Since(advert.suppressTime) > MaxSuppressTime {
delete(advertMap, key)
continue
}
}
if !advert.isSuppressed {
for _, event := range advert.events {
e := new(Event)
*e = *event
events = append(events, e)
// delete the advert from the advertMap
delete(advertMap, key)
}
}
}
// advertise all Update events to subscribers
if len(events) > 0 {
r.advertWg.Add(1)
go r.publishAdvert(RouteUpdate, events)
}
case e := <-r.eventChan:
// if event is nil, continue
if e == nil {
continue
}
// determine the event penalty
var penalty float64
switch e.Type {
case Update:
penalty = UpdatePenalty
case Delete:
penalty = DeletePenalty
}
// check if we have already registered the route
// we use the route hash as advertMap key
hash := e.Route.Hash()
advert, ok := advertMap[hash]
if !ok {
events := []*Event{e}
advert = &routeAdvert{
events: events,
penalty: penalty,
lastUpdate: time.Now(),
}
advertMap[hash] = advert
continue
}
// attempt to squash last two events if possible
lastEvent := advert.events[len(advert.events)-1]
if lastEvent.Type == e.Type {
advert.events[len(advert.events)-1] = e
} else {
advert.events = append(advert.events, e)
}
// update event penalty and recorded timestamp
advert.lastUpdate = time.Now()
advert.penalty += penalty
case <-r.exit:
// first wait for the advertiser to finish
r.advertWg.Wait()
return nil
}
}
}
// close closes exit channels
func (r *router) close() {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel only if advertising
if r.status.Code == Advertising {
// drain the event channel
for range r.eventChan {
}
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
delete(r.subscribers, id)
}
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
// watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors() {
var err error
select {
case <-r.exit:
return
case err = <-r.errChan:
}
r.Lock()
defer r.Unlock()
// if the router is not stopped, stop it
if r.status.Code != Stopped {
// close all the channels
r.close()
// set the status error
if err != nil {
r.status.Error = err
}
}
}
// Start starts the router
func (r *router) Start() error {
r.Lock()
defer r.Unlock()
// only start if we're stopped
if r.status.Code != Stopped {
return nil
}
// add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil {
e := fmt.Errorf("failed adding registry routes: %s", err)
r.status = Status{Code: Error, Error: e}
return e
}
// add default gateway into routing table
if r.options.Gateway != "" {
// note, the only non-default value is the gateway
route := Route{
Service: "*",
Address: "*",
Gateway: r.options.Gateway,
Network: "*",
Router: r.options.Id,
Link: DefaultLink,
Metric: DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
e := fmt.Errorf("failed adding default gateway route: %s", err)
r.status = Status{Code: Error, Error: e}
return e
}
}
// create error and exit channels
r.errChan = make(chan error, 1)
r.exit = make(chan struct{})
// registry watcher
regWatcher, err := r.options.Registry.Watch()
if err != nil {
e := fmt.Errorf("failed creating registry watcher: %v", err)
r.status = Status{Code: Error, Error: e}
return e
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchRegistry(regWatcher):
case <-r.exit:
}
}()
// watch for errors and cleanup
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.watchErrors()
}()
// mark router as Running
r.status = Status{Code: Running, Error: nil}
return nil
}
// Advertise stars advertising the routes to the network and returns the advertisements channel to consume from.
// If the router is already advertising it returns the channel to consume from.
// It returns error if either the router is not running or if the routing table fails to list the routes to advertise.
func (r *router) Advertise() (<-chan *Advert, error) {
r.Lock()
defer r.Unlock()
switch r.status.Code {
case Advertising:
advertChan := make(chan *Advert)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
case Running:
// list routing table routes to announce
routes, err := r.table.List()
if err != nil {
return nil, fmt.Errorf("failed listing routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*Event, len(routes))
for i, route := range routes {
event := &Event{
Type: Create,
Timestamp: time.Now(),
Route: route,
}
events[i] = event
}
// create event channels
r.eventChan = make(chan *Event)
// advertise your presence
r.advertWg.Add(1)
go r.publishAdvert(Announce, events)
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.advertiseEvents():
case <-r.exit:
}
}()
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
// advertise the whole routing table
select {
case r.errChan <- r.advertiseTable():
case <-r.exit:
}
}()
// mark router as Running and set its Error to nil
r.status = Status{Code: Advertising, Error: nil}
// create advert channel
advertChan := make(chan *Advert)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
case Stopped:
return nil, fmt.Errorf("not running")
}
return nil, fmt.Errorf("error: %s", r.status.Error)
}
// Process updates the routing table using the advertised values
func (r *router) Process(a *Advert) error {
// NOTE: event sorting might not be necessary
// copy update events intp new slices
events := make([]*Event, len(a.Events))
copy(events, a.Events)
// sort events by timestamp
sort.Slice(events, func(i, j int) bool {
return events[i].Timestamp.Before(events[j].Timestamp)
})
for _, event := range events {
// skip if the router is the origin of this route
if event.Route.Router == r.options.Id {
continue
}
// create a copy of the route
route := event.Route
action := event.Type
if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil {
return fmt.Errorf("failed applying action %s to routing table: %s", action, err)
}
}
return nil
}
// Lookup routes in the routing table
func (r *router) Lookup(q Query) ([]Route, error) {
return r.table.Query(q)
}
// Watch routes
func (r *router) Watch(opts ...WatchOption) (Watcher, error) {
return r.table.Watch(opts...)
}
// Status returns router status
func (r *router) Status() Status {
r.RLock()
defer r.RUnlock()
// make a copy of the status
status := r.status
return status
}
// Stop stops the router
func (r *router) Stop() error {
r.Lock()
defer r.Unlock()
switch r.status.Code {
case Stopped, Error:
return r.status.Error
case Running, Advertising:
// close all the channels
r.close()
}
// wait for all goroutines to finish
r.wg.Wait()
return nil
}
// String prints debugging information about router
func (r *router) String() string {
return "memory"
}