1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

Adds network id. Skips processing routes when router is the origin.

This commit is contained in:
Milos Gajdos 2019-08-27 23:08:35 +01:00
parent 470304ef87
commit 5e7208119e
No known key found for this signature in database
GPG Key ID: 8B31058CC55DFD4F
8 changed files with 142 additions and 76 deletions

View File

@ -34,8 +34,8 @@ type network struct {
proxy.Proxy
// tun is network tunnel
tunnel.Tunnel
// srv is network server
srv server.Server
// server is network server
server server.Server
// client is network client
client client.Client
@ -59,13 +59,19 @@ func newNetwork(opts ...Option) Network {
tunnel.Address(options.Address),
)
// init router Id to the network id
options.Router.Init(
router.Id(options.Id),
)
// create tunnel client with tunnel transport
tunTransport := trn.NewTransport(
trn.WithTunnel(options.Tunnel),
)
// srv is network server
srv := server.NewServer(
// server is network server
server := server.NewServer(
server.Id(options.Id),
server.Address(options.Address),
server.Name(options.Name),
server.Transport(tunTransport),
@ -86,7 +92,7 @@ func newNetwork(opts ...Option) Network {
Router: options.Router,
Proxy: options.Proxy,
Tunnel: options.Tunnel,
srv: srv,
server: server,
client: client,
}
}
@ -333,7 +339,7 @@ func (n *network) Connect() error {
go n.process(listener)
// start the server
if err := n.srv.Start(); err != nil {
if err := n.server.Start(); err != nil {
return err
}
@ -345,7 +351,7 @@ func (n *network) Connect() error {
func (n *network) close() error {
// stop the server
if err := n.srv.Stop(); err != nil {
if err := n.server.Stop(); err != nil {
return err
}
@ -390,5 +396,5 @@ func (n *network) Client() client.Client {
// Server returns network server
func (n *network) Server() server.Server {
return n.srv
return n.server
}

View File

@ -1,6 +1,7 @@
package network
import (
"github.com/google/uuid"
"github.com/micro/go-micro/network/resolver"
"github.com/micro/go-micro/network/resolver/registry"
"github.com/micro/go-micro/proxy"
@ -13,6 +14,8 @@ type Option func(*Options)
// Options configure network
type Options struct {
// Id of the node
Id string
// Name of the network
Name string
// Address to bind to
@ -27,14 +30,21 @@ type Options struct {
Resolver resolver.Resolver
}
// Name is the network name
// Id sets the id of the network node
func Id(id string) Option {
return func(o *Options) {
o.Id = id
}
}
// Name sets the network name
func Name(n string) Option {
return func(o *Options) {
o.Name = n
}
}
// Address is the network address
// Address sets the network address
func Address(a string) Option {
return func(o *Options) {
o.Address = a
@ -72,6 +82,7 @@ func Resolver(r resolver.Resolver) Option {
// DefaultOptions returns network default options
func DefaultOptions() Options {
return Options{
Id: uuid.New().String(),
Name: DefaultName,
Address: DefaultAddress,
Tunnel: tunnel.NewTunnel(),

View File

@ -43,7 +43,7 @@ var (
// router implements default router
type router struct {
sync.RWMutex
opts Options
options Options
status Status
table *table
exit chan struct{}
@ -70,7 +70,7 @@ func newRouter(opts ...Option) Router {
status := Status{Code: Stopped, Error: nil}
return &router{
opts: options,
options: options,
status: status,
table: newTable(),
advertWg: &sync.WaitGroup{},
@ -85,7 +85,7 @@ func (r *router) Init(opts ...Option) error {
defer r.Unlock()
for _, o := range opts {
o(&r.opts)
o(&r.options)
}
return nil
@ -94,10 +94,10 @@ func (r *router) Init(opts ...Option) error {
// Options returns router options
func (r *router) Options() Options {
r.Lock()
opts := r.opts
options := r.options
r.Unlock()
return opts
return options
}
// Table returns routing table
@ -139,7 +139,8 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
Service: service.Name,
Address: node.Address,
Gateway: "",
Network: r.opts.Network,
Network: r.options.Network,
Router: r.options.Id,
Link: DefaultLink,
Metric: DefaultLocalMetric,
}
@ -278,7 +279,7 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) {
defer r.advertWg.Done()
a := &Advert{
Id: r.opts.Id,
Id: r.options.Id,
Type: advType,
TTL: DefaultAdvertTTL,
Timestamp: time.Now(),
@ -529,20 +530,22 @@ func (r *router) Start() error {
}
// add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil {
if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil {
e := fmt.Errorf("failed adding registry routes: %s", err)
r.status = Status{Code: Error, Error: e}
return e
}
// add default gateway into routing table
if r.opts.Gateway != "" {
if r.options.Gateway != "" {
// note, the only non-default value is the gateway
route := Route{
Service: "*",
Address: "*",
Gateway: r.opts.Gateway,
Gateway: r.options.Gateway,
Network: "*",
Router: r.options.Id,
Link: DefaultLink,
Metric: DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
@ -557,7 +560,7 @@ func (r *router) Start() error {
r.exit = make(chan struct{})
// registry watcher
regWatcher, err := r.opts.Registry.Watch()
regWatcher, err := r.options.Registry.Watch()
if err != nil {
e := fmt.Errorf("failed creating registry watcher: %v", err)
r.status = Status{Code: Error, Error: e}
@ -669,6 +672,10 @@ func (r *router) Process(a *Advert) error {
})
for _, event := range events {
// skip if the router is the origin of this route
if event.Route.Router == r.options.Id {
continue
}
// create a copy of the route
route := event.Route
action := event.Type

View File

@ -11,29 +11,38 @@ type QueryOptions struct {
Gateway string
// Network is network address
Network string
// Router is router id
Router string
}
// QueryService sets destination address
// QueryService sets service to query
func QueryService(s string) QueryOption {
return func(o *QueryOptions) {
o.Service = s
}
}
// QueryGateway sets route gateway
// QueryGateway sets gateway address to query
func QueryGateway(g string) QueryOption {
return func(o *QueryOptions) {
o.Gateway = g
}
}
// QueryNetwork sets route network address
// QueryNetwork sets network name to query
func QueryNetwork(n string) QueryOption {
return func(o *QueryOptions) {
o.Network = n
}
}
// QueryRouter sets router id to query
func QueryRouter(r string) QueryOption {
return func(o *QueryOptions) {
o.Router = r
}
}
// Query is routing table query
type Query interface {
// Options returns query options
@ -52,6 +61,7 @@ func NewQuery(opts ...QueryOption) Query {
Service: "*",
Gateway: "*",
Network: "*",
Router: "*",
}
for _, o := range opts {
@ -67,8 +77,3 @@ func NewQuery(opts ...QueryOption) Query {
func (q *query) Options() QueryOptions {
return q.opts
}
// String prints routing table query in human readable form
func (q query) String() string {
return "query"
}

View File

@ -7,9 +7,9 @@ import (
var (
// DefaultLink is default network link
DefaultLink = "local"
// DefaultLocalMetric is default route cost metric for the local network
// DefaultLocalMetric is default route cost for a local route
DefaultLocalMetric = 1
// DefaultNetworkMetric is default route cost metric for the micro network
// DefaultNetworkMetric is default route cost for a network route
DefaultNetworkMetric = 10
)
@ -23,6 +23,8 @@ type Route struct {
Gateway string
// Network is network address
Network string
// Router is router id
Router string
// Link is network link
Link string
// Metric is the route cost metric
@ -33,6 +35,6 @@ type Route struct {
func (r *Route) Hash() uint64 {
h := fnv.New64()
h.Reset()
h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Link))
h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Router + r.Link))
return h.Sum64()
}

View File

@ -8,7 +8,14 @@ import (
"github.com/google/uuid"
)
// table is an in memory routing table
var (
// ErrRouteNotFound is returned when no route was found in the routing table
ErrRouteNotFound = errors.New("route not found")
// ErrDuplicateRoute is returned when the route already exists
ErrDuplicateRoute = errors.New("duplicate route")
)
// table is an in-memory routing table
type table struct {
sync.RWMutex
// routes stores service routes
@ -25,6 +32,19 @@ func newTable(opts ...Option) *table {
}
}
// sendEvent sends events to all subscribed watchers
func (t *table) sendEvent(e *Event) {
t.RLock()
defer t.RUnlock()
for _, w := range t.watchers {
select {
case w.resChan <- e:
case <-w.done:
}
}
}
// Create creates new route in the routing table
func (t *table) Create(r Route) error {
service := r.Service
@ -106,21 +126,23 @@ func (t *table) List() ([]Route, error) {
return routes, nil
}
// isMatch checks if the route matches given network and router
func isMatch(route Route, network, router string) bool {
if network == "*" || network == route.Network {
if router == "*" || router == route.Gateway {
return true
// isMatch checks if the route matches given query options
func isMatch(route Route, gateway, network, router string) bool {
if gateway == "*" || gateway == route.Gateway {
if network == "*" || network == route.Network {
if router == "*" || router == route.Router {
return true
}
}
}
return false
}
// findRoutes finds all the routes for given network and router and returns them
func findRoutes(routes map[uint64]Route, network, router string) []Route {
func findRoutes(routes map[uint64]Route, gateway, network, router string) []Route {
var results []Route
for _, route := range routes {
if isMatch(route, network, router) {
if isMatch(route, gateway, network, router) {
results = append(results, route)
}
}
@ -136,13 +158,13 @@ func (t *table) Query(q Query) ([]Route, error) {
if _, ok := t.routes[q.Options().Service]; !ok {
return nil, ErrRouteNotFound
}
return findRoutes(t.routes[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
return findRoutes(t.routes[q.Options().Service], q.Options().Gateway, q.Options().Network, q.Options().Router), nil
}
var results []Route
// search through all destinations
for _, routes := range t.routes {
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...)
results = append(results, findRoutes(routes, q.Options().Gateway, q.Options().Network, q.Options().Router)...)
}
return results, nil
@ -181,23 +203,3 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
return w, nil
}
// sendEvent sends events to all subscribed watchers
func (t *table) sendEvent(e *Event) {
t.RLock()
defer t.RUnlock()
for _, w := range t.watchers {
select {
case w.resChan <- e:
case <-w.done:
}
}
}
var (
// ErrRouteNotFound is returned when no route was found in the routing table
ErrRouteNotFound = errors.New("route not found")
// ErrDuplicateRoute is returned when the route already exists
ErrDuplicateRoute = errors.New("duplicate route")
)

View File

@ -9,6 +9,7 @@ func testSetup() (*table, Route) {
Service: "dest.svc",
Gateway: "dest.gw",
Network: "dest.network",
Router: "src.router",
Link: "det.link",
Metric: 10,
}
@ -109,11 +110,13 @@ func TestQuery(t *testing.T) {
svc := []string{"svc1", "svc2", "svc3"}
net := []string{"net1", "net2", "net1"}
gw := []string{"gw1", "gw2", "gw3"}
rtr := []string{"rtr1", "rt2", "rt3"}
for i := 0; i < len(svc); i++ {
route.Service = svc[i]
route.Network = net[i]
route.Gateway = gw[i]
route.Router = rtr[i]
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
@ -127,8 +130,9 @@ func TestQuery(t *testing.T) {
t.Errorf("error looking up routes: %s", err)
}
// query particular net
query = NewQuery(QueryNetwork("net1"))
// query routes particular network
network := "net1"
query = NewQuery(QueryNetwork(network))
routes, err = table.Query(query)
if err != nil {
@ -139,7 +143,13 @@ func TestQuery(t *testing.T) {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
}
// query particular gateway
for _, route := range routes {
if route.Network != network {
t.Errorf("incorrect route returned. Expected network: %s, found: %s", network, route.Network)
}
}
// query routes for particular gateway
gateway := "gw1"
query = NewQuery(QueryGateway(gateway))
@ -156,11 +166,28 @@ func TestQuery(t *testing.T) {
t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
}
// query particular route
network := "net1"
// query routes for particular router
router := "rtr1"
query = NewQuery(QueryRouter(router))
routes, err = table.Query(query)
if err != nil {
t.Errorf("error looking up routes: %s", err)
}
if len(routes) != 1 {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
}
if routes[0].Router != router {
t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router)
}
// query particular gateway and network
query = NewQuery(
QueryGateway(gateway),
QueryNetwork(network),
QueryRouter(router),
)
routes, err = table.Query(query)
@ -180,7 +207,11 @@ func TestQuery(t *testing.T) {
t.Errorf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
}
// bullshit route query
if routes[0].Router != router {
t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router)
}
// non-existen route query
query = NewQuery(QueryService("foobar"))
routes, err = table.Query(query)

View File

@ -6,6 +6,11 @@ import (
"time"
)
var (
// ErrWatcherStopped is returned when routing table watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped")
)
// EventType defines routing table event
type EventType int
@ -42,9 +47,6 @@ type Event struct {
Route Route
}
// WatchOption is used to define what routes to watch in the table
type WatchOption func(*WatchOptions)
// Watcher defines routing table watcher interface
// Watcher returns updates to the routing table
type Watcher interface {
@ -56,7 +58,11 @@ type Watcher interface {
Stop()
}
// WatchOption is used to define what routes to watch in the table
type WatchOption func(*WatchOptions)
// WatchOptions are table watcher options
// TODO: expand the options to watch based on other criteria
type WatchOptions struct {
// Service allows to watch specific service routes
Service string
@ -70,6 +76,7 @@ func WatchService(s string) WatchOption {
}
}
// tableWatcher implements routing table Watcher
type tableWatcher struct {
sync.RWMutex
id string
@ -113,8 +120,3 @@ func (w *tableWatcher) Stop() {
close(w.done)
}
}
var (
// ErrWatcherStopped is returned when routing table watcher has been stopped
ErrWatcherStopped = errors.New("watcher stopped")
)