mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-12 08:23:58 +02:00
Merge pull request #703 from milosgajdos83/net-id
Adds network id. Skips processing routes when router is the origin.
This commit is contained in:
commit
731f6f74dd
@ -34,8 +34,8 @@ type network struct {
|
||||
proxy.Proxy
|
||||
// tun is network tunnel
|
||||
tunnel.Tunnel
|
||||
// srv is network server
|
||||
srv server.Server
|
||||
// server is network server
|
||||
server server.Server
|
||||
// client is network client
|
||||
client client.Client
|
||||
|
||||
@ -59,13 +59,19 @@ func newNetwork(opts ...Option) Network {
|
||||
tunnel.Address(options.Address),
|
||||
)
|
||||
|
||||
// init router Id to the network id
|
||||
options.Router.Init(
|
||||
router.Id(options.Id),
|
||||
)
|
||||
|
||||
// create tunnel client with tunnel transport
|
||||
tunTransport := trn.NewTransport(
|
||||
trn.WithTunnel(options.Tunnel),
|
||||
)
|
||||
|
||||
// srv is network server
|
||||
srv := server.NewServer(
|
||||
// server is network server
|
||||
server := server.NewServer(
|
||||
server.Id(options.Id),
|
||||
server.Address(options.Address),
|
||||
server.Name(options.Name),
|
||||
server.Transport(tunTransport),
|
||||
@ -86,7 +92,7 @@ func newNetwork(opts ...Option) Network {
|
||||
Router: options.Router,
|
||||
Proxy: options.Proxy,
|
||||
Tunnel: options.Tunnel,
|
||||
srv: srv,
|
||||
server: server,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
@ -333,7 +339,7 @@ func (n *network) Connect() error {
|
||||
go n.process(listener)
|
||||
|
||||
// start the server
|
||||
if err := n.srv.Start(); err != nil {
|
||||
if err := n.server.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -345,7 +351,7 @@ func (n *network) Connect() error {
|
||||
|
||||
func (n *network) close() error {
|
||||
// stop the server
|
||||
if err := n.srv.Stop(); err != nil {
|
||||
if err := n.server.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -390,5 +396,5 @@ func (n *network) Client() client.Client {
|
||||
|
||||
// Server returns network server
|
||||
func (n *network) Server() server.Server {
|
||||
return n.srv
|
||||
return n.server
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package network
|
||||
|
||||
import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/micro/go-micro/network/resolver"
|
||||
"github.com/micro/go-micro/network/resolver/registry"
|
||||
"github.com/micro/go-micro/proxy"
|
||||
@ -13,6 +14,8 @@ type Option func(*Options)
|
||||
|
||||
// Options configure network
|
||||
type Options struct {
|
||||
// Id of the node
|
||||
Id string
|
||||
// Name of the network
|
||||
Name string
|
||||
// Address to bind to
|
||||
@ -27,14 +30,21 @@ type Options struct {
|
||||
Resolver resolver.Resolver
|
||||
}
|
||||
|
||||
// Name is the network name
|
||||
// Id sets the id of the network node
|
||||
func Id(id string) Option {
|
||||
return func(o *Options) {
|
||||
o.Id = id
|
||||
}
|
||||
}
|
||||
|
||||
// Name sets the network name
|
||||
func Name(n string) Option {
|
||||
return func(o *Options) {
|
||||
o.Name = n
|
||||
}
|
||||
}
|
||||
|
||||
// Address is the network address
|
||||
// Address sets the network address
|
||||
func Address(a string) Option {
|
||||
return func(o *Options) {
|
||||
o.Address = a
|
||||
@ -72,6 +82,7 @@ func Resolver(r resolver.Resolver) Option {
|
||||
// DefaultOptions returns network default options
|
||||
func DefaultOptions() Options {
|
||||
return Options{
|
||||
Id: uuid.New().String(),
|
||||
Name: DefaultName,
|
||||
Address: DefaultAddress,
|
||||
Tunnel: tunnel.NewTunnel(),
|
||||
|
@ -43,7 +43,7 @@ var (
|
||||
// router implements default router
|
||||
type router struct {
|
||||
sync.RWMutex
|
||||
opts Options
|
||||
options Options
|
||||
status Status
|
||||
table *table
|
||||
exit chan struct{}
|
||||
@ -70,7 +70,7 @@ func newRouter(opts ...Option) Router {
|
||||
status := Status{Code: Stopped, Error: nil}
|
||||
|
||||
return &router{
|
||||
opts: options,
|
||||
options: options,
|
||||
status: status,
|
||||
table: newTable(),
|
||||
advertWg: &sync.WaitGroup{},
|
||||
@ -85,7 +85,7 @@ func (r *router) Init(opts ...Option) error {
|
||||
defer r.Unlock()
|
||||
|
||||
for _, o := range opts {
|
||||
o(&r.opts)
|
||||
o(&r.options)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -94,10 +94,10 @@ func (r *router) Init(opts ...Option) error {
|
||||
// Options returns router options
|
||||
func (r *router) Options() Options {
|
||||
r.Lock()
|
||||
opts := r.opts
|
||||
options := r.options
|
||||
r.Unlock()
|
||||
|
||||
return opts
|
||||
return options
|
||||
}
|
||||
|
||||
// Table returns routing table
|
||||
@ -139,7 +139,8 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
|
||||
Service: service.Name,
|
||||
Address: node.Address,
|
||||
Gateway: "",
|
||||
Network: r.opts.Network,
|
||||
Network: r.options.Network,
|
||||
Router: r.options.Id,
|
||||
Link: DefaultLink,
|
||||
Metric: DefaultLocalMetric,
|
||||
}
|
||||
@ -278,7 +279,7 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) {
|
||||
defer r.advertWg.Done()
|
||||
|
||||
a := &Advert{
|
||||
Id: r.opts.Id,
|
||||
Id: r.options.Id,
|
||||
Type: advType,
|
||||
TTL: DefaultAdvertTTL,
|
||||
Timestamp: time.Now(),
|
||||
@ -529,20 +530,22 @@ func (r *router) Start() error {
|
||||
}
|
||||
|
||||
// add all local service routes into the routing table
|
||||
if err := r.manageRegistryRoutes(r.opts.Registry, "create"); err != nil {
|
||||
if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil {
|
||||
e := fmt.Errorf("failed adding registry routes: %s", err)
|
||||
r.status = Status{Code: Error, Error: e}
|
||||
return e
|
||||
}
|
||||
|
||||
// add default gateway into routing table
|
||||
if r.opts.Gateway != "" {
|
||||
if r.options.Gateway != "" {
|
||||
// note, the only non-default value is the gateway
|
||||
route := Route{
|
||||
Service: "*",
|
||||
Address: "*",
|
||||
Gateway: r.opts.Gateway,
|
||||
Gateway: r.options.Gateway,
|
||||
Network: "*",
|
||||
Router: r.options.Id,
|
||||
Link: DefaultLink,
|
||||
Metric: DefaultLocalMetric,
|
||||
}
|
||||
if err := r.table.Create(route); err != nil {
|
||||
@ -557,7 +560,7 @@ func (r *router) Start() error {
|
||||
r.exit = make(chan struct{})
|
||||
|
||||
// registry watcher
|
||||
regWatcher, err := r.opts.Registry.Watch()
|
||||
regWatcher, err := r.options.Registry.Watch()
|
||||
if err != nil {
|
||||
e := fmt.Errorf("failed creating registry watcher: %v", err)
|
||||
r.status = Status{Code: Error, Error: e}
|
||||
@ -669,6 +672,10 @@ func (r *router) Process(a *Advert) error {
|
||||
})
|
||||
|
||||
for _, event := range events {
|
||||
// skip if the router is the origin of this route
|
||||
if event.Route.Router == r.options.Id {
|
||||
continue
|
||||
}
|
||||
// create a copy of the route
|
||||
route := event.Route
|
||||
action := event.Type
|
||||
|
@ -11,29 +11,38 @@ type QueryOptions struct {
|
||||
Gateway string
|
||||
// Network is network address
|
||||
Network string
|
||||
// Router is router id
|
||||
Router string
|
||||
}
|
||||
|
||||
// QueryService sets destination address
|
||||
// QueryService sets service to query
|
||||
func QueryService(s string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Service = s
|
||||
}
|
||||
}
|
||||
|
||||
// QueryGateway sets route gateway
|
||||
// QueryGateway sets gateway address to query
|
||||
func QueryGateway(g string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Gateway = g
|
||||
}
|
||||
}
|
||||
|
||||
// QueryNetwork sets route network address
|
||||
// QueryNetwork sets network name to query
|
||||
func QueryNetwork(n string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Network = n
|
||||
}
|
||||
}
|
||||
|
||||
// QueryRouter sets router id to query
|
||||
func QueryRouter(r string) QueryOption {
|
||||
return func(o *QueryOptions) {
|
||||
o.Router = r
|
||||
}
|
||||
}
|
||||
|
||||
// Query is routing table query
|
||||
type Query interface {
|
||||
// Options returns query options
|
||||
@ -52,6 +61,7 @@ func NewQuery(opts ...QueryOption) Query {
|
||||
Service: "*",
|
||||
Gateway: "*",
|
||||
Network: "*",
|
||||
Router: "*",
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
@ -67,8 +77,3 @@ func NewQuery(opts ...QueryOption) Query {
|
||||
func (q *query) Options() QueryOptions {
|
||||
return q.opts
|
||||
}
|
||||
|
||||
// String prints routing table query in human readable form
|
||||
func (q query) String() string {
|
||||
return "query"
|
||||
}
|
||||
|
@ -7,9 +7,9 @@ import (
|
||||
var (
|
||||
// DefaultLink is default network link
|
||||
DefaultLink = "local"
|
||||
// DefaultLocalMetric is default route cost metric for the local network
|
||||
// DefaultLocalMetric is default route cost for a local route
|
||||
DefaultLocalMetric = 1
|
||||
// DefaultNetworkMetric is default route cost metric for the micro network
|
||||
// DefaultNetworkMetric is default route cost for a network route
|
||||
DefaultNetworkMetric = 10
|
||||
)
|
||||
|
||||
@ -23,6 +23,8 @@ type Route struct {
|
||||
Gateway string
|
||||
// Network is network address
|
||||
Network string
|
||||
// Router is router id
|
||||
Router string
|
||||
// Link is network link
|
||||
Link string
|
||||
// Metric is the route cost metric
|
||||
@ -33,6 +35,6 @@ type Route struct {
|
||||
func (r *Route) Hash() uint64 {
|
||||
h := fnv.New64()
|
||||
h.Reset()
|
||||
h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Link))
|
||||
h.Write([]byte(r.Service + r.Address + r.Gateway + r.Network + r.Router + r.Link))
|
||||
return h.Sum64()
|
||||
}
|
||||
|
@ -8,7 +8,14 @@ import (
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// table is an in memory routing table
|
||||
var (
|
||||
// ErrRouteNotFound is returned when no route was found in the routing table
|
||||
ErrRouteNotFound = errors.New("route not found")
|
||||
// ErrDuplicateRoute is returned when the route already exists
|
||||
ErrDuplicateRoute = errors.New("duplicate route")
|
||||
)
|
||||
|
||||
// table is an in-memory routing table
|
||||
type table struct {
|
||||
sync.RWMutex
|
||||
// routes stores service routes
|
||||
@ -25,6 +32,19 @@ func newTable(opts ...Option) *table {
|
||||
}
|
||||
}
|
||||
|
||||
// sendEvent sends events to all subscribed watchers
|
||||
func (t *table) sendEvent(e *Event) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
for _, w := range t.watchers {
|
||||
select {
|
||||
case w.resChan <- e:
|
||||
case <-w.done:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create creates new route in the routing table
|
||||
func (t *table) Create(r Route) error {
|
||||
service := r.Service
|
||||
@ -106,21 +126,23 @@ func (t *table) List() ([]Route, error) {
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
// isMatch checks if the route matches given network and router
|
||||
func isMatch(route Route, network, router string) bool {
|
||||
if network == "*" || network == route.Network {
|
||||
if router == "*" || router == route.Gateway {
|
||||
return true
|
||||
// isMatch checks if the route matches given query options
|
||||
func isMatch(route Route, gateway, network, router string) bool {
|
||||
if gateway == "*" || gateway == route.Gateway {
|
||||
if network == "*" || network == route.Network {
|
||||
if router == "*" || router == route.Router {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// findRoutes finds all the routes for given network and router and returns them
|
||||
func findRoutes(routes map[uint64]Route, network, router string) []Route {
|
||||
func findRoutes(routes map[uint64]Route, gateway, network, router string) []Route {
|
||||
var results []Route
|
||||
for _, route := range routes {
|
||||
if isMatch(route, network, router) {
|
||||
if isMatch(route, gateway, network, router) {
|
||||
results = append(results, route)
|
||||
}
|
||||
}
|
||||
@ -136,13 +158,13 @@ func (t *table) Query(q Query) ([]Route, error) {
|
||||
if _, ok := t.routes[q.Options().Service]; !ok {
|
||||
return nil, ErrRouteNotFound
|
||||
}
|
||||
return findRoutes(t.routes[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
|
||||
return findRoutes(t.routes[q.Options().Service], q.Options().Gateway, q.Options().Network, q.Options().Router), nil
|
||||
}
|
||||
|
||||
var results []Route
|
||||
// search through all destinations
|
||||
for _, routes := range t.routes {
|
||||
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...)
|
||||
results = append(results, findRoutes(routes, q.Options().Gateway, q.Options().Network, q.Options().Router)...)
|
||||
}
|
||||
|
||||
return results, nil
|
||||
@ -181,23 +203,3 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// sendEvent sends events to all subscribed watchers
|
||||
func (t *table) sendEvent(e *Event) {
|
||||
t.RLock()
|
||||
defer t.RUnlock()
|
||||
|
||||
for _, w := range t.watchers {
|
||||
select {
|
||||
case w.resChan <- e:
|
||||
case <-w.done:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrRouteNotFound is returned when no route was found in the routing table
|
||||
ErrRouteNotFound = errors.New("route not found")
|
||||
// ErrDuplicateRoute is returned when the route already exists
|
||||
ErrDuplicateRoute = errors.New("duplicate route")
|
||||
)
|
||||
|
@ -9,6 +9,7 @@ func testSetup() (*table, Route) {
|
||||
Service: "dest.svc",
|
||||
Gateway: "dest.gw",
|
||||
Network: "dest.network",
|
||||
Router: "src.router",
|
||||
Link: "det.link",
|
||||
Metric: 10,
|
||||
}
|
||||
@ -109,11 +110,13 @@ func TestQuery(t *testing.T) {
|
||||
svc := []string{"svc1", "svc2", "svc3"}
|
||||
net := []string{"net1", "net2", "net1"}
|
||||
gw := []string{"gw1", "gw2", "gw3"}
|
||||
rtr := []string{"rtr1", "rt2", "rt3"}
|
||||
|
||||
for i := 0; i < len(svc); i++ {
|
||||
route.Service = svc[i]
|
||||
route.Network = net[i]
|
||||
route.Gateway = gw[i]
|
||||
route.Router = rtr[i]
|
||||
if err := table.Create(route); err != nil {
|
||||
t.Errorf("error adding route: %s", err)
|
||||
}
|
||||
@ -127,8 +130,9 @@ func TestQuery(t *testing.T) {
|
||||
t.Errorf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
// query particular net
|
||||
query = NewQuery(QueryNetwork("net1"))
|
||||
// query routes particular network
|
||||
network := "net1"
|
||||
query = NewQuery(QueryNetwork(network))
|
||||
|
||||
routes, err = table.Query(query)
|
||||
if err != nil {
|
||||
@ -139,7 +143,13 @@ func TestQuery(t *testing.T) {
|
||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 2, len(routes))
|
||||
}
|
||||
|
||||
// query particular gateway
|
||||
for _, route := range routes {
|
||||
if route.Network != network {
|
||||
t.Errorf("incorrect route returned. Expected network: %s, found: %s", network, route.Network)
|
||||
}
|
||||
}
|
||||
|
||||
// query routes for particular gateway
|
||||
gateway := "gw1"
|
||||
query = NewQuery(QueryGateway(gateway))
|
||||
|
||||
@ -156,11 +166,28 @@ func TestQuery(t *testing.T) {
|
||||
t.Errorf("incorrect route returned. Expected gateway: %s, found: %s", gateway, routes[0].Gateway)
|
||||
}
|
||||
|
||||
// query particular route
|
||||
network := "net1"
|
||||
// query routes for particular router
|
||||
router := "rtr1"
|
||||
query = NewQuery(QueryRouter(router))
|
||||
|
||||
routes, err = table.Query(query)
|
||||
if err != nil {
|
||||
t.Errorf("error looking up routes: %s", err)
|
||||
}
|
||||
|
||||
if len(routes) != 1 {
|
||||
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", 1, len(routes))
|
||||
}
|
||||
|
||||
if routes[0].Router != router {
|
||||
t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router)
|
||||
}
|
||||
|
||||
// query particular gateway and network
|
||||
query = NewQuery(
|
||||
QueryGateway(gateway),
|
||||
QueryNetwork(network),
|
||||
QueryRouter(router),
|
||||
)
|
||||
|
||||
routes, err = table.Query(query)
|
||||
@ -180,7 +207,11 @@ func TestQuery(t *testing.T) {
|
||||
t.Errorf("incorrect network returned. Expected network: %s, found: %s", network, routes[0].Network)
|
||||
}
|
||||
|
||||
// bullshit route query
|
||||
if routes[0].Router != router {
|
||||
t.Errorf("incorrect route returned. Expected router: %s, found: %s", router, routes[0].Router)
|
||||
}
|
||||
|
||||
// non-existen route query
|
||||
query = NewQuery(QueryService("foobar"))
|
||||
|
||||
routes, err = table.Query(query)
|
||||
|
@ -6,6 +6,11 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrWatcherStopped is returned when routing table watcher has been stopped
|
||||
ErrWatcherStopped = errors.New("watcher stopped")
|
||||
)
|
||||
|
||||
// EventType defines routing table event
|
||||
type EventType int
|
||||
|
||||
@ -42,9 +47,6 @@ type Event struct {
|
||||
Route Route
|
||||
}
|
||||
|
||||
// WatchOption is used to define what routes to watch in the table
|
||||
type WatchOption func(*WatchOptions)
|
||||
|
||||
// Watcher defines routing table watcher interface
|
||||
// Watcher returns updates to the routing table
|
||||
type Watcher interface {
|
||||
@ -56,7 +58,11 @@ type Watcher interface {
|
||||
Stop()
|
||||
}
|
||||
|
||||
// WatchOption is used to define what routes to watch in the table
|
||||
type WatchOption func(*WatchOptions)
|
||||
|
||||
// WatchOptions are table watcher options
|
||||
// TODO: expand the options to watch based on other criteria
|
||||
type WatchOptions struct {
|
||||
// Service allows to watch specific service routes
|
||||
Service string
|
||||
@ -70,6 +76,7 @@ func WatchService(s string) WatchOption {
|
||||
}
|
||||
}
|
||||
|
||||
// tableWatcher implements routing table Watcher
|
||||
type tableWatcher struct {
|
||||
sync.RWMutex
|
||||
id string
|
||||
@ -113,8 +120,3 @@ func (w *tableWatcher) Stop() {
|
||||
close(w.done)
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrWatcherStopped is returned when routing table watcher has been stopped
|
||||
ErrWatcherStopped = errors.New("watcher stopped")
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user