1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-11 17:18:28 +02:00

Outline of Advertise, Watch and start of the router.

This commit is contained in:
Milos Gajdos 2019-07-26 17:11:59 +01:00
parent ddad43bd77
commit c5740ae031
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
2 changed files with 192 additions and 11 deletions

View File

@ -3,7 +3,10 @@ package service
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/client"
@ -17,10 +20,14 @@ var (
)
type svc struct {
opts router.Options
router pb.RouterService
status router.Status
watchers map[string]*svcWatcher
opts router.Options
router pb.RouterService
status router.Status
watchers map[string]*svcWatcher
exit chan struct{}
errChan chan error
advertChan chan *router.Advert
wg *sync.WaitGroup
sync.RWMutex
}
@ -43,8 +50,11 @@ func NewRouter(opts ...router.Option) router.Router {
router: pb.NewRouterService(router.DefaultName, client),
status: router.Status{Code: router.Stopped, Error: nil},
watchers: make(map[string]*svcWatcher),
wg: &sync.WaitGroup{},
}
go s.run()
return s
}
@ -61,6 +71,70 @@ func (s *svc) Options() router.Options {
return s.opts
}
// watchErrors watches router errors and takes appropriate actions
func (s *svc) watchErrors() {
var err error
select {
case <-s.exit:
case err = <-s.errChan:
}
s.Lock()
defer s.Unlock()
if s.status.Code != router.Stopped {
// notify all goroutines to finish
close(s.exit)
// TODO" might need to drain some channels here
}
if err != nil {
s.status = router.Status{Code: router.Error, Error: err}
}
}
// watchRouter watches router and send events to all registered watchers
func (s *svc) watchRouter(stream pb.Router_WatchService) error {
defer stream.Close()
var watchErr error
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
watchErr = err
}
break
}
route := router.Route{
Service: resp.Route.Service,
Address: resp.Route.Address,
Gateway: resp.Route.Gateway,
Network: resp.Route.Network,
Link: resp.Route.Link,
Metric: int(resp.Route.Metric),
}
event := &router.Event{
Type: router.EventType(resp.Type),
Timestamp: time.Unix(0, resp.Timestamp),
Route: route,
}
s.RLock()
for _, w := range s.watchers {
select {
case w.resChan <- event:
case <-w.done:
}
}
s.RUnlock()
}
return watchErr
}
// Run runs the router.
// It returns error if the router is already running.
func (s *svc) run() {
@ -69,15 +143,107 @@ func (s *svc) run() {
switch s.status.Code {
case router.Stopped, router.Error:
// TODO: start event stream watcher
// TODO: start watchError monitor
stream, err := s.router.Watch(context.Background(), &pb.WatchRequest{})
if err != nil {
s.status = router.Status{Code: router.Error, Error: fmt.Errorf("failed getting router stream: %s", err)}
return
}
// create error and exit channels
s.errChan = make(chan error, 1)
s.exit = make(chan struct{})
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case s.errChan <- s.watchRouter(stream):
case <-s.exit:
}
}()
// watch for errors and cleanup
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.watchErrors()
}()
// mark router as Running and set its Error to nil
s.status = router.Status{Code: router.Running, Error: nil}
return
}
return
}
func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error {
defer stream.Close()
var advErr error
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
advErr = err
}
break
}
// TODO: sort out events and TTL
advert := &router.Advert{
Id: resp.Id,
Type: router.AdvertType(resp.Type),
Timestamp: time.Unix(0, resp.Timestamp),
//Events: events,
}
select {
case s.advertChan <- advert:
case <-s.exit:
return nil
}
}
return advErr
}
// Advertise advertises routes to the network
func (s *svc) Advertise() (<-chan *router.Advert, error) {
// TODO: start advert stream watcher
return nil, nil
s.Lock()
defer s.Unlock()
switch s.status.Code {
case router.Advertising:
return s.advertChan, nil
case router.Running:
stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{})
if err != nil {
return nil, fmt.Errorf("failed getting advert stream: %s", err)
}
// create advertise and event channels
s.advertChan = make(chan *router.Advert)
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case s.errChan <- s.advertiseEvents(stream):
case <-s.exit:
}
}()
// mark router as Running and set its Error to nil
s.status = router.Status{Code: router.Advertising, Error: nil}
return s.advertChan, nil
case router.Stopped:
return nil, fmt.Errorf("not running")
}
return nil, fmt.Errorf("error: %s", s.status.Error)
}
// Process processes incoming adverts
@ -228,6 +394,21 @@ func (s *svc) Status() router.Status {
// Stop stops the router
func (s *svc) Stop() error {
s.Lock()
// only close the channel if the router is running and/or advertising
if s.status.Code == router.Running || s.status.Code == router.Advertising {
// notify all goroutines to finish
close(s.exit)
// TODO: might need to drain some channels here
// mark the router as Stopped and set its Error to nil
s.status = router.Status{Code: router.Stopped, Error: nil}
}
s.Unlock()
// wait for all goroutines to finish
s.wg.Wait()
return nil
}

View File

@ -179,14 +179,14 @@ func (t *Table) Watch(opts ...WatchOption) (Watcher, error) {
return w, nil
}
// sendEvent sends rules to all subscribe watchers
func (t *Table) sendEvent(r *Event) {
// sendEvent sends events to all subscribed watchers
func (t *Table) sendEvent(e *Event) {
t.RLock()
defer t.RUnlock()
for _, w := range t.watchers {
select {
case w.resChan <- r:
case w.resChan <- e:
case <-w.done:
}
}