diff --git a/router/default.go b/router/default.go index e5971228..5d2ffc29 100644 --- a/router/default.go +++ b/router/default.go @@ -3,7 +3,9 @@ package router import ( "fmt" "strings" + "sync" + "github.com/micro/go-log" "github.com/micro/go-micro/registry" "github.com/micro/go-micro/registry/gossip" "github.com/olekukonko/tablewriter" @@ -12,6 +14,8 @@ import ( type router struct { opts Options goss registry.Registry + exit chan struct{} + wg *sync.WaitGroup } func newRouter(opts ...Option) Router { @@ -20,22 +24,22 @@ func newRouter(opts ...Option) Router { Table: NewTable(), } + // apply requested options for _, o := range opts { o(&options) } + // bind to gossip address to join gossip registry goss := gossip.NewRegistry( gossip.Address(options.GossipAddr), ) - r := &router{ + return &router{ opts: options, goss: goss, + exit: make(chan struct{}), + wg: &sync.WaitGroup{}, } - - // TODO: start gossip.Registry watch here - - return r } // Init initializes router with given options @@ -66,6 +70,118 @@ func (r *router) Network() string { return r.opts.NetworkAddr } +// Start starts the router +func (r *router) Start() error { + // TODO: + // - list all remote services and populate routing table + // - list all local services and populate remote registry + + gWatcher, err := r.goss.Watch() + if err != nil { + return fmt.Errorf("failed to create router gossip registry watcher: %v", err) + } + + tWatcher, err := r.opts.Table.Watch() + if err != nil { + return fmt.Errorf("failed to create routing table watcher: %v", err) + } + + r.wg.Add(1) + go r.watchGossip(gWatcher) + + r.wg.Add(1) + go r.watchTable(tWatcher) + + return nil +} + +// watch gossip registry +func (r *router) watchGossip(w registry.Watcher) error { + defer r.wg.Done() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + <-r.exit + // stop gossip registry watcher + w.Stop() + }() + + var watchErr error + + // watch for changes to services + for { + res, err := w.Next() + if err == registry.ErrWatcherStopped { + break + } + + if err != nil { + watchErr = err + break + } + + switch res.Action { + case "create": + if len(res.Service.Nodes) > 0 { + log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) + } + case "delete": + log.Logf("Action: %s, Service: %v", res.Action, res.Service.Name) + } + } + + return watchErr +} + +// watch gossip registry +func (r *router) watchTable(w Watcher) error { + defer r.wg.Done() + + r.wg.Add(1) + go func() { + defer r.wg.Done() + <-r.exit + // stop gossip registry watcher + w.Stop() + }() + + var watchErr error + + // watch for changes to services + for { + res, err := w.Next() + if err == ErrWatcherStopped { + break + } + + if err != nil { + watchErr = err + break + } + + switch res.Action { + case "add": + log.Logf("Action: %s, Route: %v", res.Action, res.Route) + case "remove": + log.Logf("Action: %s, Route: %v", res.Action, res.Route) + } + } + + return watchErr +} + +// Stop stops the router +func (r *router) Stop() error { + // notify all goroutines to finish + close(r.exit) + + // wait for all goroutines to finish + r.wg.Wait() + + return nil +} + // String prints debugging information about router func (r *router) String() string { sb := &strings.Builder{} diff --git a/router/entry.go b/router/entry.go index 7d4a8d88..d446a3d9 100644 --- a/router/entry.go +++ b/router/entry.go @@ -14,8 +14,8 @@ const ( type RouteOptions struct { // DestAddr is destination address DestAddr string - // Hop is the next route hop - Hop Router + // Gateway is the next route hop + Gateway Router // Network defines micro network Network string // Metric is route cost metric @@ -31,10 +31,10 @@ func DestAddr(a string) RouteOption { } } -// Hop allows to set the route route options -func Hop(r Router) RouteOption { +// Gateway sets the route gateway +func Gateway(r Router) RouteOption { return func(o *RouteOptions) { - o.Hop = r + o.Gateway = r } } diff --git a/router/options.go b/router/options.go index 066bf5ab..f8c34091 100644 --- a/router/options.go +++ b/router/options.go @@ -29,21 +29,21 @@ func ID(id string) Option { } } -// Address allows to set router address +// Address sets router address func Address(a string) Option { return func(o *Options) { o.Address = a } } -// GossipAddress allows to set router address -func GossipAddress(a string) Option { +// GossipAddr sets router gossip address +func GossipAddr(a string) Option { return func(o *Options) { o.GossipAddr = a } } -// NetworkAddr allows to set router network +// NetworkAddr sets router network address func NetworkAddr(n string) Option { return func(o *Options) { o.NetworkAddr = n diff --git a/router/query.go b/router/query.go index 68298740..a022eda2 100644 --- a/router/query.go +++ b/router/query.go @@ -13,26 +13,17 @@ const ( // QueryOptions allow to define routing table query options type QueryOptions struct { // Route allows to set route options - Route *RouteOptions - // Service is micro service name - Service string + RouteOptions *RouteOptions // Policy defines query lookup policy Policy LookupPolicy // Count defines max number of results to return Count int } -// RouteOpts allows to set the route query options -func RouteOpts(r *RouteOptions) QueryOption { +// QueryRouteOpts allows to set the route query options +func QueryRouteOptons(r *RouteOptions) QueryOption { return func(o *QueryOptions) { - o.Route = r - } -} - -// Service allows to set the service name in routing query -func Service(s string) QueryOption { - return func(o *QueryOptions) { - o.Service = s + o.RouteOptions = r } } @@ -43,8 +34,8 @@ func QueryPolicy(p LookupPolicy) QueryOption { } } -// ResultCount allows to set max results to return -func ResultCount(c int) QueryOption { +// QueryCount allows to set max results to return +func QueryCount(c int) QueryOption { return func(o *QueryOptions) { o.Count = c } diff --git a/router/router.go b/router/router.go index fffd1752..40b49bc5 100644 --- a/router/router.go +++ b/router/router.go @@ -1,4 +1,4 @@ -// Package router provides an interface for micro network routers +// Package router provides an interface for micro network router package router // Router is micro network router @@ -9,11 +9,15 @@ type Router interface { Options() Options // Table returns routing table Table() Table - // Address returns router gossip adddress + // Address returns router adddress Address() string - // Network returns micro network address + // Network returns router network address Network() string - // String implemens fmt.Stringer interface + // Start starts router + Start() error + // Stop stops router + Stop() error + // String returns router debug info String() string } @@ -32,6 +36,9 @@ type RouteOption func(*RouteOptions) // QueryOption is used to define query options type QueryOption func(*QueryOptions) +// WatchOption is used to define what routes to watch in the table +type WatchOption func(*WatchOptions) + // NewRouter creates new Router and returns it func NewRouter(opts ...Option) Router { return newRouter(opts...) diff --git a/router/table.go b/router/table.go index 618b69ee..9611bf65 100644 --- a/router/table.go +++ b/router/table.go @@ -8,6 +8,7 @@ import ( "strings" "sync" + "github.com/google/uuid" "github.com/olekukonko/tablewriter" ) @@ -24,12 +25,14 @@ var ( type Table interface { // Add adds new route to the table Add(Route) error - // Remove removes route from the table + // Remove removes existing route from the table Remove(Route) error // Update updates route in the table Update(...RouteOption) error // Lookup looks up routes in the table Lookup(Query) ([]Route, error) + // Watch returns a watcher which allows you to track updates to the table + Watch(opts ...WatchOption) (Watcher, error) // Size returns the size of the table Size() int // String prints the routing table @@ -37,12 +40,13 @@ type Table interface { } // table is routing table -// It maps service name to routes type table struct { // m stores routing table map - m map[uint64]Route - // h is a hasher hashes route entries + m map[string]map[uint64]Route + // h hashes route entries h hash.Hash64 + // w is a list of table watchers + w map[string]*tableWatcher sync.RWMutex } @@ -52,73 +56,120 @@ func NewTable() Table { h.Reset() return &table{ - m: make(map[uint64]Route), + m: make(map[string]map[uint64]Route), + w: make(map[string]*tableWatcher), h: h, } } -// Add adds new routing entry +// Add adds a route to the routing table func (t *table) Add(r Route) error { t.Lock() defer t.Unlock() + destAddr := r.Options().DestAddr sum := t.hash(r) - if _, ok := t.m[sum]; !ok { - t.m[sum] = r + if _, ok := t.m[destAddr]; !ok { + t.m[destAddr] = make(map[uint64]Route) + t.m[destAddr][sum] = r + go t.sendResult(&Result{Action: "add", Route: r}) return nil } - if _, ok := t.m[sum]; ok && r.Options().Policy == OverrideIfExists { - t.m[sum] = r + if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists { + t.m[destAddr][sum] = r + go t.sendResult(&Result{Action: "update", Route: r}) return nil } return ErrDuplicateRoute } -// Remove removes entry from the routing table +// Remove removes the route from the routing table func (t *table) Remove(r Route) error { t.Lock() defer t.Unlock() + destAddr := r.Options().DestAddr sum := t.hash(r) - if _, ok := t.m[sum]; !ok { + if _, ok := t.m[destAddr]; !ok { return ErrRouteNotFound } - delete(t.m, sum) + delete(t.m[destAddr], sum) + go t.sendResult(&Result{Action: "remove", Route: r}) return nil } -// Update updates routing entry +// Update updates routing table using propvided options func (t *table) Update(opts ...RouteOption) error { t.Lock() defer t.Unlock() r := NewRoute(opts...) + destAddr := r.Options().DestAddr sum := t.hash(r) - if _, ok := t.m[sum]; !ok { + if _, ok := t.m[destAddr]; !ok { return ErrRouteNotFound } - if _, ok := t.m[sum]; ok { - t.m[sum] = r + if _, ok := t.m[destAddr][sum]; ok { + t.m[destAddr][sum] = r + go t.sendResult(&Result{Action: "update", Route: r}) return nil } return ErrRouteNotFound } -// Lookup looks up entry in the routing table +// Lookup queries routing table and returns all routes that match it func (t *table) Lookup(q Query) ([]Route, error) { return nil, ErrNotImplemented } +// Watch returns routing table entry watcher +func (t *table) Watch(opts ...WatchOption) (Watcher, error) { + // by default watch everything + wopts := WatchOptions{ + DestAddr: "*", + Network: "*", + } + + for _, o := range opts { + o(&wopts) + } + + watcher := &tableWatcher{ + opts: wopts, + resChan: make(chan *Result, 10), + done: make(chan struct{}), + } + + t.Lock() + t.w[uuid.New().String()] = watcher + t.Unlock() + + return watcher, nil +} + +// sendResult sends rules to all subscribe watchers +func (t *table) sendResult(r *Result) { + t.RLock() + defer t.RUnlock() + + for _, w := range t.w { + select { + case w.resChan <- r: + case <-w.done: + } + } +} + // Size returns the size of the routing table func (t *table) Size() int { t.RLock() @@ -127,7 +178,7 @@ func (t *table) Size() int { return len(t.m) } -// String returns text representation of routing table +// String returns debug information func (t *table) String() string { t.RLock() defer t.RUnlock() @@ -137,16 +188,18 @@ func (t *table) String() string { // create nice table printing structure table := tablewriter.NewWriter(sb) - table.SetHeader([]string{"Service", "Gateway", "Network", "Metric"}) + table.SetHeader([]string{"Destination", "Gateway", "Network", "Metric"}) - for _, route := range t.m { - strRoute := []string{ - route.Options().DestAddr, - route.Options().Hop.Address(), - route.Options().Network, - fmt.Sprintf("%d", route.Options().Metric), + for _, destRoute := range t.m { + for _, route := range destRoute { + strRoute := []string{ + route.Options().DestAddr, + route.Options().Gateway.Address(), + route.Options().Gateway.Network(), + fmt.Sprintf("%d", route.Options().Metric), + } + table.Append(strRoute) } - table.Append(strRoute) } // render table into sb @@ -155,13 +208,13 @@ func (t *table) String() string { return sb.String() } +// hash hashes the route using router gateway and network address func (t *table) hash(r Route) uint64 { - destAddr := r.Options().DestAddr - routerAddr := r.Options().Hop.Address() - network := r.Options().Network + gwAddr := r.Options().Gateway.Address() + netAddr := r.Options().Network t.h.Reset() - t.h.Write([]byte(destAddr + routerAddr + network)) + t.h.Write([]byte(gwAddr + netAddr)) return t.h.Sum64() } diff --git a/router/table_watcher.go b/router/table_watcher.go new file mode 100644 index 00000000..c246100b --- /dev/null +++ b/router/table_watcher.go @@ -0,0 +1,89 @@ +package router + +import ( + "errors" +) + +var ( + // ErrWatcherStopped is returned when routing table watcher has been stopped + ErrWatcherStopped = errors.New("routing table watcher stopped") +) + +// Watcher is an interface that returns updates to the routing table +type Watcher interface { + // Next is a blocking call that returns watch result + Next() (*Result, error) + // Stop stops watcher + Stop() +} + +// Result is returned by a call to Next on the watcher. +type Result struct { + // Action is routing table action which is either of add, remove or update + Action string + // Route is table rout + Route Route +} + +// Watcher options +type WatchOptions struct { + // Specify destination address to watch + DestAddr string + // Specify network to watch + Network string +} + +// WatchDestAddr sets what destination to watch +// Destination is usually microservice name +func WatchDestAddr(a string) WatchOption { + return func(o *WatchOptions) { + o.DestAddr = a + } +} + +// WatchNetwork sets what network to watch +func WatchNetwork(n string) WatchOption { + return func(o *WatchOptions) { + o.Network = n + } +} + +type tableWatcher struct { + opts WatchOptions + resChan chan *Result + done chan struct{} +} + +// TODO: We might simply use Query here once QueryLookup is figured out +// Next returns the next noticed action taken on table +func (w *tableWatcher) Next() (*Result, error) { + for { + select { + case res := <-w.resChan: + switch w.opts.DestAddr { + case "*": + if w.opts.Network == "*" || w.opts.Network == res.Route.Options().Network { + return res, nil + } + case res.Route.Options().DestAddr: + if w.opts.Network == "*" || w.opts.Network == res.Route.Options().Network { + return res, nil + } + } + // ignore if no match is found + continue + case <-w.done: + return nil, ErrWatcherStopped + } + } +} + +// Stop stops routing table watcher +func (w *tableWatcher) Stop() { + select { + case <-w.done: + return + default: + close(w.done) + } +}