1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-23 17:53:05 +02:00

Merge pull request #621 from milosgajdos83/no-table-package

[WIP] No table package. router/service package introduced
This commit is contained in:
Asim Aslam 2019-07-29 12:36:40 +01:00 committed by GitHub
commit 4fc9b9821a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1099 additions and 707 deletions

View File

@ -11,7 +11,6 @@ import (
"github.com/micro/go-micro/client/selector"
"github.com/micro/go-micro/network/router"
pb "github.com/micro/go-micro/network/router/proto"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry"
)
@ -41,11 +40,11 @@ type clientKey struct{}
type routerKey struct{}
// getRoutes returns the routes whether they are remote or local
func (r *routerSelector) getRoutes(service string) ([]table.Route, error) {
func (r *routerSelector) getRoutes(service string) ([]router.Route, error) {
if !r.remote {
// lookup router for routes for the service
return r.r.Lookup(table.NewQuery(
table.QueryService(service),
return r.r.Lookup(router.NewQuery(
router.QueryService(service),
))
}
@ -102,11 +101,11 @@ func (r *routerSelector) getRoutes(service string) ([]table.Route, error) {
return nil, selector.ErrNoneAvailable
}
var routes []table.Route
var routes []router.Route
// convert from pb to []*router.Route
for _, r := range pbRoutes.Routes {
routes = append(routes, table.Route{
routes = append(routes, router.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,

View File

@ -15,8 +15,6 @@ import (
"github.com/micro/go-micro/network/proxy"
"github.com/micro/go-micro/network/router"
"github.com/micro/go-micro/server"
"github.com/micro/go-micro/network/router/table"
)
// Proxy will transparently proxy requests to an endpoint.
@ -36,7 +34,7 @@ type Proxy struct {
// A fib of routes service:address
sync.RWMutex
Routes map[string]map[uint64]table.Route
Routes map[string]map[uint64]router.Route
// The channel to monitor watcher errors
errChan chan error
@ -78,7 +76,7 @@ func readLoop(r server.Request, s client.Stream) error {
}
// toNodes returns a list of node addresses from given routes
func toNodes(routes map[uint64]table.Route) []string {
func toNodes(routes map[uint64]router.Route) []string {
var nodes []string
for _, node := range routes {
address := node.Address
@ -98,7 +96,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
p.Unlock()
return toNodes(routes), nil
}
p.Routes[service] = make(map[uint64]table.Route)
p.Routes[service] = make(map[uint64]router.Route)
p.Unlock()
// if the router is broken return error
@ -107,7 +105,7 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
}
// lookup the routes in the router
results, err := p.Router.Lookup(table.NewQuery(table.QueryService(service)))
results, err := p.Router.Lookup(router.NewQuery(router.QueryService(service)))
if err != nil {
return nil, err
}
@ -124,11 +122,11 @@ func (p *Proxy) getRoute(service string) ([]string, error) {
}
// manageRouteCache applies action on a given route to Proxy route cache
func (p *Proxy) manageRouteCache(route table.Route, action string) error {
func (p *Proxy) manageRouteCache(route router.Route, action string) error {
switch action {
case "create", "update":
if _, ok := p.Routes[route.Service]; !ok {
p.Routes[route.Service] = make(map[uint64]table.Route)
p.Routes[route.Service] = make(map[uint64]router.Route)
}
p.Routes[route.Service][route.Hash()] = route
case "delete":
@ -317,7 +315,7 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
}
// routes cache
p.Routes = make(map[string]map[uint64]table.Route)
p.Routes = make(map[string]map[uint64]router.Route)
// watch router service routes
p.errChan = make(chan error, 1)

View File

@ -8,7 +8,6 @@ import (
"sync"
"time"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry"
)
@ -43,12 +42,12 @@ var (
// router implements default router
type router struct {
// embed the table
table.Table
*Table
opts Options
status Status
exit chan struct{}
errChan chan error
eventChan chan *table.Event
eventChan chan *Event
advertChan chan *Advert
advertWg *sync.WaitGroup
wg *sync.WaitGroup
@ -92,18 +91,18 @@ func (r *router) Options() Options {
}
// manageRoute applies action on a given route
func (r *router) manageRoute(route table.Route, action string) error {
func (r *router) manageRoute(route Route, action string) error {
switch action {
case "create":
if err := r.Create(route); err != nil && err != table.ErrDuplicateRoute {
if err := r.Create(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed adding route for service %s: %s", route.Service, err)
}
case "update":
if err := r.Update(route); err != nil && err != table.ErrDuplicateRoute {
if err := r.Update(route); err != nil && err != ErrDuplicateRoute {
return fmt.Errorf("failed updating route for service %s: %s", route.Service, err)
}
case "delete":
if err := r.Delete(route); err != nil && err != table.ErrRouteNotFound {
if err := r.Delete(route); err != nil && err != ErrRouteNotFound {
return fmt.Errorf("failed deleting route for service %s: %s", route.Service, err)
}
default:
@ -121,13 +120,13 @@ func (r *router) manageServiceRoutes(service *registry.Service, action string) e
// take route action on each service node
for _, node := range service.Nodes {
route := table.Route{
route := Route{
Service: service.Name,
Address: node.Address,
Gateway: "",
Network: r.opts.Network,
Link: table.DefaultLink,
Metric: table.DefaultLocalMetric,
Link: DefaultLink,
Metric: DefaultLocalMetric,
}
if err := r.manageRoute(route, action); err != nil {
@ -197,7 +196,7 @@ func (r *router) watchRegistry(w registry.Watcher) error {
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
// It returns error if the locally registered services either fails to be added/deleted to/from network registry.
func (r *router) watchTable(w table.Watcher) error {
func (r *router) watchTable(w Watcher) error {
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
@ -212,7 +211,7 @@ func (r *router) watchTable(w table.Watcher) error {
for {
event, err := w.Next()
if err != nil {
if err != table.ErrWatcherStopped {
if err != ErrWatcherStopped {
watchErr = err
}
break
@ -234,7 +233,7 @@ func (r *router) watchTable(w table.Watcher) error {
// publishAdvert publishes router advert to advert channel
// NOTE: this might cease to be a dedicated method in the future
func (r *router) publishAdvert(advType AdvertType, events []*table.Event) {
func (r *router) publishAdvert(advType AdvertType, events []*Event) {
defer r.advertWg.Done()
a := &Advert{
@ -266,10 +265,10 @@ func (r *router) advertiseTable() error {
return fmt.Errorf("failed listing routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*table.Event, len(routes))
events := make([]*Event, len(routes))
for i, route := range routes {
event := &table.Event{
Type: table.Update,
event := &Event{
Type: Update,
Timestamp: time.Now(),
Route: route,
}
@ -279,7 +278,7 @@ func (r *router) advertiseTable() error {
// advertise all routes as Update events to subscribers
if len(events) > 0 {
r.advertWg.Add(1)
go r.publishAdvert(Update, events)
go r.publishAdvert(RouteUpdate, events)
}
case <-r.exit:
return nil
@ -289,7 +288,7 @@ func (r *router) advertiseTable() error {
// routeAdvert contains a list of route events to be advertised
type routeAdvert struct {
events []*table.Event
events []*Event
// lastUpdate records the time of the last advert update
lastUpdate time.Time
// penalty is current advert penalty
@ -326,7 +325,7 @@ func (r *router) advertiseEvents() error {
for {
select {
case <-ticker.C:
var events []*table.Event
var events []*Event
// collect all events which are not flapping
for key, advert := range advertMap {
// decay the event penalty
@ -352,7 +351,7 @@ func (r *router) advertiseEvents() error {
if !advert.isSuppressed {
for _, event := range advert.events {
e := new(table.Event)
e := new(Event)
*e = *event
events = append(events, e)
// delete the advert from the advertMap
@ -364,7 +363,7 @@ func (r *router) advertiseEvents() error {
// advertise all Update events to subscribers
if len(events) > 0 {
r.advertWg.Add(1)
go r.publishAdvert(Update, events)
go r.publishAdvert(RouteUpdate, events)
}
case e := <-r.eventChan:
// if event is nil, continue
@ -375,9 +374,9 @@ func (r *router) advertiseEvents() error {
// determine the event penalty
var penalty float64
switch e.Type {
case table.Update:
case Update:
penalty = UpdatePenalty
case table.Delete:
case Delete:
penalty = DeletePenalty
}
@ -386,7 +385,7 @@ func (r *router) advertiseEvents() error {
hash := e.Route.Hash()
advert, ok := advertMap[hash]
if !ok {
events := []*table.Event{e}
events := []*Event{e}
advert = &routeAdvert{
events: events,
penalty: penalty,
@ -432,12 +431,19 @@ func (r *router) watchErrors() {
if r.status.Code != Stopped {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel
for range r.advertChan {
}
// drain the event channel
for range r.eventChan {
// drain the advertise channel only if advertising
if r.status.Code == Advertising {
// drain the advertise channel
for range r.advertChan {
}
// drain the event channel
for range r.eventChan {
}
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
if err != nil {
@ -446,7 +452,6 @@ func (r *router) watchErrors() {
}
// Run runs the router.
// It returns error if the router is already running.
func (r *router) run() {
r.Lock()
defer r.Unlock()
@ -462,12 +467,12 @@ func (r *router) run() {
// add default gateway into routing table
if r.opts.Gateway != "" {
// note, the only non-default value is the gateway
route := table.Route{
route := Route{
Service: "*",
Address: "*",
Gateway: r.opts.Gateway,
Network: "*",
Metric: table.DefaultLocalMetric,
Metric: DefaultLocalMetric,
}
if err := r.Create(route); err != nil {
r.status = Status{Code: Error, Error: fmt.Errorf("failed adding default gateway route: %s", err)}
@ -528,10 +533,10 @@ func (r *router) Advertise() (<-chan *Advert, error) {
return nil, fmt.Errorf("failed listing routes: %s", err)
}
// collect all the added routes before we attempt to add default gateway
events := make([]*table.Event, len(routes))
events := make([]*Event, len(routes))
for i, route := range routes {
event := &table.Event{
Type: table.Create,
event := &Event{
Type: Create,
Timestamp: time.Now(),
Route: route,
}
@ -540,7 +545,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
// create advertise and event channels
r.advertChan = make(chan *Advert)
r.eventChan = make(chan *table.Event)
r.eventChan = make(chan *Event)
// advertise your presence
r.advertWg.Add(1)
@ -580,7 +585,7 @@ func (r *router) Advertise() (<-chan *Advert, error) {
func (r *router) Process(a *Advert) error {
// NOTE: event sorting might not be necessary
// copy update events intp new slices
events := make([]*table.Event, len(a.Events))
events := make([]*Event, len(a.Events))
copy(events, a.Events)
// sort events by timestamp
sort.Slice(events, func(i, j int) bool {
@ -617,11 +622,15 @@ func (r *router) Stop() error {
if r.status.Code == Running || r.status.Code == Advertising {
// notify all goroutines to finish
close(r.exit)
// drain the advertise channel
for range r.advertChan {
}
// drain the event channel
for range r.eventChan {
// drain the advertise channel only if advertising
if r.status.Code == Advertising {
// drain the advertise channel
for range r.advertChan {
}
// drain the event channel
for range r.eventChan {
}
}
// mark the router as Stopped and set its Error to nil

View File

@ -2,7 +2,6 @@ package router
import (
"github.com/google/uuid"
"github.com/micro/go-micro/network/router/table"
"github.com/micro/go-micro/registry"
)
@ -26,7 +25,7 @@ type Options struct {
// Registry is the local registry
Registry registry.Registry
// Table is routing table
Table table.Table
Table *Table
}
// Id sets Router Id
@ -64,8 +63,8 @@ func Registry(r registry.Registry) Option {
}
}
// Table sets the routing table
func Table(t table.Table) Option {
// RoutingTable sets the routing table
func RoutingTable(t *Table) Option {
return func(o *Options) {
o.Table = t
}
@ -78,6 +77,6 @@ func DefaultOptions() Options {
Address: DefaultAddress,
Network: DefaultNetwork,
Registry: registry.DefaultRegistry,
Table: table.NewTable(),
Table: NewTable(),
}
}

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: go-micro/network/router/proto/router.proto
// source: router.proto
package go_micro_router
@ -34,11 +34,14 @@ var _ server.Option
// Client API for Router service
type RouterService interface {
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error)
Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error)
List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error)
Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error)
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error)
Advertise(ctx context.Context, in *AdvertiseRequest, opts ...client.CallOption) (Router_AdvertiseService, error)
Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error)
Create(ctx context.Context, in *Route, opts ...client.CallOption) (*CreateResponse, error)
Delete(ctx context.Context, in *Route, opts ...client.CallOption) (*DeleteResponse, error)
Update(ctx context.Context, in *Route, opts ...client.CallOption) (*UpdateResponse, error)
}
type routerService struct {
@ -59,6 +62,26 @@ func NewRouterService(name string, c client.Client) RouterService {
}
}
func (c *routerService) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) {
req := c.c.NewRequest(c.name, "Router.List", in)
out := new(ListResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) {
req := c.c.NewRequest(c.name, "Router.Lookup", in)
out := new(LookupResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerService) Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error) {
req := c.c.NewRequest(c.name, "Router.Watch", &WatchRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
@ -103,26 +126,6 @@ func (x *routerServiceWatch) Recv() (*Event, error) {
return m, nil
}
func (c *routerService) Lookup(ctx context.Context, in *LookupRequest, opts ...client.CallOption) (*LookupResponse, error) {
req := c.c.NewRequest(c.name, "Router.Lookup", in)
out := new(LookupResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerService) List(ctx context.Context, in *ListRequest, opts ...client.CallOption) (*ListResponse, error) {
req := c.c.NewRequest(c.name, "Router.List", in)
out := new(ListResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerService) Advertise(ctx context.Context, in *AdvertiseRequest, opts ...client.CallOption) (Router_AdvertiseService, error) {
req := c.c.NewRequest(c.name, "Router.Advertise", &AdvertiseRequest{})
stream, err := c.c.Stream(ctx, req, opts...)
@ -177,23 +180,59 @@ func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client.
return out, nil
}
func (c *routerService) Create(ctx context.Context, in *Route, opts ...client.CallOption) (*CreateResponse, error) {
req := c.c.NewRequest(c.name, "Router.Create", in)
out := new(CreateResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerService) Delete(ctx context.Context, in *Route, opts ...client.CallOption) (*DeleteResponse, error) {
req := c.c.NewRequest(c.name, "Router.Delete", in)
out := new(DeleteResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerService) Update(ctx context.Context, in *Route, opts ...client.CallOption) (*UpdateResponse, error) {
req := c.c.NewRequest(c.name, "Router.Update", in)
out := new(UpdateResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Router service
type RouterHandler interface {
Watch(context.Context, *WatchRequest, Router_WatchStream) error
Lookup(context.Context, *LookupRequest, *LookupResponse) error
List(context.Context, *ListRequest, *ListResponse) error
Lookup(context.Context, *LookupRequest, *LookupResponse) error
Watch(context.Context, *WatchRequest, Router_WatchStream) error
Advertise(context.Context, *AdvertiseRequest, Router_AdvertiseStream) error
Process(context.Context, *Advert, *ProcessResponse) error
Create(context.Context, *Route, *CreateResponse) error
Delete(context.Context, *Route, *DeleteResponse) error
Update(context.Context, *Route, *UpdateResponse) error
}
func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error {
type router interface {
Watch(ctx context.Context, stream server.Stream) error
Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error
List(ctx context.Context, in *ListRequest, out *ListResponse) error
Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error
Watch(ctx context.Context, stream server.Stream) error
Advertise(ctx context.Context, stream server.Stream) error
Process(ctx context.Context, in *Advert, out *ProcessResponse) error
Create(ctx context.Context, in *Route, out *CreateResponse) error
Delete(ctx context.Context, in *Route, out *DeleteResponse) error
Update(ctx context.Context, in *Route, out *UpdateResponse) error
}
type Router struct {
router
@ -206,6 +245,14 @@ type routerHandler struct {
RouterHandler
}
func (h *routerHandler) List(ctx context.Context, in *ListRequest, out *ListResponse) error {
return h.RouterHandler.List(ctx, in, out)
}
func (h *routerHandler) Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error {
return h.RouterHandler.Lookup(ctx, in, out)
}
func (h *routerHandler) Watch(ctx context.Context, stream server.Stream) error {
m := new(WatchRequest)
if err := stream.Recv(m); err != nil {
@ -241,14 +288,6 @@ func (x *routerWatchStream) Send(m *Event) error {
return x.stream.Send(m)
}
func (h *routerHandler) Lookup(ctx context.Context, in *LookupRequest, out *LookupResponse) error {
return h.RouterHandler.Lookup(ctx, in, out)
}
func (h *routerHandler) List(ctx context.Context, in *ListRequest, out *ListResponse) error {
return h.RouterHandler.List(ctx, in, out)
}
func (h *routerHandler) Advertise(ctx context.Context, stream server.Stream) error {
m := new(AdvertiseRequest)
if err := stream.Recv(m); err != nil {
@ -287,3 +326,15 @@ func (x *routerAdvertiseStream) Send(m *Advert) error {
func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessResponse) error {
return h.RouterHandler.Process(ctx, in, out)
}
func (h *routerHandler) Create(ctx context.Context, in *Route, out *CreateResponse) error {
return h.RouterHandler.Create(ctx, in, out)
}
func (h *routerHandler) Delete(ctx context.Context, in *Route, out *DeleteResponse) error {
return h.RouterHandler.Delete(ctx, in, out)
}
func (h *routerHandler) Update(ctx context.Context, in *Route, out *UpdateResponse) error {
return h.RouterHandler.Update(ctx, in, out)
}

View File

@ -1,13 +1,11 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: go-micro/network/router/proto/router.proto
// source: router.proto
package go_micro_router
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
math "math"
)
@ -45,7 +43,7 @@ func (x AdvertType) String() string {
}
func (AdvertType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{0}
return fileDescriptor_367072455c71aedc, []int{0}
}
// EventType defines the type of event
@ -74,7 +72,79 @@ func (x EventType) String() string {
}
func (EventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{1}
return fileDescriptor_367072455c71aedc, []int{1}
}
// ListRequest is made to List routes
type ListRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{0}
}
func (m *ListRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListRequest.Unmarshal(m, b)
}
func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic)
}
func (m *ListRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListRequest.Merge(m, src)
}
func (m *ListRequest) XXX_Size() int {
return xxx_messageInfo_ListRequest.Size(m)
}
func (m *ListRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ListRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ListRequest proto.InternalMessageInfo
// ListResponse is returned by List
type ListResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{1}
}
func (m *ListResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListResponse.Unmarshal(m, b)
}
func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic)
}
func (m *ListResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListResponse.Merge(m, src)
}
func (m *ListResponse) XXX_Size() int {
return xxx_messageInfo_ListResponse.Size(m)
}
func (m *ListResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ListResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ListResponse proto.InternalMessageInfo
func (m *ListResponse) GetRoutes() []*Route {
if m != nil {
return m.Routes
}
return nil
}
// LookupRequest is made to Lookup
@ -89,7 +159,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} }
func (m *LookupRequest) String() string { return proto.CompactTextString(m) }
func (*LookupRequest) ProtoMessage() {}
func (*LookupRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{0}
return fileDescriptor_367072455c71aedc, []int{2}
}
func (m *LookupRequest) XXX_Unmarshal(b []byte) error {
@ -129,7 +199,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} }
func (m *LookupResponse) String() string { return proto.CompactTextString(m) }
func (*LookupResponse) ProtoMessage() {}
func (*LookupResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{1}
return fileDescriptor_367072455c71aedc, []int{3}
}
func (m *LookupResponse) XXX_Unmarshal(b []byte) error {
@ -168,7 +238,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{2}
return fileDescriptor_367072455c71aedc, []int{4}
}
func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
@ -200,7 +270,7 @@ func (m *AdvertiseRequest) Reset() { *m = AdvertiseRequest{} }
func (m *AdvertiseRequest) String() string { return proto.CompactTextString(m) }
func (*AdvertiseRequest) ProtoMessage() {}
func (*AdvertiseRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{3}
return fileDescriptor_367072455c71aedc, []int{5}
}
func (m *AdvertiseRequest) XXX_Unmarshal(b []byte) error {
@ -242,7 +312,7 @@ func (m *Advert) Reset() { *m = Advert{} }
func (m *Advert) String() string { return proto.CompactTextString(m) }
func (*Advert) ProtoMessage() {}
func (*Advert) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{4}
return fileDescriptor_367072455c71aedc, []int{6}
}
func (m *Advert) XXX_Unmarshal(b []byte) error {
@ -309,7 +379,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} }
func (m *ProcessResponse) String() string { return proto.CompactTextString(m) }
func (*ProcessResponse) ProtoMessage() {}
func (*ProcessResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{5}
return fileDescriptor_367072455c71aedc, []int{7}
}
func (m *ProcessResponse) XXX_Unmarshal(b []byte) error {
@ -330,6 +400,102 @@ func (m *ProcessResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_ProcessResponse proto.InternalMessageInfo
// CreateResponse is returned by Create
type CreateResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *CreateResponse) Reset() { *m = CreateResponse{} }
func (m *CreateResponse) String() string { return proto.CompactTextString(m) }
func (*CreateResponse) ProtoMessage() {}
func (*CreateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{8}
}
func (m *CreateResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_CreateResponse.Unmarshal(m, b)
}
func (m *CreateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_CreateResponse.Marshal(b, m, deterministic)
}
func (m *CreateResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_CreateResponse.Merge(m, src)
}
func (m *CreateResponse) XXX_Size() int {
return xxx_messageInfo_CreateResponse.Size(m)
}
func (m *CreateResponse) XXX_DiscardUnknown() {
xxx_messageInfo_CreateResponse.DiscardUnknown(m)
}
var xxx_messageInfo_CreateResponse proto.InternalMessageInfo
// DeleteResponse is returned by Delete
type DeleteResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{9}
}
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_DeleteResponse.Unmarshal(m, b)
}
func (m *DeleteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_DeleteResponse.Marshal(b, m, deterministic)
}
func (m *DeleteResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_DeleteResponse.Merge(m, src)
}
func (m *DeleteResponse) XXX_Size() int {
return xxx_messageInfo_DeleteResponse.Size(m)
}
func (m *DeleteResponse) XXX_DiscardUnknown() {
xxx_messageInfo_DeleteResponse.DiscardUnknown(m)
}
var xxx_messageInfo_DeleteResponse proto.InternalMessageInfo
// UpdateResponse is returned by Update
type UpdateResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *UpdateResponse) Reset() { *m = UpdateResponse{} }
func (m *UpdateResponse) String() string { return proto.CompactTextString(m) }
func (*UpdateResponse) ProtoMessage() {}
func (*UpdateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{10}
}
func (m *UpdateResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_UpdateResponse.Unmarshal(m, b)
}
func (m *UpdateResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_UpdateResponse.Marshal(b, m, deterministic)
}
func (m *UpdateResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_UpdateResponse.Merge(m, src)
}
func (m *UpdateResponse) XXX_Size() int {
return xxx_messageInfo_UpdateResponse.Size(m)
}
func (m *UpdateResponse) XXX_DiscardUnknown() {
xxx_messageInfo_UpdateResponse.DiscardUnknown(m)
}
var xxx_messageInfo_UpdateResponse proto.InternalMessageInfo
// Event is routing table event
type Event struct {
// type of event
@ -347,7 +513,7 @@ func (m *Event) Reset() { *m = Event{} }
func (m *Event) String() string { return proto.CompactTextString(m) }
func (*Event) ProtoMessage() {}
func (*Event) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{6}
return fileDescriptor_367072455c71aedc, []int{11}
}
func (m *Event) XXX_Unmarshal(b []byte) error {
@ -389,82 +555,14 @@ func (m *Event) GetRoute() *Route {
return nil
}
// ListRequest is made to List routes
type ListRequest struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListRequest) Reset() { *m = ListRequest{} }
func (m *ListRequest) String() string { return proto.CompactTextString(m) }
func (*ListRequest) ProtoMessage() {}
func (*ListRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{7}
}
func (m *ListRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListRequest.Unmarshal(m, b)
}
func (m *ListRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListRequest.Marshal(b, m, deterministic)
}
func (m *ListRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListRequest.Merge(m, src)
}
func (m *ListRequest) XXX_Size() int {
return xxx_messageInfo_ListRequest.Size(m)
}
func (m *ListRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ListRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ListRequest proto.InternalMessageInfo
// ListResponse is returned by List
type ListResponse struct {
Routes []*Route `protobuf:"bytes,1,rep,name=routes,proto3" json:"routes,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{8}
}
func (m *ListResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_ListResponse.Unmarshal(m, b)
}
func (m *ListResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_ListResponse.Marshal(b, m, deterministic)
}
func (m *ListResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_ListResponse.Merge(m, src)
}
func (m *ListResponse) XXX_Size() int {
return xxx_messageInfo_ListResponse.Size(m)
}
func (m *ListResponse) XXX_DiscardUnknown() {
xxx_messageInfo_ListResponse.DiscardUnknown(m)
}
var xxx_messageInfo_ListResponse proto.InternalMessageInfo
func (m *ListResponse) GetRoutes() []*Route {
if m != nil {
return m.Routes
}
return nil
}
// Query is passed in a LookupRequest
type Query struct {
// service to lookup
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
Service string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"`
// gateway to lookup
Gateway string `protobuf:"bytes,2,opt,name=gateway,proto3" json:"gateway,omitempty"`
// network to lookup
Network string `protobuf:"bytes,3,opt,name=network,proto3" json:"network,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -474,7 +572,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{9}
return fileDescriptor_367072455c71aedc, []int{12}
}
func (m *Query) XXX_Unmarshal(b []byte) error {
@ -502,6 +600,20 @@ func (m *Query) GetService() string {
return ""
}
func (m *Query) GetGateway() string {
if m != nil {
return m.Gateway
}
return ""
}
func (m *Query) GetNetwork() string {
if m != nil {
return m.Network
}
return ""
}
// Route is a service route
type Route struct {
// service for the route
@ -525,7 +637,7 @@ func (m *Route) Reset() { *m = Route{} }
func (m *Route) String() string { return proto.CompactTextString(m) }
func (*Route) ProtoMessage() {}
func (*Route) Descriptor() ([]byte, []int) {
return fileDescriptor_fc08514fc6dadd29, []int{10}
return fileDescriptor_367072455c71aedc, []int{13}
}
func (m *Route) XXX_Unmarshal(b []byte) error {
@ -591,317 +703,61 @@ func (m *Route) GetMetric() int64 {
func init() {
proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value)
proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value)
proto.RegisterType((*ListRequest)(nil), "go.micro.router.ListRequest")
proto.RegisterType((*ListResponse)(nil), "go.micro.router.ListResponse")
proto.RegisterType((*LookupRequest)(nil), "go.micro.router.LookupRequest")
proto.RegisterType((*LookupResponse)(nil), "go.micro.router.LookupResponse")
proto.RegisterType((*WatchRequest)(nil), "go.micro.router.WatchRequest")
proto.RegisterType((*AdvertiseRequest)(nil), "go.micro.router.AdvertiseRequest")
proto.RegisterType((*Advert)(nil), "go.micro.router.Advert")
proto.RegisterType((*ProcessResponse)(nil), "go.micro.router.ProcessResponse")
proto.RegisterType((*CreateResponse)(nil), "go.micro.router.CreateResponse")
proto.RegisterType((*DeleteResponse)(nil), "go.micro.router.DeleteResponse")
proto.RegisterType((*UpdateResponse)(nil), "go.micro.router.UpdateResponse")
proto.RegisterType((*Event)(nil), "go.micro.router.Event")
proto.RegisterType((*ListRequest)(nil), "go.micro.router.ListRequest")
proto.RegisterType((*ListResponse)(nil), "go.micro.router.ListResponse")
proto.RegisterType((*Query)(nil), "go.micro.router.Query")
proto.RegisterType((*Route)(nil), "go.micro.router.Route")
}
func init() {
proto.RegisterFile("go-micro/network/router/proto/router.proto", fileDescriptor_fc08514fc6dadd29)
}
var fileDescriptor_fc08514fc6dadd29 = []byte{
// 553 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0x8d, 0x9d, 0xd8, 0x95, 0xa7, 0x6d, 0x1a, 0xe6, 0x50, 0x2c, 0xd3, 0x42, 0xea, 0x53, 0x55,
0x15, 0x07, 0x85, 0x33, 0x88, 0x02, 0xe5, 0xd2, 0x1e, 0xc0, 0x02, 0x71, 0x36, 0xf6, 0x28, 0x58,
0x49, 0xbc, 0xee, 0xee, 0x26, 0x55, 0xce, 0x7c, 0x06, 0x5f, 0xc0, 0x07, 0x72, 0x47, 0x3b, 0xb6,
0x13, 0x48, 0xea, 0x0b, 0xa7, 0xec, 0x9b, 0xf7, 0x66, 0x3d, 0x33, 0x3b, 0x2f, 0x70, 0x31, 0x11,
0xcf, 0xe7, 0x79, 0x2a, 0xc5, 0xa8, 0x20, 0x7d, 0x2f, 0xe4, 0x74, 0x24, 0xc5, 0x42, 0x93, 0x1c,
0x95, 0x52, 0x68, 0x51, 0x83, 0x88, 0x01, 0x1e, 0x4d, 0x44, 0xc4, 0xda, 0xa8, 0x0a, 0x87, 0xaf,
0xe0, 0xf0, 0x56, 0x88, 0xe9, 0xa2, 0x8c, 0xe9, 0x6e, 0x41, 0x4a, 0xe3, 0x25, 0x38, 0x77, 0x0b,
0x92, 0x2b, 0xdf, 0x1a, 0x5a, 0xe7, 0xfb, 0xe3, 0xe3, 0x68, 0x2b, 0x23, 0xfa, 0x64, 0xd8, 0xb8,
0x12, 0x85, 0x6f, 0xa0, 0xdf, 0xa4, 0xab, 0x52, 0x14, 0x8a, 0x30, 0x02, 0x97, 0x85, 0xca, 0xb7,
0x86, 0xdd, 0x07, 0x2f, 0x88, 0xcd, 0x4f, 0x5c, 0xab, 0xc2, 0x3e, 0x1c, 0x7c, 0x4d, 0x74, 0xfa,
0xbd, 0xfe, 0x7e, 0x88, 0x30, 0xb8, 0xca, 0x96, 0x24, 0x75, 0xae, 0xa8, 0x89, 0xfd, 0xb2, 0xc0,
0xad, 0x82, 0xd8, 0x07, 0x3b, 0xcf, 0xb8, 0x36, 0x2f, 0xb6, 0xf3, 0x0c, 0x47, 0xd0, 0xd3, 0xab,
0x92, 0x7c, 0x7b, 0x68, 0x9d, 0xf7, 0xc7, 0x4f, 0x76, 0x3e, 0x56, 0xa5, 0x7d, 0x5e, 0x95, 0x14,
0xb3, 0x10, 0x4f, 0xc0, 0xd3, 0xf9, 0x9c, 0x94, 0x4e, 0xe6, 0xa5, 0xdf, 0x1d, 0x5a, 0xe7, 0xdd,
0x78, 0x13, 0xc0, 0x01, 0x74, 0xb5, 0x9e, 0xf9, 0x3d, 0x8e, 0x9b, 0xa3, 0xe9, 0x87, 0x96, 0x54,
0x68, 0xe5, 0x3b, 0x2d, 0xfd, 0x5c, 0x1b, 0x3a, 0xae, 0x55, 0xe1, 0x23, 0x38, 0xfa, 0x28, 0x45,
0x4a, 0x4a, 0x35, 0x23, 0x09, 0x7f, 0x58, 0xe0, 0xb0, 0x08, 0xa3, 0xba, 0x5a, 0x8b, 0xab, 0x0d,
0x1e, 0xbe, 0xaa, 0xad, 0x58, 0x7b, 0xbb, 0xd8, 0x4b, 0x70, 0x38, 0x8f, 0xdb, 0x68, 0x9f, 0x74,
0x25, 0x0a, 0x0f, 0x61, 0xff, 0x36, 0x57, 0xba, 0x99, 0xe9, 0x6b, 0x38, 0xa8, 0xe0, 0x7f, 0xbe,
0xdb, 0x19, 0x38, 0xbc, 0x09, 0xe8, 0xc3, 0x9e, 0x22, 0xb9, 0xcc, 0x53, 0xaa, 0x9f, 0xa5, 0x81,
0xe1, 0x4f, 0x0b, 0x1c, 0x4e, 0x6a, 0xd7, 0x18, 0x26, 0xc9, 0x32, 0x49, 0x4a, 0x71, 0x7f, 0x5e,
0xdc, 0x40, 0xc3, 0x4c, 0x12, 0x4d, 0xf7, 0xc9, 0x8a, 0xfb, 0xf3, 0xe2, 0x06, 0x1a, 0xa6, 0xde,
0x74, 0x7e, 0x28, 0x2f, 0x6e, 0x20, 0x22, 0xf4, 0x66, 0x79, 0x31, 0xf5, 0x1d, 0x0e, 0xf3, 0x19,
0x8f, 0xc1, 0x9d, 0x93, 0x96, 0x79, 0xea, 0xbb, 0x3c, 0xc0, 0x1a, 0x5d, 0x8c, 0x01, 0x36, 0xcb,
0x81, 0x08, 0xfd, 0x0a, 0x5d, 0x15, 0x85, 0x58, 0x14, 0x29, 0x0d, 0x3a, 0x38, 0x80, 0x83, 0x2a,
0xf6, 0xa5, 0xcc, 0x12, 0x4d, 0x03, 0xeb, 0x62, 0x04, 0xde, 0xfa, 0x89, 0x10, 0xc0, 0x7d, 0x27,
0xc9, 0x10, 0x1d, 0x73, 0x7e, 0x4f, 0x33, 0x32, 0x22, 0x73, 0xae, 0x13, 0xec, 0xf1, 0x6f, 0x1b,
0x5c, 0x1e, 0x81, 0xc4, 0xb7, 0xe0, 0xf0, 0xa2, 0xe3, 0xe9, 0xce, 0x64, 0xff, 0x36, 0x40, 0xd0,
0xb2, 0x60, 0x61, 0xe7, 0x85, 0x85, 0x37, 0xe0, 0x56, 0x76, 0xc3, 0xa7, 0x3b, 0xaa, 0x7f, 0x6c,
0x1c, 0x3c, 0x6b, 0xe5, 0xeb, 0xa5, 0xec, 0xe0, 0x35, 0xf4, 0xcc, 0x06, 0xe0, 0xc9, 0xae, 0x74,
0xb3, 0x27, 0xc1, 0x69, 0x0b, 0xbb, 0xbe, 0xe6, 0x06, 0xbc, 0xb5, 0x61, 0xf1, 0xac, 0xc5, 0x80,
0x1b, 0x33, 0x07, 0x8f, 0x5b, 0x24, 0xdc, 0xe0, 0x07, 0xd8, 0xab, 0xdd, 0x83, 0x6d, 0xba, 0x60,
0xb8, 0x43, 0x6c, 0x1b, 0xae, 0xf3, 0xcd, 0xe5, 0xbf, 0xbb, 0x97, 0x7f, 0x02, 0x00, 0x00, 0xff,
0xff, 0x99, 0x8e, 0xb9, 0x97, 0x1c, 0x05, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// RouterClient is the client API for Router service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type RouterClient interface {
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error)
Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error)
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
Advertise(ctx context.Context, in *AdvertiseRequest, opts ...grpc.CallOption) (Router_AdvertiseClient, error)
Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error)
}
type routerClient struct {
cc *grpc.ClientConn
}
func NewRouterClient(cc *grpc.ClientConn) RouterClient {
return &routerClient{cc}
}
func (c *routerClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (Router_WatchClient, error) {
stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[0], "/go.micro.router.Router/Watch", opts...)
if err != nil {
return nil, err
}
x := &routerWatchClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Router_WatchClient interface {
Recv() (*Event, error)
grpc.ClientStream
}
type routerWatchClient struct {
grpc.ClientStream
}
func (x *routerWatchClient) Recv() (*Event, error) {
m := new(Event)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *routerClient) Lookup(ctx context.Context, in *LookupRequest, opts ...grpc.CallOption) (*LookupResponse, error) {
out := new(LookupResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Router/Lookup", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
out := new(ListResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Router/List", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *routerClient) Advertise(ctx context.Context, in *AdvertiseRequest, opts ...grpc.CallOption) (Router_AdvertiseClient, error) {
stream, err := c.cc.NewStream(ctx, &_Router_serviceDesc.Streams[1], "/go.micro.router.Router/Advertise", opts...)
if err != nil {
return nil, err
}
x := &routerAdvertiseClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Router_AdvertiseClient interface {
Recv() (*Advert, error)
grpc.ClientStream
}
type routerAdvertiseClient struct {
grpc.ClientStream
}
func (x *routerAdvertiseClient) Recv() (*Advert, error) {
m := new(Advert)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *routerClient) Process(ctx context.Context, in *Advert, opts ...grpc.CallOption) (*ProcessResponse, error) {
out := new(ProcessResponse)
err := c.cc.Invoke(ctx, "/go.micro.router.Router/Process", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// RouterServer is the server API for Router service.
type RouterServer interface {
Watch(*WatchRequest, Router_WatchServer) error
Lookup(context.Context, *LookupRequest) (*LookupResponse, error)
List(context.Context, *ListRequest) (*ListResponse, error)
Advertise(*AdvertiseRequest, Router_AdvertiseServer) error
Process(context.Context, *Advert) (*ProcessResponse, error)
}
func RegisterRouterServer(s *grpc.Server, srv RouterServer) {
s.RegisterService(&_Router_serviceDesc, srv)
}
func _Router_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(WatchRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RouterServer).Watch(m, &routerWatchServer{stream})
}
type Router_WatchServer interface {
Send(*Event) error
grpc.ServerStream
}
type routerWatchServer struct {
grpc.ServerStream
}
func (x *routerWatchServer) Send(m *Event) error {
return x.ServerStream.SendMsg(m)
}
func _Router_Lookup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(LookupRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RouterServer).Lookup(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Router/Lookup",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RouterServer).Lookup(ctx, req.(*LookupRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Router_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RouterServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Router/List",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RouterServer).List(ctx, req.(*ListRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Router_Advertise_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(AdvertiseRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RouterServer).Advertise(m, &routerAdvertiseServer{stream})
}
type Router_AdvertiseServer interface {
Send(*Advert) error
grpc.ServerStream
}
type routerAdvertiseServer struct {
grpc.ServerStream
}
func (x *routerAdvertiseServer) Send(m *Advert) error {
return x.ServerStream.SendMsg(m)
}
func _Router_Process_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Advert)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(RouterServer).Process(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/go.micro.router.Router/Process",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RouterServer).Process(ctx, req.(*Advert))
}
return interceptor(ctx, in, info, handler)
}
var _Router_serviceDesc = grpc.ServiceDesc{
ServiceName: "go.micro.router.Router",
HandlerType: (*RouterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Lookup",
Handler: _Router_Lookup_Handler,
},
{
MethodName: "List",
Handler: _Router_List_Handler,
},
{
MethodName: "Process",
Handler: _Router_Process_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Watch",
Handler: _Router_Watch_Handler,
ServerStreams: true,
},
{
StreamName: "Advertise",
Handler: _Router_Advertise_Handler,
ServerStreams: true,
},
},
Metadata: "go-micro/network/router/proto/router.proto",
func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) }
var fileDescriptor_367072455c71aedc = []byte{
// 591 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xc1, 0x6e, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0xb6, 0x2b, 0x4f, 0x53, 0xd7, 0xcc, 0xa1, 0x58, 0xa6, 0x40, 0xf0, 0xa9, 0xaa,
0x2a, 0x17, 0x85, 0x33, 0x88, 0x52, 0xca, 0xa5, 0x3d, 0x80, 0x45, 0xc5, 0xd9, 0xd8, 0xa3, 0x62,
0xb5, 0xb1, 0xdd, 0xdd, 0x4d, 0xab, 0x9c, 0xf9, 0x0c, 0xbe, 0x80, 0xff, 0xe0, 0xc3, 0x90, 0x77,
0xed, 0xd6, 0x75, 0x62, 0xa4, 0x72, 0xca, 0xce, 0xcc, 0x9b, 0x37, 0x3b, 0x33, 0xfb, 0x62, 0x98,
0xf0, 0x72, 0x21, 0x89, 0x47, 0x15, 0x2f, 0x65, 0x89, 0xdb, 0x17, 0x65, 0x34, 0xcf, 0x53, 0x5e,
0x46, 0xda, 0x1d, 0x6e, 0xc1, 0xe6, 0x59, 0x2e, 0x64, 0x4c, 0xd7, 0x0b, 0x12, 0x32, 0x7c, 0x07,
0x13, 0x6d, 0x8a, 0xaa, 0x2c, 0x04, 0x61, 0x04, 0xb6, 0x02, 0x0a, 0x9f, 0x4d, 0xc7, 0x7b, 0x9b,
0xb3, 0x9d, 0xa8, 0x47, 0x10, 0xc5, 0xf5, 0x4f, 0xdc, 0xa0, 0xc2, 0xb7, 0xb0, 0x75, 0x56, 0x96,
0x97, 0x8b, 0xaa, 0x21, 0xc4, 0x03, 0xb0, 0xae, 0x17, 0xc4, 0x97, 0x3e, 0x9b, 0xb2, 0xb5, 0xf9,
0x5f, 0xea, 0x68, 0xac, 0x41, 0xe1, 0x7b, 0x70, 0xdb, 0xf4, 0xff, 0xbc, 0x80, 0x0b, 0x93, 0x6f,
0x89, 0x4c, 0x7f, 0xb4, 0x0d, 0x21, 0x78, 0x47, 0xd9, 0x0d, 0x71, 0x99, 0x0b, 0x6a, 0x7d, 0xbf,
0x19, 0xd8, 0xda, 0x89, 0x2e, 0x8c, 0xf2, 0x4c, 0xdd, 0xcd, 0x89, 0x47, 0x79, 0x86, 0x87, 0x60,
0xca, 0x65, 0x45, 0xfe, 0x68, 0xca, 0xf6, 0xdc, 0xd9, 0xb3, 0x95, 0x62, 0x3a, 0xed, 0xeb, 0xb2,
0xa2, 0x58, 0x01, 0x71, 0x17, 0x1c, 0x99, 0xcf, 0x49, 0xc8, 0x64, 0x5e, 0xf9, 0xe3, 0x29, 0xdb,
0x1b, 0xc7, 0xf7, 0x0e, 0xf4, 0x60, 0x2c, 0xe5, 0x95, 0x6f, 0x2a, 0x7f, 0x7d, 0xac, 0xfb, 0xa1,
0x1b, 0x2a, 0xa4, 0xf0, 0xad, 0x81, 0x7e, 0x4e, 0xea, 0x70, 0xdc, 0xa0, 0xc2, 0x27, 0xb0, 0xfd,
0x99, 0x97, 0x29, 0x09, 0xd1, 0x8e, 0x24, 0xf4, 0xc0, 0x3d, 0xe6, 0x94, 0x48, 0xea, 0x7a, 0x3e,
0xd2, 0x15, 0x3d, 0xf4, 0x9c, 0x57, 0x59, 0x17, 0xf3, 0x93, 0x81, 0xa5, 0xa8, 0x31, 0x6a, 0x7a,
0x64, 0xaa, 0xc7, 0x60, 0xfd, 0x05, 0x86, 0x5a, 0x1c, 0xf5, 0x5b, 0x3c, 0x00, 0x4b, 0xe5, 0xa9,
0xe6, 0x87, 0xf7, 0xa3, 0x41, 0xe1, 0x39, 0x58, 0x6a, 0xe1, 0xe8, 0xc3, 0x86, 0x20, 0x7e, 0x93,
0xa7, 0xd4, 0x4c, 0xbf, 0x35, 0xeb, 0xc8, 0x45, 0x22, 0xe9, 0x36, 0x59, 0xaa, 0x62, 0x4e, 0xdc,
0x9a, 0x75, 0xa4, 0x20, 0x79, 0x5b, 0xf2, 0x4b, 0x55, 0xcc, 0x89, 0x5b, 0x33, 0xfc, 0xc5, 0xc0,
0x52, 0x75, 0xfe, 0xcd, 0x9b, 0x64, 0x19, 0x27, 0x21, 0x5a, 0xde, 0xc6, 0xec, 0x56, 0x1c, 0x0f,
0x56, 0x34, 0x1f, 0x54, 0x44, 0x04, 0xf3, 0x2a, 0x2f, 0x2e, 0x7d, 0x4b, 0xb9, 0xd5, 0x19, 0x77,
0xc0, 0x9e, 0x93, 0xe4, 0x79, 0xea, 0xdb, 0x6a, 0x4a, 0x8d, 0xb5, 0x3f, 0x03, 0xb8, 0x7f, 0x37,
0x88, 0xe0, 0x6a, 0xeb, 0xa8, 0x28, 0xca, 0x45, 0x91, 0x92, 0x67, 0xa0, 0x07, 0x13, 0xed, 0xd3,
0x4b, 0xf3, 0xd8, 0xfe, 0x21, 0x38, 0x77, 0x7b, 0x40, 0x00, 0x5b, 0x6f, 0xdc, 0x33, 0xea, 0xb3,
0xde, 0xb5, 0xc7, 0xea, 0x73, 0x93, 0x30, 0x9a, 0xfd, 0x31, 0xc1, 0x56, 0x23, 0xe0, 0x78, 0x02,
0x66, 0x2d, 0x62, 0xdc, 0x5d, 0xd9, 0x45, 0x47, 0xea, 0xc1, 0xf3, 0x81, 0x68, 0xf3, 0x5e, 0x0c,
0x3c, 0x05, 0x5b, 0x8b, 0x11, 0x5f, 0xac, 0x42, 0xbb, 0x22, 0x0f, 0x5e, 0x0e, 0xc6, 0xef, 0xc8,
0x3e, 0x80, 0xa5, 0x74, 0x89, 0xab, 0x65, 0xbb, 0x7a, 0x0d, 0x06, 0xf4, 0x10, 0x1a, 0xaf, 0x19,
0x9e, 0x82, 0x73, 0xa7, 0x65, 0x7c, 0x35, 0xa0, 0xcd, 0x7b, 0x9d, 0x07, 0x4f, 0x07, 0x20, 0x8a,
0xec, 0x13, 0x6c, 0x34, 0xc2, 0xc2, 0x21, 0x5c, 0x30, 0x5d, 0x09, 0xf4, 0xb5, 0x68, 0xe0, 0x71,
0xbb, 0x1b, 0x1c, 0x78, 0xfa, 0x6b, 0xa6, 0xd3, 0x93, 0xaf, 0x22, 0xd1, 0x4b, 0x7d, 0x04, 0x49,
0x4f, 0xf1, 0x8a, 0x44, 0xbf, 0x86, 0x47, 0x90, 0xf4, 0xfe, 0x24, 0x8c, 0xef, 0xb6, 0xfa, 0x4e,
0xbc, 0xf9, 0x1b, 0x00, 0x00, 0xff, 0xff, 0xbb, 0x08, 0x6d, 0x39, 0x37, 0x06, 0x00, 0x00,
}

View File

@ -4,11 +4,22 @@ package go.micro.router;
// Router service is used by the proxy to lookup routes
service Router {
rpc Watch(WatchRequest) returns (stream Event) {};
rpc Lookup(LookupRequest) returns (LookupResponse) {};
rpc List(ListRequest) returns (ListResponse) {};
rpc Lookup(LookupRequest) returns (LookupResponse) {};
rpc Watch(WatchRequest) returns (stream Event) {};
rpc Advertise(AdvertiseRequest) returns (stream Advert) {};
rpc Process(Advert) returns (ProcessResponse) {};
rpc Create(Route) returns (CreateResponse) {};
rpc Delete(Route) returns (DeleteResponse) {};
rpc Update(Route) returns (UpdateResponse) {};
}
// ListRequest is made to List routes
message ListRequest {}
// ListResponse is returned by List
message ListResponse {
repeated Route routes = 1;
}
// LookupRequest is made to Lookup
@ -24,7 +35,6 @@ message LookupResponse {
// WatchRequest is made to Watch Router
message WatchRequest {}
// AdvertiseRequest request a stream of Adverts
message AdvertiseRequest {}
@ -51,6 +61,15 @@ message Advert {
// ProcessResponse is returned by Process
message ProcessResponse {}
// CreateResponse is returned by Create
message CreateResponse {}
// DeleteResponse is returned by Delete
message DeleteResponse {}
// UpdateResponse is returned by Update
message UpdateResponse {}
// EventType defines the type of event
enum EventType {
Create = 0;
@ -68,18 +87,14 @@ message Event {
Route route = 3;
}
// ListRequest is made to List routes
message ListRequest {}
// ListResponse is returned by List
message ListResponse {
repeated Route routes = 1;
}
// Query is passed in a LookupRequest
message Query {
// service to lookup
string service = 1;
// gateway to lookup
string gateway = 2;
// network to lookup
string network = 3;
}
// Route is a service route

View File

@ -1,26 +1,4 @@
package table
// LookupPolicy defines query policy
type LookupPolicy int
const (
// DiscardIfNone discards query when no route is found
DiscardIfNone LookupPolicy = iota
// ClosestMatch returns closest match to supplied query
ClosestMatch
)
// String returns human representation of LookupPolicy
func (lp LookupPolicy) String() string {
switch lp {
case DiscardIfNone:
return "DISCARD"
case ClosestMatch:
return "CLOSEST"
default:
return "UNKNOWN"
}
}
package router
// QueryOption sets routing table query options
type QueryOption func(*QueryOptions)
@ -33,8 +11,6 @@ type QueryOptions struct {
Gateway string
// Network is network address
Network string
// Policy is query lookup policy
Policy LookupPolicy
}
// QueryService sets destination address
@ -58,14 +34,6 @@ func QueryNetwork(n string) QueryOption {
}
}
// QueryPolicy sets query policy
// NOTE: this might be renamed to filter or some such
func QueryPolicy(p LookupPolicy) QueryOption {
return func(o *QueryOptions) {
o.Policy = p
}
}
// Query is routing table query
type Query interface {
// Options returns query options
@ -80,12 +48,10 @@ type query struct {
// NewQuery creates new query and returns it
func NewQuery(opts ...QueryOption) Query {
// default options
// NOTE: by default we use DefaultNetworkMetric
qopts := QueryOptions{
Service: "*",
Gateway: "*",
Network: "*",
Policy: DiscardIfNone,
}
for _, o := range opts {

View File

@ -1,4 +1,4 @@
package table
package router
import (
"hash/fnv"

View File

@ -0,0 +1,24 @@
package router
import "testing"
func TestHash(t *testing.T) {
route1 := Route{
Service: "dest.svc",
Gateway: "dest.gw",
Network: "dest.network",
Link: "det.link",
Metric: 10,
}
// make a copy
route2 := route1
route1Hash := route1.Hash()
route2Hash := route2.Hash()
// we should get the same hash
if route1Hash != route2Hash {
t.Errorf("identical routes result in different hashes")
}
}

View File

@ -3,19 +3,17 @@ package router
import (
"time"
"github.com/micro/go-micro/network/router/table"
)
var (
// DefaultRouter is default network router
DefaultRouter = NewRouter()
// DefaultName is default router service name
DefaultName = "go.micro.router"
)
// Router is an interface for a routing control plane
type Router interface {
// Router provides a routing table
table.Table
// Init initializes the router with options
Init(...Option) error
// Options returns the router options
@ -24,6 +22,18 @@ type Router interface {
Advertise() (<-chan *Advert, error)
// Process processes incoming adverts
Process(*Advert) error
// Create new route in the routing table
Create(Route) error
// Delete existing route from the routing table
Delete(Route) error
// Update exiting route in the routing table
Update(Route) error
// List lists all routes in the routing table
List() ([]Route, error)
// Lookup queries routes in the routing table
Lookup(Query) ([]Route, error)
// Watch returns a watcher which tracks updates to the routing table
Watch(opts ...WatchOption) (Watcher, error)
// Status returns router status
Status() Status
// Stop stops the router
@ -49,6 +59,21 @@ const (
Error
)
func (s StatusCode) String() string {
switch s {
case Running:
return "running"
case Advertising:
return "advertising"
case Stopped:
return "stopped"
case Error:
return "error"
default:
return "unknown"
}
}
// Status is router status
type Status struct {
// Error is router error
@ -63,10 +88,22 @@ type AdvertType int
const (
// Announce is advertised when the router announces itself
Announce AdvertType = iota
// Update advertises route updates
Update
// RouteUpdate advertises route updates
RouteUpdate
)
// String returns human readable advertisement type
func (t AdvertType) String() string {
switch t {
case Announce:
return "announce"
case RouteUpdate:
return "update"
default:
return "unknown"
}
}
// Advert contains a list of events advertised by the router to the network
type Advert struct {
// Id is the router Id
@ -78,7 +115,7 @@ type Advert struct {
// TTL is Advert TTL
TTL time.Duration
// Events is a list of routing table events to advertise
Events []*table.Event
Events []*Event
}
// NewRouter creates new Router and returns it

View File

@ -0,0 +1,493 @@
package service
import (
"context"
"fmt"
"io"
"sync"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/client"
"github.com/micro/go-micro/network/router"
pb "github.com/micro/go-micro/network/router/proto"
)
type svc struct {
opts router.Options
router pb.RouterService
status router.Status
watchers map[string]*svcWatcher
exit chan struct{}
errChan chan error
advertChan chan *router.Advert
wg *sync.WaitGroup
sync.RWMutex
}
// NewRouter creates new service router and returns it
func NewRouter(opts ...router.Option) router.Router {
// get default options
options := router.DefaultOptions()
// apply requested options
for _, o := range opts {
o(&options)
}
// NOTE: might need some client opts here
client := client.DefaultClient
// NOTE: should we have Client/Service option in router.Options?
s := &svc{
opts: options,
router: pb.NewRouterService(router.DefaultName, client),
status: router.Status{Code: router.Stopped, Error: nil},
watchers: make(map[string]*svcWatcher),
wg: &sync.WaitGroup{},
}
go s.run()
return s
}
// Init initializes router with given options
func (s *svc) Init(opts ...router.Option) error {
for _, o := range opts {
o(&s.opts)
}
return nil
}
// Options returns router options
func (s *svc) Options() router.Options {
return s.opts
}
// watchRouter watches router and send events to all registered watchers
func (s *svc) watchRouter(stream pb.Router_WatchService) error {
s.wg.Add(1)
go func() {
defer s.wg.Done()
<-s.exit
stream.Close()
}()
var watchErr error
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
watchErr = err
}
break
}
route := router.Route{
Service: resp.Route.Service,
Address: resp.Route.Address,
Gateway: resp.Route.Gateway,
Network: resp.Route.Network,
Link: resp.Route.Link,
Metric: int(resp.Route.Metric),
}
event := &router.Event{
Type: router.EventType(resp.Type),
Timestamp: time.Unix(0, resp.Timestamp),
Route: route,
}
// TODO: might make this non-blocking
s.RLock()
for _, w := range s.watchers {
select {
case w.resChan <- event:
case <-w.done:
}
}
s.RUnlock()
}
return watchErr
}
// watchErrors watches router errors and takes appropriate actions
func (s *svc) watchErrors() {
var err error
select {
case <-s.exit:
case err = <-s.errChan:
}
s.Lock()
defer s.Unlock()
if s.status.Code != router.Stopped {
// notify all goroutines to finish
close(s.exit)
if s.status.Code == router.Advertising {
// drain the advertise channel
for range s.advertChan {
}
}
s.status = router.Status{Code: router.Stopped, Error: nil}
}
if err != nil {
s.status = router.Status{Code: router.Error, Error: err}
}
}
// Run runs the router.
func (s *svc) run() {
s.Lock()
defer s.Unlock()
switch s.status.Code {
case router.Stopped, router.Error:
stream, err := s.router.Watch(context.Background(), &pb.WatchRequest{})
if err != nil {
s.status = router.Status{Code: router.Error, Error: fmt.Errorf("failed getting event stream: %s", err)}
return
}
// create error and exit channels
s.errChan = make(chan error, 1)
s.exit = make(chan struct{})
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case s.errChan <- s.watchRouter(stream):
case <-s.exit:
}
}()
// watch for errors and cleanup
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.watchErrors()
}()
// mark router as Running and set its Error to nil
s.status = router.Status{Code: router.Running, Error: nil}
return
}
return
}
func (s *svc) advertiseEvents(stream pb.Router_AdvertiseService) error {
s.wg.Add(1)
go func() {
defer s.wg.Done()
<-s.exit
stream.Close()
}()
var advErr error
for {
resp, err := stream.Recv()
if err != nil {
if err != io.EOF {
advErr = err
}
break
}
events := make([]*router.Event, len(resp.Events))
for i, event := range resp.Events {
route := router.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Link: event.Route.Link,
Metric: int(event.Route.Metric),
}
events[i] = &router.Event{
Type: router.EventType(event.Type),
Timestamp: time.Unix(0, event.Timestamp),
Route: route,
}
}
advert := &router.Advert{
Id: resp.Id,
Type: router.AdvertType(resp.Type),
Timestamp: time.Unix(0, resp.Timestamp),
TTL: time.Duration(resp.Ttl),
Events: events,
}
select {
case s.advertChan <- advert:
case <-s.exit:
close(s.advertChan)
return nil
}
}
// close the channel on exit
close(s.advertChan)
return advErr
}
// Advertise advertises routes to the network
func (s *svc) Advertise() (<-chan *router.Advert, error) {
s.Lock()
defer s.Unlock()
switch s.status.Code {
case router.Advertising:
return s.advertChan, nil
case router.Running:
stream, err := s.router.Advertise(context.Background(), &pb.AdvertiseRequest{})
if err != nil {
return nil, fmt.Errorf("failed getting advert stream: %s", err)
}
// create advertise and event channels
s.advertChan = make(chan *router.Advert)
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case s.errChan <- s.advertiseEvents(stream):
case <-s.exit:
}
}()
// mark router as Running and set its Error to nil
s.status = router.Status{Code: router.Advertising, Error: nil}
return s.advertChan, nil
case router.Stopped:
return nil, fmt.Errorf("not running")
}
return nil, fmt.Errorf("error: %s", s.status.Error)
}
// Process processes incoming adverts
func (s *svc) Process(advert *router.Advert) error {
var events []*pb.Event
for _, event := range advert.Events {
route := &pb.Route{
Service: event.Route.Service,
Address: event.Route.Address,
Gateway: event.Route.Gateway,
Network: event.Route.Network,
Link: event.Route.Link,
Metric: int64(event.Route.Metric),
}
e := &pb.Event{
Type: pb.EventType(event.Type),
Timestamp: event.Timestamp.UnixNano(),
Route: route,
}
events = append(events, e)
}
advertReq := &pb.Advert{
Id: s.Options().Id,
Type: pb.AdvertType(advert.Type),
Timestamp: advert.Timestamp.UnixNano(),
Events: events,
}
if _, err := s.router.Process(context.Background(), advertReq); err != nil {
return err
}
return nil
}
// Create new route in the routing table
func (s *svc) Create(r router.Route) error {
route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := s.router.Create(context.Background(), route); err != nil {
return err
}
return nil
}
// Delete deletes existing route from the routing table
func (s *svc) Delete(r router.Route) error {
route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := s.router.Delete(context.Background(), route); err != nil {
return err
}
return nil
}
// Update updates route in the routing table
func (s *svc) Update(r router.Route) error {
route := &pb.Route{
Service: r.Service,
Address: r.Address,
Gateway: r.Gateway,
Network: r.Network,
Link: r.Link,
Metric: int64(r.Metric),
}
if _, err := s.router.Update(context.Background(), route); err != nil {
return err
}
return nil
}
// List returns the list of all routes in the table
func (s *svc) List() ([]router.Route, error) {
resp, err := s.router.List(context.Background(), &pb.ListRequest{})
if err != nil {
return nil, err
}
routes := make([]router.Route, len(resp.Routes))
for i, route := range resp.Routes {
routes[i] = router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Link: route.Link,
Metric: int(route.Metric),
}
}
return routes, nil
}
// Lookup looks up routes in the routing table and returns them
func (s *svc) Lookup(q router.Query) ([]router.Route, error) {
// call the router
resp, err := s.router.Lookup(context.Background(), &pb.LookupRequest{
Query: &pb.Query{
Service: q.Options().Service,
Gateway: q.Options().Gateway,
Network: q.Options().Network,
},
})
// errored out
if err != nil {
return nil, err
}
routes := make([]router.Route, len(resp.Routes))
for i, route := range resp.Routes {
routes[i] = router.Route{
Service: route.Service,
Address: route.Address,
Gateway: route.Gateway,
Network: route.Network,
Link: route.Link,
Metric: int(route.Metric),
}
}
return routes, nil
}
// Watch returns a watcher which allows to track updates to the routing table
func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) {
wopts := router.WatchOptions{
Service: "*",
}
for _, o := range opts {
o(&wopts)
}
w := &svcWatcher{
opts: wopts,
resChan: make(chan *router.Event, 10),
done: make(chan struct{}),
}
s.Lock()
s.watchers[uuid.New().String()] = w
s.Unlock()
// when the router stops, stop the watcher and exit
s.wg.Add(1)
go func() {
defer s.wg.Done()
<-s.exit
w.Stop()
}()
return w, nil
}
// Status returns router status
func (s *svc) Status() router.Status {
s.RLock()
defer s.RUnlock()
// make a copy of the status
status := s.status
return status
}
// Stop stops the router
func (s *svc) Stop() error {
s.Lock()
// only close the channel if the router is running and/or advertising
if s.status.Code == router.Running || s.status.Code == router.Advertising {
// notify all goroutines to finish
close(s.exit)
// drain the advertise channel only if advertising
if s.status.Code == router.Advertising {
for range s.advertChan {
}
}
// mark the router as Stopped and set its Error to nil
s.status = router.Status{Code: router.Stopped, Error: nil}
}
s.Unlock()
// wait for all goroutines to finish
s.wg.Wait()
return nil
}
// Returns the router implementation
func (s *svc) String() string {
return "service"
}

View File

@ -0,0 +1,49 @@
package service
import (
"sync"
"github.com/micro/go-micro/network/router"
)
type svcWatcher struct {
opts router.WatchOptions
resChan chan *router.Event
done chan struct{}
sync.RWMutex
}
// Next is a blocking call that returns watch result
func (w *svcWatcher) Next() (*router.Event, error) {
for {
select {
case res := <-w.resChan:
switch w.opts.Service {
case res.Route.Service, "*":
return res, nil
default:
continue
}
case <-w.done:
return nil, router.ErrWatcherStopped
}
}
}
// Chan returns event channel
func (w *svcWatcher) Chan() (<-chan *router.Event, error) {
return w.resChan, nil
}
// Stop stops watcher
func (w *svcWatcher) Stop() {
w.Lock()
defer w.Unlock()
select {
case <-w.done:
return
default:
close(w.done)
}
}

View File

@ -1,59 +1,39 @@
package table
package router
import (
"errors"
"sync"
"time"
"github.com/google/uuid"
)
// Options specify routing table options
// TODO: table options TBD in the future
type Options struct{}
var (
// ErrRouteNotFound is returned when no route was found in the routing table
ErrRouteNotFound = errors.New("route not found")
// ErrDuplicateRoute is returned when the route already exists
ErrDuplicateRoute = errors.New("duplicate route")
)
// table is an in memory routing table
type table struct {
// opts are table options
opts Options
// m stores routing table map
m map[string]map[uint64]Route
// w is a list of table watchers
w map[string]*tableWatcher
// Table is an in memory routing table
type Table struct {
// routes stores service routes
routes map[string]map[uint64]Route
// watchers stores table watchers
watchers map[string]*tableWatcher
sync.RWMutex
}
// newTable creates a new routing table and returns it
func newTable(opts ...Option) Table {
// default options
var options Options
// apply requested options
for _, o := range opts {
o(&options)
// NewTable creates a new routing table and returns it
func NewTable(opts ...Option) *Table {
return &Table{
routes: make(map[string]map[uint64]Route),
watchers: make(map[string]*tableWatcher),
}
return &table{
opts: options,
m: make(map[string]map[uint64]Route),
w: make(map[string]*tableWatcher),
}
}
// Init initializes routing table with options
func (t *table) Init(opts ...Option) error {
for _, o := range opts {
o(&t.opts)
}
return nil
}
// Options returns routing table options
func (t *table) Options() Options {
return t.opts
}
// Create creates new route in the routing table
func (t *table) Create(r Route) error {
func (t *Table) Create(r Route) error {
service := r.Service
sum := r.Hash()
@ -61,16 +41,16 @@ func (t *table) Create(r Route) error {
defer t.Unlock()
// check if there are any routes in the table for the route destination
if _, ok := t.m[service]; !ok {
t.m[service] = make(map[uint64]Route)
t.m[service][sum] = r
if _, ok := t.routes[service]; !ok {
t.routes[service] = make(map[uint64]Route)
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
// add new route to the table for the route destination
if _, ok := t.m[service][sum]; !ok {
t.m[service][sum] = r
if _, ok := t.routes[service][sum]; !ok {
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
@ -79,25 +59,25 @@ func (t *table) Create(r Route) error {
}
// Delete deletes the route from the routing table
func (t *table) Delete(r Route) error {
func (t *Table) Delete(r Route) error {
service := r.Service
sum := r.Hash()
t.Lock()
defer t.Unlock()
if _, ok := t.m[service]; !ok {
if _, ok := t.routes[service]; !ok {
return ErrRouteNotFound
}
delete(t.m[service], sum)
delete(t.routes[service], sum)
go t.sendEvent(&Event{Type: Delete, Timestamp: time.Now(), Route: r})
return nil
}
// Update updates routing table with the new route
func (t *table) Update(r Route) error {
func (t *Table) Update(r Route) error {
service := r.Service
sum := r.Hash()
@ -105,26 +85,26 @@ func (t *table) Update(r Route) error {
defer t.Unlock()
// check if the route destination has any routes in the table
if _, ok := t.m[service]; !ok {
t.m[service] = make(map[uint64]Route)
t.m[service][sum] = r
if _, ok := t.routes[service]; !ok {
t.routes[service] = make(map[uint64]Route)
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Create, Timestamp: time.Now(), Route: r})
return nil
}
t.m[service][sum] = r
t.routes[service][sum] = r
go t.sendEvent(&Event{Type: Update, Timestamp: time.Now(), Route: r})
return nil
}
// List returns a list of all routes in the table
func (t *table) List() ([]Route, error) {
func (t *Table) List() ([]Route, error) {
t.RLock()
defer t.RUnlock()
var routes []Route
for _, rmap := range t.m {
for _, rmap := range t.routes {
for _, route := range rmap {
routes = append(routes, route)
}
@ -155,21 +135,20 @@ func findRoutes(routes map[uint64]Route, network, router string) []Route {
}
// Lookup queries routing table and returns all routes that match the lookup query
func (t *table) Lookup(q Query) ([]Route, error) {
func (t *Table) Lookup(q Query) ([]Route, error) {
t.RLock()
defer t.RUnlock()
if q.Options().Service != "*" {
// no routes found for the destination and query policy is not a DiscardIfNone
if _, ok := t.m[q.Options().Service]; !ok && q.Options().Policy != DiscardIfNone {
if _, ok := t.routes[q.Options().Service]; !ok {
return nil, ErrRouteNotFound
}
return findRoutes(t.m[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
return findRoutes(t.routes[q.Options().Service], q.Options().Network, q.Options().Gateway), nil
}
var results []Route
// search through all destinations
for _, routes := range t.m {
for _, routes := range t.routes {
results = append(results, findRoutes(routes, q.Options().Network, q.Options().Gateway)...)
}
@ -177,7 +156,7 @@ func (t *table) Lookup(q Query) ([]Route, error) {
}
// Watch returns routing table entry watcher
func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
func (t *Table) Watch(opts ...WatchOption) (Watcher, error) {
// by default watch everything
wopts := WatchOptions{
Service: "*",
@ -187,46 +166,33 @@ func (t *table) Watch(opts ...WatchOption) (Watcher, error) {
o(&wopts)
}
watcher := &tableWatcher{
w := &tableWatcher{
opts: wopts,
resChan: make(chan *Event, 10),
done: make(chan struct{}),
}
t.Lock()
t.w[uuid.New().String()] = watcher
t.watchers[uuid.New().String()] = w
t.Unlock()
return watcher, nil
return w, nil
}
// sendEvent sends rules to all subscribe watchers
func (t *table) sendEvent(r *Event) {
// sendEvent sends events to all subscribed watchers
func (t *Table) sendEvent(e *Event) {
t.RLock()
defer t.RUnlock()
for _, w := range t.w {
for _, w := range t.watchers {
select {
case w.resChan <- r:
case w.resChan <- e:
case <-w.done:
}
}
}
// Size returns the size of the routing table
func (t *table) Size() int {
t.RLock()
defer t.RUnlock()
size := 0
for dest := range t.m {
size += len(t.m[dest])
}
return size
}
// String returns debug information
func (t *table) String() string {
return "default"
func (t *Table) String() string {
return "table"
}

View File

@ -1,38 +0,0 @@
package table
import (
"errors"
)
var (
// ErrRouteNotFound is returned when no route was found in the routing table
ErrRouteNotFound = errors.New("route not found")
// ErrDuplicateRoute is returned when the route already exists
ErrDuplicateRoute = errors.New("duplicate route")
)
// Table defines routing table interface
type Table interface {
// Create new route in the routing table
Create(Route) error
// Delete deletes existing route from the routing table
Delete(Route) error
// Update updates route in the routing table
Update(Route) error
// List returns the list of all routes in the table
List() ([]Route, error)
// Lookup looks up routes in the routing table and returns them
Lookup(Query) ([]Route, error)
// Watch returns a watcher which allows to track updates to the routing table
Watch(opts ...WatchOption) (Watcher, error)
// Size returns the size of the routing table
Size() int
}
// Option used by the routing table
type Option func(*Options)
// NewTable creates new routing table and returns it
func NewTable(opts ...Option) Table {
return newTable(opts...)
}

View File

@ -1,8 +1,8 @@
package table
package router
import "testing"
func testSetup() (Table, Route) {
func testSetup() (*Table, Route) {
table := NewTable()
route := Route{
@ -18,12 +18,10 @@ func testSetup() (Table, Route) {
func TestCreate(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize++
// adds new route for the original destination
route.Gateway = "dest.gw2"
@ -31,11 +29,6 @@ func TestCreate(t *testing.T) {
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize++
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
// adding the same route under Insert policy must error
if err := table.Create(route); err != ErrDuplicateRoute {
@ -45,12 +38,10 @@ func TestCreate(t *testing.T) {
func TestDelete(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize++
// should fail to delete non-existant route
prevSvc := route.Service
@ -66,21 +57,14 @@ func TestDelete(t *testing.T) {
if err := table.Delete(route); err != nil {
t.Errorf("error deleting route: %s", err)
}
testTableSize--
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
}
func TestUpdate(t *testing.T) {
table, route := testSetup()
testTableSize := table.Size()
if err := table.Create(route); err != nil {
t.Errorf("error adding route: %s", err)
}
testTableSize++
// change the metric of the original route
route.Metric = 200
@ -89,22 +73,12 @@ func TestUpdate(t *testing.T) {
t.Errorf("error updating route: %s", err)
}
// the size of the table should not change as we're only updating the metric of an existing route
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
// this should add a new route
route.Service = "rand.dest"
if err := table.Update(route); err != nil {
t.Errorf("error updating route: %s", err)
}
testTableSize++
if table.Size() != testTableSize {
t.Errorf("invalid number of routes. Expected: %d, found: %d", testTableSize, table.Size())
}
}
func TestList(t *testing.T) {
@ -127,10 +101,6 @@ func TestList(t *testing.T) {
if len(routes) != len(svc) {
t.Errorf("incorrect number of routes listed. Expected: %d, found: %d", len(svc), len(routes))
}
if len(routes) != table.Size() {
t.Errorf("mismatch number of routes and table size. Expected: %d, found: %d", len(routes), table.Size())
}
}
func TestLookup(t *testing.T) {
@ -157,10 +127,6 @@ func TestLookup(t *testing.T) {
t.Errorf("error looking up routes: %s", err)
}
if len(routes) != table.Size() {
t.Errorf("incorrect number of routes returned. Expected: %d, found: %d", table.Size(), len(routes))
}
// query particular net
query = NewQuery(QueryNetwork("net1"))
@ -218,8 +184,8 @@ func TestLookup(t *testing.T) {
query = NewQuery(QueryService("foobar"))
routes, err = table.Lookup(query)
if err != nil {
t.Errorf("error looking up routes: %s", err)
if err != ErrRouteNotFound {
t.Errorf("error looking up routes. Expected: %s, found: %s", ErrRouteNotFound, err)
}
if len(routes) != 0 {

View File

@ -1,7 +1,8 @@
package table
package router
import (
"errors"
"sync"
"time"
)
@ -22,11 +23,9 @@ const (
Update
)
// String implements fmt.Stringer
// NOTE: we need this as this makes converting the numeric codes
// into miro style string actions very simple
func (et EventType) String() string {
switch et {
// String returns human readable event type
func (t EventType) String() string {
switch t {
case Create:
return "create"
case Delete:
@ -80,11 +79,11 @@ type tableWatcher struct {
opts WatchOptions
resChan chan *Event
done chan struct{}
sync.RWMutex
}
// Next returns the next noticed action taken on table
// TODO: this needs to be thought through properly;
// right now we only allow to watch service
// TODO: right now we only allow to watch particular service
func (w *tableWatcher) Next() (*Event, error) {
for {
select {
@ -108,6 +107,9 @@ func (w *tableWatcher) Chan() (<-chan *Event, error) {
// Stop stops routing table watcher
func (w *tableWatcher) Stop() {
w.Lock()
defer w.Unlock()
select {
case <-w.done:
return