1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

Simplified table code. Fixed event dedup.

This commit is contained in:
Milos Gajdos 2019-09-26 11:56:30 +01:00
parent 6f2a8298ef
commit 77f3e7ef48
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
3 changed files with 20 additions and 24 deletions

View File

@ -622,7 +622,7 @@ func (n *network) processCtrlChan(client transport.Client, listener tunnel.Liste
Events: events,
}
log.Debugf("Network router processing advert: %s", advert.Id)
log.Debugf("Network router %s processing advert: %s", n.Id(), advert.Id)
if err := n.router.Process(advert); err != nil {
log.Debugf("Network failed to process advert %s: %v", advert.Id, err)
}

View File

@ -113,19 +113,19 @@ func (r *router) manageRoute(route Route, action string) error {
if err := r.table.Create(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
}
case "update":
if err := r.table.Update(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
}
case "delete":
if err := r.table.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
}
case "update":
if err := r.table.Update(route); err != nil {
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
}
case "solicit":
// nothing to do here
return nil
default:
return fmt.Errorf("failed to manage route for service %s. Unknown action: %s", route.Service, action)
return fmt.Errorf("failed to manage route for service %s: unknown action %s", route.Service, action)
}
return nil
@ -426,6 +426,7 @@ func (r *router) advertiseEvents() error {
// advertise all Update events to subscribers
if len(events) > 0 {
r.advertWg.Add(1)
log.Debugf("Router publishing %d events", len(events))
go r.publishAdvert(RouteUpdate, events)
}
case e := <-r.eventChan:
@ -433,7 +434,7 @@ func (r *router) advertiseEvents() error {
if e == nil {
continue
}
log.Debugf("Router processing table event %s for service %s", e.Type, e.Route.Address)
// determine the event penalty
var penalty float64
switch e.Type {
@ -460,7 +461,8 @@ func (r *router) advertiseEvents() error {
// attempt to squash last two events if possible
lastEvent := advert.events[len(advert.events)-1]
if lastEvent.Type == e.Type {
if lastEvent.Type == e.Type && lastEvent.Route.Hash() == hash {
log.Debugf("Router squashing event %s with hash %d for service %s", e.Type, hash, e.Route.Address)
advert.events[len(advert.events)-1] = e
} else {
advert.events = append(advert.events, e)
@ -675,7 +677,7 @@ func (r *router) Process(a *Advert) error {
// create a copy of the route
route := event.Route
action := event.Type
log.Debugf("Router processing route action %s: %s", action, r.options.Id)
log.Debugf("Router %s processing route action %s for: %s", r.options.Id, action, route.Address)
if err := r.manageRoute(route, fmt.Sprintf("%s", action)); err != nil {
return fmt.Errorf("failed applying action %s to routing table: %s", action, err)
}

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/util/log"
)
var (
@ -56,14 +57,12 @@ func (t *table) Create(r Route) error {
// check if there are any routes in the table for the route destination
if _, ok := t.routes[service]; !ok {
t.routes[service] = make(map[uint64]Route)
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
// add new route to the table for the route destination
if _, ok := t.routes[service][sum]; !ok {
t.routes[service][sum] = r
log.Debugf("Router emitting %s for route: %s", Create, r.Address)
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
@ -83,11 +82,14 @@ func (t *table) Delete(r Route) error {
return ErrRouteNotFound
}
if _, ok := t.routes[service][sum]; ok {
delete(t.routes[service], sum)
go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r})
if _, ok := t.routes[service][sum]; !ok {
return ErrRouteNotFound
}
delete(t.routes[service], sum)
log.Debugf("Router emitting %s for route: %s", Update, r.Address)
go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r})
return nil
}
@ -102,18 +104,10 @@ func (t *table) Update(r Route) error {
// check if the route destination has any routes in the table
if _, ok := t.routes[service]; !ok {
t.routes[service] = make(map[uint64]Route)
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
if _, ok := t.routes[service][sum]; !ok {
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
return nil
}
t.routes[service][sum] = r
log.Debugf("Router emitting %s for route: %s", Update, r.Address)
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
return nil