diff --git a/router/default_router.go b/router/default_router.go index 6e949e68..89b28623 100644 --- a/router/default_router.go +++ b/router/default_router.go @@ -103,7 +103,7 @@ func (r *router) Start() error { Name: route.Options().DestAddr, Nodes: []*registry.Node{node}, } - if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil { + if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil { return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) } } @@ -137,8 +137,8 @@ func (r *router) Start() error { } // addServiceRouteslists all available services in given registry and adds them to the routing table. -// NOTE: this is a one-off operation done to bootstrap the rouing table of the new router when it starts. -// It returns error if any of the routes could not be added to the routing table. +// NOTE: this is a one-off operation done when bootstrapping the routing table of the new router. +// It returns error if either the services could not be listed or if the routes could not be added to the routing table. func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error { services, err := reg.ListServices() if err != nil { @@ -160,11 +160,11 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric return nil } -// parseToNode parses router address into registryNode. -// It retuns error if the router network address could not be parsed into service host and port. -// NOTE: We use ":" as a default delimiter we split the network address on and then attempt to parse port into int. +// parseToNode parses router into registry.Node and returns the result. +// It returns error if the router network address could not be parsed into service host and port. +// NOTE: We use ":" as the default delimiter when we split the network address. func (r *router) parseToNode() (*registry.Node, error) { - // split on ":" as a standard host:port delimiter + // split on ":" as a standard host/port delimiter addr := strings.Split(r.opts.NetworkAddress, ":") // try to parse network port into integer port, err := strconv.Atoi(addr[1]) @@ -219,13 +219,17 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric switch res.Action { case "create": if len(res.Service.Nodes) > 0 { + /// only return error if the route is not duplicate, but something else has failed if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute { return fmt.Errorf("failed to add route for service: %v", res.Service.Name) } } case "delete": - if err := r.opts.Table.Remove(route); err != nil && err != ErrRouteNotFound { - return fmt.Errorf("failed to remove route for service: %v", res.Service.Name) + if len(res.Service.Nodes) <= 1 { + // only return error if the route is present in the table, but something else has failed + if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound { + return fmt.Errorf("failed to delete route for service: %v", res.Service.Name) + } } } } @@ -233,7 +237,8 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric return watchErr } -// watch routing table changes +// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry +// It returns error if the locally registered services either fails to be added/deleted to/from network registry. func (r *router) watchTable(w Watcher) error { defer r.wg.Done() @@ -270,12 +275,18 @@ func (r *router) watchTable(w Watcher) error { switch res.Action { case "add": - if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil { - return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) + // only register remotely if the service is "local" + if res.Route.Options().Network == "local" { + if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil { + return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err) + } } - case "remove": - if err := r.opts.NetworkRegistry.Register(service); err != nil { - return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err) + case "delete": + // only deregister remotely if the service is "local" + if res.Route.Options().Network == "local" { + if err := r.opts.NetworkRegistry.Deregister(service); err != nil { + return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err) + } } } } @@ -285,7 +296,8 @@ func (r *router) watchTable(w Watcher) error { // Stop stops the router func (r *router) Stop() error { - // NOTE: we need a more efficient way of doing this e.g. network routes should be autoremoved when router stops gossiping + // NOTE: we need a more efficient way of doing this e.g. network routes + // should ideally be autodeleted when the router stops gossiping // deregister all services advertised by this router from remote registry query := NewQuery(QueryGateway(r), QueryNetwork(r.opts.NetworkAddress)) routes, err := r.opts.Table.Lookup(query) diff --git a/router/default_table.go b/router/default_table.go index 78b451ae..70aab636 100644 --- a/router/default_table.go +++ b/router/default_table.go @@ -70,6 +70,7 @@ func (t *table) Add(r Route) error { t.Lock() defer t.Unlock() + // check if the destination has any routes in the table if _, ok := t.m[destAddr]; !ok { t.m[destAddr] = make(map[uint64]Route) t.m[destAddr][sum] = r @@ -77,12 +78,15 @@ func (t *table) Add(r Route) error { return nil } + // only add the route if it exists and if override is requested if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists { t.m[destAddr][sum] = r go t.sendResult(&Result{Action: "update", Route: r}) return nil } + // if we reached this point without already returning the route already exists + // we return nil only if explicitly requested by the client if r.Options().Policy == IgnoreIfExists { return nil } @@ -90,8 +94,8 @@ func (t *table) Add(r Route) error { return ErrDuplicateRoute } -// Remove removes the route from the routing table -func (t *table) Remove(r Route) error { +// Delete deletes the route from the routing table +func (t *table) Delete(r Route) error { t.Lock() defer t.Unlock() @@ -103,7 +107,7 @@ func (t *table) Remove(r Route) error { } delete(t.m[destAddr], sum) - go t.sendResult(&Result{Action: "remove", Route: r}) + go t.sendResult(&Result{Action: "delete", Route: r}) return nil } @@ -116,10 +120,12 @@ func (t *table) Update(r Route) error { t.Lock() defer t.Unlock() + // check if the destAddr has ANY routes in the table if _, ok := t.m[destAddr]; !ok { return ErrRouteNotFound } + // if the route has been found update it if _, ok := t.m[destAddr][sum]; ok { t.m[destAddr][sum] = r go t.sendResult(&Result{Action: "update", Route: r}) diff --git a/router/table.go b/router/table.go index e1361380..53a6f8cd 100644 --- a/router/table.go +++ b/router/table.go @@ -19,8 +19,8 @@ type Table interface { Options() TableOptions // Add adds new route to the routing table Add(Route) error - // Remove removes existing route from the routing table - Remove(Route) error + // Delete deletes existing route from the routing table + Delete(Route) error // Update updates route in the routing table Update(Route) error // Lookup looks up routes in the routing table and returns them diff --git a/router/table_watcher.go b/router/table_watcher.go index 4afe85d0..e1797f25 100644 --- a/router/table_watcher.go +++ b/router/table_watcher.go @@ -23,7 +23,7 @@ type Watcher interface { // Result is returned by a call to Next on the watcher. type Result struct { - // Action is routing table action which is either of add, remove or update + // Action is routing table action which is either of add, delete or update Action string // Route is table rout Route Route @@ -58,8 +58,9 @@ type tableWatcher struct { done chan struct{} } -// TODO: this needs to be thought through properly // Next returns the next noticed action taken on table +// TODO: this needs to be thought through properly +// we are aiming to provide the same watch options Query() provides func (w *tableWatcher) Next() (*Result, error) { for { select { @@ -74,8 +75,6 @@ func (w *tableWatcher) Next() (*Result, error) { return res, nil } } - // ignore if no match is found - continue case <-w.done: return nil, ErrWatcherStopped }