1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00

Increased Network registry TTL. Routing Table remove is now delete.

Remove has been renamed to Delete to be more in line with the framework.

A bunch of comments have been added/updated for the future generations

We have increased the Network Registry TTL to 2 minutes.
This commit is contained in:
Milos Gajdos 2019-06-17 19:51:13 +01:00
parent f62fcaad76
commit 5088c9d916
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
4 changed files with 42 additions and 25 deletions

View File

@ -103,7 +103,7 @@ func (r *router) Start() error {
Name: route.Options().DestAddr,
Nodes: []*registry.Node{node},
}
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil {
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err)
}
}
@ -137,8 +137,8 @@ func (r *router) Start() error {
}
// addServiceRouteslists all available services in given registry and adds them to the routing table.
// NOTE: this is a one-off operation done to bootstrap the rouing table of the new router when it starts.
// It returns error if any of the routes could not be added to the routing table.
// NOTE: this is a one-off operation done when bootstrapping the routing table of the new router.
// It returns error if either the services could not be listed or if the routes could not be added to the routing table.
func (r *router) addServiceRoutes(reg registry.Registry, network string, metric int) error {
services, err := reg.ListServices()
if err != nil {
@ -160,11 +160,11 @@ func (r *router) addServiceRoutes(reg registry.Registry, network string, metric
return nil
}
// parseToNode parses router address into registryNode.
// It retuns error if the router network address could not be parsed into service host and port.
// NOTE: We use ":" as a default delimiter we split the network address on and then attempt to parse port into int.
// parseToNode parses router into registry.Node and returns the result.
// It returns error if the router network address could not be parsed into service host and port.
// NOTE: We use ":" as the default delimiter when we split the network address.
func (r *router) parseToNode() (*registry.Node, error) {
// split on ":" as a standard host:port delimiter
// split on ":" as a standard host/port delimiter
addr := strings.Split(r.opts.NetworkAddress, ":")
// try to parse network port into integer
port, err := strconv.Atoi(addr[1])
@ -219,13 +219,17 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric
switch res.Action {
case "create":
if len(res.Service.Nodes) > 0 {
/// only return error if the route is not duplicate, but something else has failed
if err := r.opts.Table.Add(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed to add route for service: %v", res.Service.Name)
}
}
case "delete":
if err := r.opts.Table.Remove(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to remove route for service: %v", res.Service.Name)
if len(res.Service.Nodes) <= 1 {
// only return error if the route is present in the table, but something else has failed
if err := r.opts.Table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed to delete route for service: %v", res.Service.Name)
}
}
}
}
@ -233,7 +237,8 @@ func (r *router) manageServiceRoutes(w registry.Watcher, network string, metric
return watchErr
}
// watch routing table changes
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w Watcher) error {
defer r.wg.Done()
@ -270,12 +275,18 @@ func (r *router) watchTable(w Watcher) error {
switch res.Action {
case "add":
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(10*time.Second)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err)
// only register remotely if the service is "local"
if res.Route.Options().Network == "local" {
if err := r.opts.NetworkRegistry.Register(service, registry.RegisterTTL(120*time.Second)); err != nil {
return fmt.Errorf("failed to register service %s in network registry: %v", service.Name, err)
}
}
case "remove":
if err := r.opts.NetworkRegistry.Register(service); err != nil {
return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err)
case "delete":
// only deregister remotely if the service is "local"
if res.Route.Options().Network == "local" {
if err := r.opts.NetworkRegistry.Deregister(service); err != nil {
return fmt.Errorf("failed to deregister service %s from network registry: %v", service.Name, err)
}
}
}
}
@ -285,7 +296,8 @@ func (r *router) watchTable(w Watcher) error {
// Stop stops the router
func (r *router) Stop() error {
// NOTE: we need a more efficient way of doing this e.g. network routes should be autoremoved when router stops gossiping
// NOTE: we need a more efficient way of doing this e.g. network routes
// should ideally be autodeleted when the router stops gossiping
// deregister all services advertised by this router from remote registry
query := NewQuery(QueryGateway(r), QueryNetwork(r.opts.NetworkAddress))
routes, err := r.opts.Table.Lookup(query)

View File

@ -70,6 +70,7 @@ func (t *table) Add(r Route) error {
t.Lock()
defer t.Unlock()
// check if the destination has any routes in the table
if _, ok := t.m[destAddr]; !ok {
t.m[destAddr] = make(map[uint64]Route)
t.m[destAddr][sum] = r
@ -77,12 +78,15 @@ func (t *table) Add(r Route) error {
return nil
}
// only add the route if it exists and if override is requested
if _, ok := t.m[destAddr][sum]; ok && r.Options().Policy == OverrideIfExists {
t.m[destAddr][sum] = r
go t.sendResult(&Result{Action: "update", Route: r})
return nil
}
// if we reached this point without already returning the route already exists
// we return nil only if explicitly requested by the client
if r.Options().Policy == IgnoreIfExists {
return nil
}
@ -90,8 +94,8 @@ func (t *table) Add(r Route) error {
return ErrDuplicateRoute
}
// Remove removes the route from the routing table
func (t *table) Remove(r Route) error {
// Delete deletes the route from the routing table
func (t *table) Delete(r Route) error {
t.Lock()
defer t.Unlock()
@ -103,7 +107,7 @@ func (t *table) Remove(r Route) error {
}
delete(t.m[destAddr], sum)
go t.sendResult(&Result{Action: "remove", Route: r})
go t.sendResult(&Result{Action: "delete", Route: r})
return nil
}
@ -116,10 +120,12 @@ func (t *table) Update(r Route) error {
t.Lock()
defer t.Unlock()
// check if the destAddr has ANY routes in the table
if _, ok := t.m[destAddr]; !ok {
return ErrRouteNotFound
}
// if the route has been found update it
if _, ok := t.m[destAddr][sum]; ok {
t.m[destAddr][sum] = r
go t.sendResult(&Result{Action: "update", Route: r})

View File

@ -19,8 +19,8 @@ type Table interface {
Options() TableOptions
// Add adds new route to the routing table
Add(Route) error
// Remove removes existing route from the routing table
Remove(Route) error
// Delete deletes existing route from the routing table
Delete(Route) error
// Update updates route in the routing table
Update(Route) error
// Lookup looks up routes in the routing table and returns them

View File

@ -23,7 +23,7 @@ type Watcher interface {
// Result is returned by a call to Next on the watcher.
type Result struct {
// Action is routing table action which is either of add, remove or update
// Action is routing table action which is either of add, delete or update
Action string
// Route is table rout
Route Route
@ -58,8 +58,9 @@ type tableWatcher struct {
done chan struct{}
}
// TODO: this needs to be thought through properly
// Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly
// we are aiming to provide the same watch options Query() provides
func (w *tableWatcher) Next() (*Result, error) {
for {
select {
@ -74,8 +75,6 @@ func (w *tableWatcher) Next() (*Result, error) {
return res, nil
}
}
// ignore if no match is found
continue
case <-w.done:
return nil, ErrWatcherStopped
}