From ddad43bd774408795c8973252dbc0de43edbc750 Mon Sep 17 00:00:00 2001
From: Milos Gajdos <milosgajdos83@gmail.com>
Date: Fri, 26 Jul 2019 14:05:03 +0100
Subject: [PATCH] Added service.Router Route CRUD. Outlined watcher and run()

---
 network/router/service/service.go | 94 +++++++++++++++++++++++++++----
 network/router/service/watcher.go | 49 ++++++++++++++++
 network/router/watcher.go         |  7 ++-
 3 files changed, 139 insertions(+), 11 deletions(-)
 create mode 100644 network/router/service/watcher.go

diff --git a/network/router/service/service.go b/network/router/service/service.go
index cc79d5ce..d1929b55 100644
--- a/network/router/service/service.go
+++ b/network/router/service/service.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"sync"
 
+	"github.com/google/uuid"
 	"github.com/micro/go-micro/client"
 	"github.com/micro/go-micro/network/router"
 	pb "github.com/micro/go-micro/network/router/proto"
@@ -16,9 +17,10 @@ var (
 )
 
 type svc struct {
-	router pb.RouterService
-	opts   router.Options
-	status router.Status
+	opts     router.Options
+	router   pb.RouterService
+	status   router.Status
+	watchers map[string]*svcWatcher
 	sync.RWMutex
 }
 
@@ -37,9 +39,10 @@ func NewRouter(opts ...router.Option) router.Router {
 
 	// NOTE: should we have Client/Service option in router.Options?
 	s := &svc{
-		opts:   options,
-		status: router.Status{Code: router.Stopped, Error: nil},
-		router: pb.NewRouterService(router.DefaultName, client),
+		opts:     options,
+		router:   pb.NewRouterService(router.DefaultName, client),
+		status:   router.Status{Code: router.Stopped, Error: nil},
+		watchers: make(map[string]*svcWatcher),
 	}
 
 	return s
@@ -58,8 +61,22 @@ func (s *svc) Options() router.Options {
 	return s.opts
 }
 
+// Run runs the router.
+// It returns error if the router is already running.
+func (s *svc) run() {
+	s.Lock()
+	defer s.Unlock()
+
+	switch s.status.Code {
+	case router.Stopped, router.Error:
+		// TODO: start event stream watcher
+		// TODO: start watchError monitor
+	}
+}
+
 // Advertise advertises routes to the network
 func (s *svc) Advertise() (<-chan *router.Advert, error) {
+	// TODO: start advert stream watcher
 	return nil, nil
 }
 
@@ -70,17 +87,56 @@ func (s *svc) Process(a *router.Advert) error {
 
 // Create new route in the routing table
 func (s *svc) Create(r router.Route) error {
-	return ErrNotImplemented
+	route := &pb.Route{
+		Service: r.Service,
+		Address: r.Address,
+		Gateway: r.Gateway,
+		Network: r.Network,
+		Link:    r.Link,
+		Metric:  int64(r.Metric),
+	}
+
+	if _, err := s.router.Create(context.Background(), route); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // Delete deletes existing route from the routing table
 func (s *svc) Delete(r router.Route) error {
-	return ErrNotImplemented
+	route := &pb.Route{
+		Service: r.Service,
+		Address: r.Address,
+		Gateway: r.Gateway,
+		Network: r.Network,
+		Link:    r.Link,
+		Metric:  int64(r.Metric),
+	}
+
+	if _, err := s.router.Delete(context.Background(), route); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // Update updates route in the routing table
 func (s *svc) Update(r router.Route) error {
-	return ErrNotImplemented
+	route := &pb.Route{
+		Service: r.Service,
+		Address: r.Address,
+		Gateway: r.Gateway,
+		Network: r.Network,
+		Link:    r.Link,
+		Metric:  int64(r.Metric),
+	}
+
+	if _, err := s.router.Update(context.Background(), route); err != nil {
+		return err
+	}
+
+	return nil
 }
 
 // List returns the list of all routes in the table
@@ -138,7 +194,25 @@ func (s *svc) Lookup(q router.Query) ([]router.Route, error) {
 
 // Watch returns a watcher which allows to track updates to the routing table
 func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) {
-	return nil, nil
+	wopts := router.WatchOptions{
+		Service: "*",
+	}
+
+	for _, o := range opts {
+		o(&wopts)
+	}
+
+	w := &svcWatcher{
+		opts:    wopts,
+		resChan: make(chan *router.Event, 10),
+		done:    make(chan struct{}),
+	}
+
+	s.Lock()
+	s.watchers[uuid.New().String()] = w
+	s.Unlock()
+
+	return w, nil
 }
 
 // Status returns router status
diff --git a/network/router/service/watcher.go b/network/router/service/watcher.go
new file mode 100644
index 00000000..984c3439
--- /dev/null
+++ b/network/router/service/watcher.go
@@ -0,0 +1,49 @@
+package service
+
+import (
+	"sync"
+
+	"github.com/micro/go-micro/network/router"
+)
+
+type svcWatcher struct {
+	opts    router.WatchOptions
+	resChan chan *router.Event
+	done    chan struct{}
+	sync.RWMutex
+}
+
+// Next is a blocking call that returns watch result
+func (w *svcWatcher) Next() (*router.Event, error) {
+	for {
+		select {
+		case res := <-w.resChan:
+			switch w.opts.Service {
+			case res.Route.Service, "*":
+				return res, nil
+			default:
+				continue
+			}
+		case <-w.done:
+			return nil, router.ErrWatcherStopped
+		}
+	}
+}
+
+// Chan returns event channel
+func (w *svcWatcher) Chan() (<-chan *router.Event, error) {
+	return w.resChan, nil
+}
+
+// Stop stops watcher
+func (w *svcWatcher) Stop() {
+	w.Lock()
+	defer w.Unlock()
+
+	select {
+	case <-w.done:
+		return
+	default:
+		close(w.done)
+	}
+}
diff --git a/network/router/watcher.go b/network/router/watcher.go
index d1ff3ea7..d1a9c017 100644
--- a/network/router/watcher.go
+++ b/network/router/watcher.go
@@ -2,6 +2,7 @@ package router
 
 import (
 	"errors"
+	"sync"
 	"time"
 )
 
@@ -78,10 +79,11 @@ type tableWatcher struct {
 	opts    WatchOptions
 	resChan chan *Event
 	done    chan struct{}
+	sync.RWMutex
 }
 
 // Next returns the next noticed action taken on table
-// TODO: think this through properly; right now we only watch service
+// TODO: right now we only allow to watch particular service
 func (w *tableWatcher) Next() (*Event, error) {
 	for {
 		select {
@@ -105,6 +107,9 @@ func (w *tableWatcher) Chan() (<-chan *Event, error) {
 
 // Stop stops routing table watcher
 func (w *tableWatcher) Stop() {
+	w.Lock()
+	defer w.Unlock()
+
 	select {
 	case <-w.done:
 		return