1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Added Init state. Recreate exit and advertise channels when recovering

In order to differentiate between intialized and other states we
introduced a new state: Init. The router is in this state only when it's
created.

We have cleaned up router status management which is now handled by
manageStatus function only.
This commit is contained in:
Milos Gajdos 2019-06-29 00:46:22 +01:00
parent 32300eadc1
commit cff46c3fd8
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
2 changed files with 138 additions and 102 deletions

View File

@ -32,7 +32,7 @@ func newRouter(opts ...Option) Router {
return &router{
opts: options,
status: Status{Error: nil, Code: Stopped},
status: Status{Error: nil, Code: Init},
advertChan: make(chan *Update),
exit: make(chan struct{}),
wg: &sync.WaitGroup{},
@ -72,99 +72,6 @@ func (r *router) Network() string {
return r.opts.Network
}
// Advertise advertises the routes to the network.
// It returns error if any of the launched goroutines fail with error.
func (r *router) Advertise() (<-chan *Update, error) {
r.Lock()
defer r.Unlock()
if r.status.Code != Running {
// add local service routes into the routing table
if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil {
return nil, fmt.Errorf("failed adding routes: %v", err)
}
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := Route{
Destination: "*",
Gateway: r.opts.Gateway,
Router: "*",
Network: "*",
Metric: DefaultLocalMetric,
}
if err := r.opts.Table.Add(route); err != nil {
return nil, fmt.Errorf("error to add default gateway route: %s", err)
}
}
// routing table watcher which watches all routes i.e. to every destination
tableWatcher, err := r.opts.Table.Watch(WatchDestination("*"))
if err != nil {
return nil, fmt.Errorf("failed to create routing table watcher: %v", err)
}
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
return nil, fmt.Errorf("failed to create registry watcher: %v", err)
}
// error channel collecting goroutine errors
errChan := make(chan error, 2)
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routine table
errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routing table
errChan <- r.watchTable(tableWatcher)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
// wait for exit chan
case <-r.exit:
r.status.Code = Stopped
case err := <-errChan:
r.status.Code = Error
r.status.Error = err
}
// close the advertise channel
close(r.advertChan)
}()
// mark the router as running
r.status.Code = Running
}
return r.advertChan, nil
}
// Update updates the routing table using the advertised values
func (r *router) Update(a *Update) error {
// we extract the route from advertisement and update the routing table
route := Route{
Destination: a.Event.Route.Destination,
Gateway: a.Event.Route.Gateway,
Router: a.Event.Route.Router,
Network: a.Event.Route.Network,
Metric: a.Event.Route.Metric,
Policy: AddIfNotExists,
}
return r.opts.Table.Update(route)
}
// addServiceRoutes adds all services in given registry to the routing table.
// NOTE: this is a one-off operation done when bootstrapping the routing table
// It returns error if either the services failed to be listed or
@ -301,21 +208,145 @@ func (r *router) watchTable(w Watcher) error {
return watchErr
}
// manageStatus manages router status
func (r *router) manageStatus(errChan <-chan error) {
defer r.wg.Done()
r.Lock()
r.status.Code = Running
r.status.Error = nil
r.Unlock()
var code StatusCode
var err error
select {
case <-r.exit:
code = Stopped
case err = <-errChan:
code = Error
}
r.Lock()
defer r.Unlock()
r.status.Code = code
r.status.Error = err
// close the advertise channel
close(r.advertChan)
// stop the router if some error happened
// this will notify all watcher goroutines to stop
if err != nil && code != Stopped {
close(r.exit)
}
}
// Advertise advertises the routes to the network.
// It returns error if any of the launched goroutines fail with error.
func (r *router) Advertise() (<-chan *Update, error) {
r.Lock()
defer r.Unlock()
if r.status.Code != Running {
// add local service routes into the routing table
if err := r.addServiceRoutes(r.opts.Registry, "local", DefaultLocalMetric); err != nil {
return nil, fmt.Errorf("failed adding routes: %v", err)
}
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := Route{
Destination: "*",
Gateway: r.opts.Gateway,
Router: "*",
Network: "*",
Metric: DefaultLocalMetric,
}
if err := r.opts.Table.Add(route); err != nil {
return nil, fmt.Errorf("error to add default gateway route: %s", err)
}
}
// NOTE: we only need to recreate the exit/advertChan if the router errored or was stopped
if r.status.Code == Error || r.status.Code == Stopped {
r.exit = make(chan struct{})
r.advertChan = make(chan *Update)
}
// routing table watcher which watches all routes i.e. to every destination
tableWatcher, err := r.opts.Table.Watch(WatchDestination("*"))
if err != nil {
return nil, fmt.Errorf("failed to create routing table watcher: %v", err)
}
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
return nil, fmt.Errorf("failed to create registry watcher: %v", err)
}
// error channel collecting goroutine errors
errChan := make(chan error, 2)
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routine table
errChan <- r.manageServiceRoutes(regWatcher, DefaultLocalMetric)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
// watch local registry and register routes in routing table
errChan <- r.watchTable(tableWatcher)
}()
r.wg.Add(1)
go r.manageStatus(errChan)
}
return r.advertChan, nil
}
// Update updates the routing table using the advertised values
func (r *router) Update(a *Update) error {
// we extract the route from advertisement and update the routing table
route := Route{
Destination: a.Event.Route.Destination,
Gateway: a.Event.Route.Gateway,
Router: a.Event.Route.Router,
Network: a.Event.Route.Network,
Metric: a.Event.Route.Metric,
Policy: AddIfNotExists,
}
return r.opts.Table.Update(route)
}
// Status returns router status
func (r *router) Status() Status {
r.RLock()
defer r.RUnlock()
return r.status
// make a copy of the status
status := r.status
return status
}
// Stop stops the router
func (r *router) Stop() error {
// notify all goroutines to finish
close(r.exit)
r.RLock()
defer r.RUnlock()
// wait for all goroutines to finish
r.wg.Wait()
// only close the channel if the router is running
if r.status.Code == Running {
// notify all goroutines to finish
close(r.exit)
// wait for all goroutines to finish
r.wg.Wait()
}
return nil
}
@ -325,13 +356,14 @@ func (r *router) String() string {
sb := &strings.Builder{}
table := tablewriter.NewWriter(sb)
table.SetHeader([]string{"ID", "Address", "Network", "Table"})
table.SetHeader([]string{"ID", "Address", "Network", "Table", "Status"})
data := []string{
r.opts.ID,
r.opts.Address,
r.opts.Network,
fmt.Sprintf("%d", r.opts.Table.Size()),
r.status.Code.String(),
}
table.Append(data)

View File

@ -56,8 +56,10 @@ type Status struct {
}
const (
// Running means the rotuer is running
Running StatusCode = iota
// Init means the rotuer has just been initialized
Init StatusCode = iota
// Running means the router is running
Running
// Error means the router has crashed with error
Error
// Stopped means the router has stopped
@ -67,6 +69,8 @@ const (
// String returns human readable status code
func (sc StatusCode) String() string {
switch sc {
case Init:
return "INITIALIZED"
case Running:
return "RUNNING"
case Error: