1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

Add Start method to router

Added Start to router packages.
Fixed potential deadlocks.
This commit is contained in:
Milos Gajdos 2019-08-12 18:18:17 +01:00
parent c0a676bfa9
commit cb1679fd8d
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
3 changed files with 117 additions and 85 deletions

View File

@ -43,10 +43,9 @@ var (
// router implements default router // router implements default router
type router struct { type router struct {
sync.RWMutex sync.RWMutex
// embed the table
table *table
opts Options opts Options
status Status status Status
table *table
exit chan struct{} exit chan struct{}
errChan chan error errChan chan error
eventChan chan *Event eventChan chan *Event
@ -67,33 +66,41 @@ func newRouter(opts ...Option) Router {
o(&options) o(&options)
} }
r := &router{ // set initial status to Stopped
table: newTable(), status := Status{Code: Stopped, Error: nil}
return &router{
opts: options, opts: options,
status: Status{Code: Stopped, Error: nil}, status: status,
table: newTable(),
advertWg: &sync.WaitGroup{}, advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{}, wg: &sync.WaitGroup{},
subscribers: make(map[string]chan *Advert), subscribers: make(map[string]chan *Advert),
} }
go r.run()
return r
} }
// Init initializes router with given options // Init initializes router with given options
func (r *router) Init(opts ...Option) error { func (r *router) Init(opts ...Option) error {
r.Lock()
defer r.Unlock()
for _, o := range opts { for _, o := range opts {
o(&r.opts) o(&r.opts)
} }
return nil return nil
} }
// Options returns router options // Options returns router options
func (r *router) Options() Options { func (r *router) Options() Options {
return r.opts r.Lock()
opts := r.opts
r.Unlock()
return opts
} }
// Table returns routing table
func (r *router) Table() Table { func (r *router) Table() Table {
return r.table return r.table
} }
@ -475,11 +482,12 @@ func (r *router) watchErrors() {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
// if the router is not stopped, stop it
if r.status.Code != Stopped { if r.status.Code != Stopped {
// notify all goroutines to finish // notify all goroutines to finish
close(r.exit) close(r.exit)
// drain the advertise channel only if advertising // drain the advertise channel only if the router is advertising
if r.status.Code == Advertising { if r.status.Code == Advertising {
// drain the event channel // drain the event channel
for range r.eventChan { for range r.eventChan {
@ -495,69 +503,67 @@ func (r *router) watchErrors() {
} }
} }
// Run runs the router. // Start starts the router
func (r *router) run() { func (r *router) Start() error {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
switch r.status.Code { // add all local service routes into the routing table
case Stopped, Error: if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil {
// add all local service routes into the routing table e := fmt.Errorf("failed adding registry routes: %s", err)
if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil { r.status = Status{Code: Error, Error: e}
r.status = Status{Code: Error, Error: fmt.Errorf("failed adding registry routes: %s", err)} return e
return
}
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := Route{
Service: "*",
Address: "*",
Gateway: r.opts.Gateway,
Network: "*",
Metric: DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)}
return
}
}
// create error and exit channels
r.errChan = make(chan error, 1)
r.exit = make(chan struct{})
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
r.status = Status{Code: Error, Error: fmt.Errorf("failed creating registry watcher: %v", err)}
return
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchRegistry(regWatcher):
case <-r.exit:
}
}()
// watch for errors and cleanup
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.watchErrors()
}()
// mark router as Running and set its Error to nil
r.status = Status{Code: Running, Error: nil}
return
} }
return // add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := Route{
Service: "*",
Address: "*",
Gateway: r.opts.Gateway,
Network: "*",
Metric: DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
e := fmt.Errorf("failed adding default gateway route: %s", err)
r.status = Status{Code: Error, Error: e}
return e
}
}
// create error and exit channels
r.errChan = make(chan error, 1)
r.exit = make(chan struct{})
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
if err != nil {
e := fmt.Errorf("failed creating registry watcher: %v", err)
r.status = Status{Code: Error, Error: e}
return e
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchRegistry(regWatcher):
case <-r.exit:
}
}()
// watch for errors and cleanup
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.watchErrors()
}()
// mark router as Running
r.status = Status{Code: Running, Error: nil}
return nil
} }
// Advertise stars advertising the routes to the network and returns the advertisements channel to consume from. // Advertise stars advertising the routes to the network and returns the advertisements channel to consume from.
@ -578,6 +584,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("failed listing routes: %s", err) return nil, fmt.Errorf("failed listing routes: %s", err)
} }
// collect all the added routes before we attempt to add default gateway // collect all the added routes before we attempt to add default gateway
events := make([]*Event, len(routes)) events := make([]*Event, len(routes))
for i, route := range routes { for i, route := range routes {

View File

@ -21,6 +21,8 @@ type Router interface {
Lookup(Query) ([]Route, error) Lookup(Query) ([]Route, error)
// Watch returns a watcher which tracks updates to the routing table // Watch returns a watcher which tracks updates to the routing table
Watch(opts ...WatchOption) (Watcher, error) Watch(opts ...WatchOption) (Watcher, error)
// Start starts the router
Start() error
// Status returns router status // Status returns router status
Status() Status Status() Status
// Stop stops the router // Stop stops the router
@ -76,10 +78,15 @@ func (s StatusCode) String() string {
// Status is router status // Status is router status
type Status struct { type Status struct {
// Error is router error
Error error
// Code defines router status // Code defines router status
Code StatusCode Code StatusCode
// Error contains error description
Error error
}
// String returns human readable status
func (s Status) String() string {
return s.Code.String()
} }
// AdvertType is route advertisement type // AdvertType is route advertisement type

View File

@ -43,9 +43,16 @@ func NewRouter(opts ...router.Option) router.Router {
cli = options.Client cli = options.Client
} }
// set the status to Stopped
status := &router.Status{
Code: router.Stopped,
Error: nil,
}
// NOTE: should we have Client/Service option in router.Options? // NOTE: should we have Client/Service option in router.Options?
s := &svc{ s := &svc{
opts: options, opts: options,
status: status,
router: pb.NewRouterService(router.DefaultName, cli), router: pb.NewRouterService(router.DefaultName, cli),
} }
@ -63,21 +70,43 @@ func NewRouter(opts ...router.Option) router.Router {
// Init initializes router with given options // Init initializes router with given options
func (s *svc) Init(opts ...router.Option) error { func (s *svc) Init(opts ...router.Option) error {
s.Lock()
defer s.Unlock()
for _, o := range opts { for _, o := range opts {
o(&s.opts) o(&s.opts)
} }
return nil return nil
} }
// Options returns router options // Options returns router options
func (s *svc) Options() router.Options { func (s *svc) Options() router.Options {
return s.opts s.Lock()
opts := s.opts
s.Unlock()
return opts
} }
// Table returns routing table
func (s *svc) Table() router.Table { func (s *svc) Table() router.Table {
return s.table return s.table
} }
// Start starts the service
func (s *svc) Start() error {
s.Lock()
defer s.Unlock()
s.status = &router.Status{
Code: router.Running,
Error: nil,
}
return nil
}
func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_AdvertiseService) error { func (s *svc) advertiseEvents(advertChan chan *router.Advert, stream pb.Router_AdvertiseService) error {
go func() { go func() {
<-s.exit <-s.exit
@ -140,10 +169,7 @@ func (s *svc) Advertise() (<-chan *router.Advert, error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
// get the status switch s.status.Code {
status := s.Status()
switch status.Code {
case router.Running, router.Advertising: case router.Running, router.Advertising:
stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...) stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...)
if err != nil { if err != nil {
@ -154,15 +180,7 @@ func (s *svc) Advertise() (<-chan *router.Advert, error) {
go s.advertiseEvents(advertChan, stream) go s.advertiseEvents(advertChan, stream)
return advertChan, nil return advertChan, nil
case router.Stopped: case router.Stopped:
// check if our router is stopped return nil, fmt.Errorf("not running")
select {
case <-s.exit:
s.exit = make(chan bool)
// call advertise again
return s.Advertise()
default:
return nil, fmt.Errorf("not running")
}
} }
return nil, fmt.Errorf("error: %s", s.status.Error) return nil, fmt.Errorf("error: %s", s.status.Error)