diff --git a/network/router/service/service.go b/network/router/service/service.go index cc79d5ce..d1929b55 100644 --- a/network/router/service/service.go +++ b/network/router/service/service.go @@ -5,6 +5,7 @@ import ( "errors" "sync" + "github.com/google/uuid" "github.com/micro/go-micro/client" "github.com/micro/go-micro/network/router" pb "github.com/micro/go-micro/network/router/proto" @@ -16,9 +17,10 @@ var ( ) type svc struct { - router pb.RouterService - opts router.Options - status router.Status + opts router.Options + router pb.RouterService + status router.Status + watchers map[string]*svcWatcher sync.RWMutex } @@ -37,9 +39,10 @@ func NewRouter(opts ...router.Option) router.Router { // NOTE: should we have Client/Service option in router.Options? s := &svc{ - opts: options, - status: router.Status{Code: router.Stopped, Error: nil}, - router: pb.NewRouterService(router.DefaultName, client), + opts: options, + router: pb.NewRouterService(router.DefaultName, client), + status: router.Status{Code: router.Stopped, Error: nil}, + watchers: make(map[string]*svcWatcher), } return s @@ -58,8 +61,22 @@ func (s *svc) Options() router.Options { return s.opts } +// Run runs the router. +// It returns error if the router is already running. +func (s *svc) run() { + s.Lock() + defer s.Unlock() + + switch s.status.Code { + case router.Stopped, router.Error: + // TODO: start event stream watcher + // TODO: start watchError monitor + } +} + // Advertise advertises routes to the network func (s *svc) Advertise() (<-chan *router.Advert, error) { + // TODO: start advert stream watcher return nil, nil } @@ -70,17 +87,56 @@ func (s *svc) Process(a *router.Advert) error { // Create new route in the routing table func (s *svc) Create(r router.Route) error { - return ErrNotImplemented + route := &pb.Route{ + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int64(r.Metric), + } + + if _, err := s.router.Create(context.Background(), route); err != nil { + return err + } + + return nil } // Delete deletes existing route from the routing table func (s *svc) Delete(r router.Route) error { - return ErrNotImplemented + route := &pb.Route{ + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int64(r.Metric), + } + + if _, err := s.router.Delete(context.Background(), route); err != nil { + return err + } + + return nil } // Update updates route in the routing table func (s *svc) Update(r router.Route) error { - return ErrNotImplemented + route := &pb.Route{ + Service: r.Service, + Address: r.Address, + Gateway: r.Gateway, + Network: r.Network, + Link: r.Link, + Metric: int64(r.Metric), + } + + if _, err := s.router.Update(context.Background(), route); err != nil { + return err + } + + return nil } // List returns the list of all routes in the table @@ -138,7 +194,25 @@ func (s *svc) Lookup(q router.Query) ([]router.Route, error) { // Watch returns a watcher which allows to track updates to the routing table func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) { - return nil, nil + wopts := router.WatchOptions{ + Service: "*", + } + + for _, o := range opts { + o(&wopts) + } + + w := &svcWatcher{ + opts: wopts, + resChan: make(chan *router.Event, 10), + done: make(chan struct{}), + } + + s.Lock() + s.watchers[uuid.New().String()] = w + s.Unlock() + + return w, nil } // Status returns router status diff --git a/network/router/service/watcher.go b/network/router/service/watcher.go new file mode 100644 index 00000000..984c3439 --- /dev/null +++ b/network/router/service/watcher.go @@ -0,0 +1,49 @@ +package service + +import ( + "sync" + + "github.com/micro/go-micro/network/router" +) + +type svcWatcher struct { + opts router.WatchOptions + resChan chan *router.Event + done chan struct{} + sync.RWMutex +} + +// Next is a blocking call that returns watch result +func (w *svcWatcher) Next() (*router.Event, error) { + for { + select { + case res := <-w.resChan: + switch w.opts.Service { + case res.Route.Service, "*": + return res, nil + default: + continue + } + case <-w.done: + return nil, router.ErrWatcherStopped + } + } +} + +// Chan returns event channel +func (w *svcWatcher) Chan() (<-chan *router.Event, error) { + return w.resChan, nil +} + +// Stop stops watcher +func (w *svcWatcher) Stop() { + w.Lock() + defer w.Unlock() + + select { + case <-w.done: + return + default: + close(w.done) + } +} diff --git a/network/router/watcher.go b/network/router/watcher.go index d1ff3ea7..d1a9c017 100644 --- a/network/router/watcher.go +++ b/network/router/watcher.go @@ -2,6 +2,7 @@ package router import ( "errors" + "sync" "time" ) @@ -78,10 +79,11 @@ type tableWatcher struct { opts WatchOptions resChan chan *Event done chan struct{} + sync.RWMutex } // Next returns the next noticed action taken on table -// TODO: think this through properly; right now we only watch service +// TODO: right now we only allow to watch particular service func (w *tableWatcher) Next() (*Event, error) { for { select { @@ -105,6 +107,9 @@ func (w *tableWatcher) Chan() (<-chan *Event, error) { // Stop stops routing table watcher func (w *tableWatcher) Stop() { + w.Lock() + defer w.Unlock() + select { case <-w.done: return