1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

Updated error statements; Update ships list of events.

This commit is contained in:
Milos Gajdos 2019-07-03 19:50:07 +01:00
parent 6bdc23a3aa
commit ea872f6900
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
4 changed files with 86 additions and 38 deletions

View File

@ -2,6 +2,7 @@ package router
import (
"fmt"
"sort"
"strings"
"sync"
"time"
@ -10,6 +11,15 @@ import (
"github.com/olekukonko/tablewriter"
)
const (
// UpdateRoutePenalty penalises route updates
UpdateRoutePenalty = 500
// DeleteRoutePenalty penalises route deletes
DeleteRoutePenalty = 1000
// AdvertiseTick is time interval in which we advertise route updates
AdvertiseTick = 5 * time.Second
)
// router provides default router implementation
type router struct {
opts Options
@ -79,7 +89,7 @@ func (r *router) Network() string {
func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error {
services, err := reg.ListServices()
if err != nil {
return fmt.Errorf("failed to list services: %v", err)
return fmt.Errorf("failed listing services: %v", err)
}
// add each service node as a separate route
@ -148,12 +158,12 @@ func (r *router) manageServiceRoutes(w registry.Watcher, metric int) error {
case "create":
// only return error if the route is not duplicate, but something else has failed
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed to add route for service %v: %s", res.Service.Name, err)
return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err)
}
case "delete":
// only return error if the route is not in the table, but something else has failed
if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to delete route for service %v: %s", res.Service.Name, err)
return fmt.Errorf("failed adding route for service %v: %s", res.Service.Name, err)
}
}
}
@ -175,7 +185,6 @@ func (r *router) watchTable(w Watcher) error {
var watchErr error
exit:
for {
event, err := w.Next()
if err != nil {
@ -188,12 +197,13 @@ exit:
u := &Update{
ID: r.ID(),
Timestamp: time.Now(),
Event: event,
Events: []*Event{event},
}
select {
case <-r.exit:
break exit
close(r.advertChan)
return watchErr
case r.advertChan <- u:
}
}
@ -258,7 +268,7 @@ func (r *router) Advertise() (<-chan *Update, error) {
Metric: DefaultLocalMetric,
}
if err := r.opts.Table.Add(route); err != nil {
return nil, fmt.Errorf("error to add default gateway route: %s", err)
return nil, fmt.Errorf("failed adding default gateway route: %s", err)
}
}
@ -271,12 +281,12 @@ func (r *router) Advertise() (<-chan *Update, error) {
// routing table watcher which watches all routes i.e. to every destination
tableWatcher, err := r.opts.Table.Watch(WatchDestination("*"))
if err != nil {
return nil, fmt.Errorf("failed to create routing table watcher: %v", err)
return nil, fmt.Errorf("failed creating routing table watcher: %v", err)
}
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
return nil, fmt.Errorf("failed to create registry watcher: %v", err)
return nil, fmt.Errorf("failed creating registry watcher: %v", err)
}
// error channel collecting goroutine errors
@ -311,18 +321,32 @@ func (r *router) Advertise() (<-chan *Update, error) {
}
// Update updates the routing table using the advertised values
func (r *router) Update(a *Update) error {
// we extract the route from advertisement and update the routing table
route := Route{
Destination: a.Event.Route.Destination,
Gateway: a.Event.Route.Gateway,
Router: a.Event.Route.Router,
Network: a.Event.Route.Network,
Metric: a.Event.Route.Metric,
Policy: AddIfNotExists,
func (r *router) Update(u *Update) error {
// NOTE: event sorting might not be necessary
// copy update events intp new slices
events := make([]*Event, len(u.Events))
copy(events, u.Events)
// sort events by timestamp
sort.Slice(events, func(i, j int) bool {
return events[i].Timestamp.Before(events[j].Timestamp)
})
for _, event := range events {
// we extract the route from advertisement and update the routing table
route := Route{
Destination: event.Route.Destination,
Gateway: event.Route.Gateway,
Router: event.Route.Router,
Network: event.Route.Network,
Metric: event.Route.Metric,
Policy: AddIfNotExists,
}
if err := r.opts.Table.Update(route); err != nil {
return fmt.Errorf("failed updating routing table: %v", err)
}
}
return r.opts.Table.Update(route)
return nil
}
// Status returns router status

View File

@ -138,6 +138,7 @@ func (t *table) Update(r Route) error {
return ErrRouteNotFound
}
// check if destination has this particular router in the table
if _, ok := t.m[destAddr][sum]; !ok && r.Policy == AddIfNotExists {
t.m[destAddr][sum] = r
go t.sendEvent(&Event{Type: CreateEvent, Route: r})

View File

@ -34,14 +34,39 @@ type Router interface {
String() string
}
// Option used by the router
type Option func(*Options)
// UpdateType is route advertisement update type
type UpdateType int
const (
// Announce is advertised when the router announces itself
Announce UpdateType = iota
// RouteEvent advertises route events
RouteEvent
)
// String returns string representation of update event
func (ut UpdateType) String() string {
switch ut {
case Announce:
return "ANNOUNCE"
case RouteEvent:
return "ROUTE"
default:
return "UNKNOWN"
}
}
// Update is sent by the router to the network
type Update struct {
// ID is the router ID
ID string
// Timestamp marks the time when update is sent
// Timestamp marks the time when the update is sent
Timestamp time.Time
// Event defines advertisement even
Event *Event
// Events is a list of events to advertise
Events []*Event
}
// StatusCode defines router status
@ -58,12 +83,12 @@ type Status struct {
const (
// Init means the rotuer has just been initialized
Init StatusCode = iota
// Running means the router is running
// Running means the router is up and running
Running
// Error means the router has crashed with error
Error
// Stopped means the router has stopped
// Stopped means the router has been stopped
Stopped
// Error means the router has encountered error
Error
)
// String returns human readable status code
@ -73,18 +98,15 @@ func (sc StatusCode) String() string {
return "INITIALIZED"
case Running:
return "RUNNING"
case Error:
return "ERROR"
case Stopped:
return "STOPPED"
case Error:
return "ERROR"
default:
return "UNKNOWN"
}
}
// Option used by the router
type Option func(*Options)
// NewRouter creates new Router and returns it
func NewRouter(opts ...Option) Router {
return newRouter(opts...)

View File

@ -3,6 +3,7 @@ package router
import (
"errors"
"strings"
"time"
"github.com/olekukonko/tablewriter"
)
@ -16,11 +17,11 @@ var (
type EventType int
const (
// CreateEvent is emitted when new route has been created
// CreateEvent is emitted when a new route has been created
CreateEvent EventType = iota
// DeleteEvent is emitted when an existing route has been deleted
DeleteEvent
// UpdateEvent is emitted when a routing table has been updated
// UpdateEvent is emitted when an existing route has been updated
UpdateEvent
)
@ -42,6 +43,8 @@ func (et EventType) String() string {
type Event struct {
// Type defines type of event
Type EventType
// Timestamp is event timestamp
Timestamp time.Time
// Route is table rout
Route Route
}
@ -81,18 +84,16 @@ type tableWatcher struct {
}
// Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly; we only allow watching particular route destination for now
// TODO: this needs to be thought through properly;
// right now we only allow to watch destination
func (w *tableWatcher) Next() (*Event, error) {
for {
select {
case res := <-w.resChan:
switch w.opts.Destination {
case "*", "":
case res.Route.Destination, "*":
return res, nil
default:
if w.opts.Destination == res.Route.Destination {
return res, nil
}
continue
}
case <-w.done: