1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-12 08:23:58 +02:00

Merge pull request #1131 from micro/router-refactor

refactor and cleanup some router code
This commit is contained in:
Asim Aslam 2020-01-22 16:54:20 +00:00 committed by GitHub
commit 5cceb00df2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 219 additions and 540 deletions

View File

@ -203,10 +203,6 @@ func (p *Proxy) cacheRoutes(service string) ([]router.Route, error) {
// lookup the routes in the router
results, err := p.Router.Lookup(router.QueryService(service))
if err != nil {
// check the status of the router
if status := p.Router.Status(); status.Code == router.Error {
return nil, status.Error
}
// otherwise return the error
return nil, err
}

View File

@ -1,6 +1,7 @@
package router
import (
"errors"
"fmt"
"math"
"sort"
@ -16,8 +17,6 @@ import (
var (
// AdvertiseEventsTick is time interval in which the router advertises route updates
AdvertiseEventsTick = 10 * time.Second
// AdvertiseTableTick is time interval in which router advertises all routes found in routing table
AdvertiseTableTick = 2 * time.Minute
// DefaultAdvertTTL is default advertisement TTL
DefaultAdvertTTL = 2 * time.Minute
// AdvertSuppress is advert suppression threshold
@ -37,14 +36,12 @@ var (
// router implements default router
type router struct {
sync.RWMutex
options Options
status Status
running bool
table *table
exit chan struct{}
errChan chan error
options Options
exit chan bool
eventChan chan *Event
advertWg *sync.WaitGroup
wg *sync.WaitGroup
// advert subscribers
sub sync.RWMutex
@ -61,15 +58,9 @@ func newRouter(opts ...Option) Router {
o(&options)
}
// set initial status to Stopped
status := Status{Code: Stopped, Error: nil}
return &router{
options: options,
status: status,
table: newTable(),
advertWg: &sync.WaitGroup{},
wg: &sync.WaitGroup{},
subscribers: make(map[string]chan *Advert),
}
}
@ -125,7 +116,7 @@ func (r *router) manageRoute(route Route, action string) error {
// manageServiceRoutes applies action to all routes of the service.
// It returns error of the action fails with error.
func (r *router) manageServiceRoutes(service *registry.Service, action string) error {
func (r *router) manageRoutes(service *registry.Service, action string) error {
// action is the routing table action
action = strings.ToLower(action)
@ -166,7 +157,7 @@ func (r *router) manageRegistryRoutes(reg registry.Registry, action string) erro
}
// manage the routes for all returned services
for _, srv := range srvs {
if err := r.manageServiceRoutes(srv, action); err != nil {
if err := r.manageRoutes(srv, action); err != nil {
return err
}
}
@ -181,42 +172,35 @@ func (r *router) watchRegistry(w registry.Watcher) error {
exit := make(chan bool)
defer func() {
// close the exit channel when the go routine finishes
close(exit)
}()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer w.Stop()
defer r.wg.Done()
select {
case <-r.exit:
return
case <-exit:
return
case <-r.exit:
return
}
}()
var watchErr error
for {
res, err := w.Next()
if err != nil {
if err != registry.ErrWatcherStopped {
watchErr = err
return err
}
break
}
if err := r.manageServiceRoutes(res.Service, res.Action); err != nil {
if err := r.manageRoutes(res.Service, res.Action); err != nil {
return err
}
}
return watchErr
return nil
}
// watchTable watches routing table entries and either adds or deletes locally registered service to/from network registry
@ -225,16 +209,13 @@ func (r *router) watchTable(w Watcher) error {
exit := make(chan bool)
defer func() {
// close the exit channel when the go routine finishes
close(exit)
}()
// wait in the background for the router to stop
// when the router stops, stop the watcher and exit
r.wg.Add(1)
go func() {
defer w.Stop()
defer r.wg.Done()
select {
case <-r.exit:
@ -244,13 +225,11 @@ func (r *router) watchTable(w Watcher) error {
}
}()
var watchErr error
for {
event, err := w.Next()
if err != nil {
if err != ErrWatcherStopped {
watchErr = err
return err
}
break
}
@ -260,13 +239,11 @@ func (r *router) watchTable(w Watcher) error {
close(r.eventChan)
return nil
case r.eventChan <- event:
// process event
}
}
// close event channel on error
close(r.eventChan)
return watchErr
return nil
}
// publishAdvert publishes router advert to advert channel
@ -292,36 +269,6 @@ func (r *router) publishAdvert(advType AdvertType, events []*Event) {
r.sub.RUnlock()
}
// advertiseTable advertises the whole routing table to the network
func (r *router) advertiseTable() error {
// create table advertisement ticker
ticker := time.NewTicker(AdvertiseTableTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// do full table flush
events, err := r.flushRouteEvents(Update)
if err != nil {
return fmt.Errorf("failed flushing routes: %s", err)
}
// advertise routes to subscribers
if len(events) > 0 {
log.Debugf("Router flushing table with %d events: %s", len(events), r.options.Id)
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
}
case <-r.exit:
return nil
}
}
}
// advert contains a route event to be advertised
type advert struct {
// event received from routing table
@ -392,17 +339,39 @@ func (r *router) advertiseEvents() error {
adverts := make(adverts)
// routing table watcher
tableWatcher, err := r.Watch()
w, err := r.Watch()
if err != nil {
return fmt.Errorf("failed creating routing table watcher: %v", err)
return err
}
defer w.Stop()
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchTable(tableWatcher):
case <-r.exit:
var err error
for {
select {
case <-r.exit:
return
default:
if w == nil {
// routing table watcher
w, err = r.Watch()
if err != nil {
log.Logf("Error creating watcher: %v", err)
time.Sleep(time.Second)
continue
}
}
if err := r.watchTable(w); err != nil {
log.Logf("Error watching table: %v", err)
time.Sleep(time.Second)
}
// reset
w.Stop()
w = nil
}
}
}()
@ -446,11 +415,7 @@ func (r *router) advertiseEvents() error {
// advertise events to subscribers
if len(events) > 0 {
log.Debugf("Router publishing %d events", len(events))
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(RouteUpdate, events)
}()
go r.publishAdvert(RouteUpdate, events)
}
case e := <-r.eventChan:
// if event is nil, continue
@ -502,65 +467,19 @@ func (r *router) advertiseEvents() error {
a.penalty += Penalty
log.Debugf("Router advert %d for route %s %s event penalty: %f", hash, a.event.Route.Service, a.event.Route.Address, a.penalty)
case <-r.exit:
// first wait for the advertiser to finish
r.advertWg.Wait()
w.Stop()
return nil
}
}
}
// close closes exit channels
func (r *router) close() {
log.Debugf("Router closing remaining channels")
// drain the advertise channel only if advertising
if r.status.Code == Advertising {
// drain the event channel
for range r.eventChan {
}
// close advert subscribers
for id, sub := range r.subscribers {
select {
case <-sub:
default:
}
// close the channel
close(sub)
// delete the subscriber
r.sub.Lock()
delete(r.subscribers, id)
r.sub.Unlock()
}
}
// mark the router as Stopped and set its Error to nil
r.status = Status{Code: Stopped, Error: nil}
}
// watchErrors watches router errors and takes appropriate actions
func (r *router) watchErrors() {
var err error
select {
case <-r.exit:
return
case err = <-r.errChan:
}
r.Lock()
defer r.Unlock()
// if the router is not stopped, stop it
if r.status.Code != Stopped {
// notify all goroutines to finish
close(r.exit)
// close all the channels
r.close()
// set the status error
if err != nil {
r.status.Error = err
// drain all the events, only called on Stop
func (r *router) drain() {
for {
select {
case <-r.eventChan:
default:
return
}
}
}
@ -570,16 +489,13 @@ func (r *router) Start() error {
r.Lock()
defer r.Unlock()
// only start if we're stopped
if r.status.Code != Stopped {
if r.running {
return nil
}
// add all local service routes into the routing table
if err := r.manageRegistryRoutes(r.options.Registry, "create"); err != nil {
e := fmt.Errorf("failed adding registry routes: %s", err)
r.status = Status{Code: Error, Error: e}
return e
return fmt.Errorf("failed adding registry routes: %s", err)
}
// add default gateway into routing table
@ -595,42 +511,49 @@ func (r *router) Start() error {
Metric: DefaultLocalMetric,
}
if err := r.table.Create(route); err != nil {
e := fmt.Errorf("failed adding default gateway route: %s", err)
r.status = Status{Code: Error, Error: e}
return e
return fmt.Errorf("failed adding default gateway route: %s", err)
}
}
// create error and exit channels
r.errChan = make(chan error, 1)
r.exit = make(chan struct{})
r.exit = make(chan bool)
// registry watcher
regWatcher, err := r.options.Registry.Watch()
w, err := r.options.Registry.Watch()
if err != nil {
e := fmt.Errorf("failed creating registry watcher: %v", err)
r.status = Status{Code: Error, Error: e}
return e
return fmt.Errorf("failed creating registry watcher: %v", err)
}
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.watchRegistry(regWatcher):
case <-r.exit:
var err error
for {
select {
case <-r.exit:
w.Stop()
return
default:
if w == nil {
w, err = r.options.Registry.Watch()
if err != nil {
log.Logf("failed creating registry watcher: %v", err)
time.Sleep(time.Second)
continue
}
}
if err := r.watchRegistry(w); err != nil {
log.Logf("Error watching the registry: %v", err)
time.Sleep(time.Second)
}
w.Stop()
w = nil
}
}
}()
// watch for errors and cleanup
r.wg.Add(1)
go func() {
defer r.wg.Done()
r.watchErrors()
}()
// mark router as Running
r.status = Status{Code: Running, Error: nil}
r.running = true
return nil
}
@ -642,61 +565,46 @@ func (r *router) Advertise() (<-chan *Advert, error) {
r.Lock()
defer r.Unlock()
switch r.status.Code {
case Advertising:
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
case Running:
// list all the routes and pack them into even slice to advertise
events, err := r.flushRouteEvents(Create)
if err != nil {
return nil, fmt.Errorf("failed to flush routes: %s", err)
}
// create event channels
r.eventChan = make(chan *Event)
// create advert channel
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
// advertise your presence
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
r.publishAdvert(Announce, events)
}()
r.wg.Add(1)
go func() {
defer r.wg.Done()
select {
case r.errChan <- r.advertiseEvents():
case <-r.exit:
}
}()
r.advertWg.Add(1)
go func() {
defer r.advertWg.Done()
// advertise the whole routing table
select {
case r.errChan <- r.advertiseTable():
case <-r.exit:
}
}()
// mark router as Running and set its Error to nil
r.status = Status{Code: Advertising, Error: nil}
log.Debugf("Router starting to advertise")
return advertChan, nil
case Stopped:
return nil, fmt.Errorf("not running")
if !r.running {
return nil, errors.New("not running")
}
return nil, fmt.Errorf("error: %s", r.status.Error)
// already advertising
if r.eventChan != nil {
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
return advertChan, nil
}
// list all the routes and pack them into even slice to advertise
events, err := r.flushRouteEvents(Create)
if err != nil {
return nil, fmt.Errorf("failed to flush routes: %s", err)
}
// create event channels
r.eventChan = make(chan *Event)
// create advert channel
advertChan := make(chan *Advert, 128)
r.subscribers[uuid.New().String()] = advertChan
// advertise your presence
go r.publishAdvert(Announce, events)
go func() {
select {
case <-r.exit:
return
default:
if err := r.advertiseEvents(); err != nil {
log.Logf("Error adveritising events: %v", err)
}
}
}()
return advertChan, nil
}
// Process updates the routing table using the advertised values
@ -774,48 +682,39 @@ func (r *router) Watch(opts ...WatchOption) (Watcher, error) {
return r.table.Watch(opts...)
}
// Status returns router status
func (r *router) Status() Status {
r.RLock()
defer r.RUnlock()
// make a copy of the status
status := r.status
return status
}
// Stop stops the router
func (r *router) Stop() error {
r.Lock()
defer r.Unlock()
log.Debugf("Router shutting down")
switch r.status.Code {
case Stopped, Error:
r.Unlock()
return r.status.Error
case Running, Advertising:
// notify all goroutines to finish
select {
case <-r.exit:
return nil
default:
close(r.exit)
// close all the channels
// NOTE: close marks the router status as Stopped
r.close()
// extract the events
r.drain()
// close advert subscribers
for id, sub := range r.subscribers {
// close the channel
close(sub)
// delete the subscriber
r.sub.Lock()
delete(r.subscribers, id)
r.sub.Unlock()
}
}
r.Unlock()
log.Tracef("Router waiting for all goroutines to finish")
// wait for all goroutines to finish
r.wg.Wait()
log.Debugf("Router successfully stopped")
// remove event chan
r.eventChan = nil
return nil
}
// String prints debugging information about router
func (r *router) String() string {
return "memory"
return "registry"
}

View File

@ -38,7 +38,6 @@ func TestRouterAdvertise(t *testing.T) {
// lower the advertise interval
AdvertiseEventsTick = 500 * time.Millisecond
AdvertiseTableTick = 1 * time.Second
if err := r.Start(); err != nil {
t.Errorf("failed to start router: %v", err)

View File

@ -34,8 +34,6 @@ type Router interface {
Watch(opts ...WatchOption) (Watcher, error)
// Start starts the router
Start() error
// Status returns router status
Status() Status
// Stop stops the router
Stop() error
// Returns the router implementation
@ -73,34 +71,6 @@ const (
Error
)
func (s StatusCode) String() string {
switch s {
case Running:
return "running"
case Advertising:
return "advertising"
case Stopped:
return "stopped"
case Error:
return "error"
default:
return "unknown"
}
}
// Status is router status
type Status struct {
// Code defines router status
Code StatusCode
// Error contains error description
Error error
}
// String returns human readable status
func (s Status) String() string {
return s.Code.String()
}
// AdvertType is route advertisement type
type AdvertType int

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: router.proto
// source: micro/go-micro/router/service/proto/router.proto
package go_micro_router
@ -43,7 +43,7 @@ func (x AdvertType) String() string {
}
func (AdvertType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{0}
return fileDescriptor_c2b04f200fb3e806, []int{0}
}
// EventType defines the type of event
@ -72,7 +72,7 @@ func (x EventType) String() string {
}
func (EventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{1}
return fileDescriptor_c2b04f200fb3e806, []int{1}
}
// Empty request
@ -86,7 +86,7 @@ func (m *Request) Reset() { *m = Request{} }
func (m *Request) String() string { return proto.CompactTextString(m) }
func (*Request) ProtoMessage() {}
func (*Request) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{0}
return fileDescriptor_c2b04f200fb3e806, []int{0}
}
func (m *Request) XXX_Unmarshal(b []byte) error {
@ -118,7 +118,7 @@ func (m *Response) Reset() { *m = Response{} }
func (m *Response) String() string { return proto.CompactTextString(m) }
func (*Response) ProtoMessage() {}
func (*Response) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{1}
return fileDescriptor_c2b04f200fb3e806, []int{1}
}
func (m *Response) XXX_Unmarshal(b []byte) error {
@ -151,7 +151,7 @@ func (m *ListResponse) Reset() { *m = ListResponse{} }
func (m *ListResponse) String() string { return proto.CompactTextString(m) }
func (*ListResponse) ProtoMessage() {}
func (*ListResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{2}
return fileDescriptor_c2b04f200fb3e806, []int{2}
}
func (m *ListResponse) XXX_Unmarshal(b []byte) error {
@ -191,7 +191,7 @@ func (m *LookupRequest) Reset() { *m = LookupRequest{} }
func (m *LookupRequest) String() string { return proto.CompactTextString(m) }
func (*LookupRequest) ProtoMessage() {}
func (*LookupRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{3}
return fileDescriptor_c2b04f200fb3e806, []int{3}
}
func (m *LookupRequest) XXX_Unmarshal(b []byte) error {
@ -231,7 +231,7 @@ func (m *LookupResponse) Reset() { *m = LookupResponse{} }
func (m *LookupResponse) String() string { return proto.CompactTextString(m) }
func (*LookupResponse) ProtoMessage() {}
func (*LookupResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{4}
return fileDescriptor_c2b04f200fb3e806, []int{4}
}
func (m *LookupResponse) XXX_Unmarshal(b []byte) error {
@ -271,7 +271,7 @@ func (m *QueryRequest) Reset() { *m = QueryRequest{} }
func (m *QueryRequest) String() string { return proto.CompactTextString(m) }
func (*QueryRequest) ProtoMessage() {}
func (*QueryRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{5}
return fileDescriptor_c2b04f200fb3e806, []int{5}
}
func (m *QueryRequest) XXX_Unmarshal(b []byte) error {
@ -311,7 +311,7 @@ func (m *QueryResponse) Reset() { *m = QueryResponse{} }
func (m *QueryResponse) String() string { return proto.CompactTextString(m) }
func (*QueryResponse) ProtoMessage() {}
func (*QueryResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{6}
return fileDescriptor_c2b04f200fb3e806, []int{6}
}
func (m *QueryResponse) XXX_Unmarshal(b []byte) error {
@ -350,7 +350,7 @@ func (m *WatchRequest) Reset() { *m = WatchRequest{} }
func (m *WatchRequest) String() string { return proto.CompactTextString(m) }
func (*WatchRequest) ProtoMessage() {}
func (*WatchRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{7}
return fileDescriptor_c2b04f200fb3e806, []int{7}
}
func (m *WatchRequest) XXX_Unmarshal(b []byte) error {
@ -392,7 +392,7 @@ func (m *Advert) Reset() { *m = Advert{} }
func (m *Advert) String() string { return proto.CompactTextString(m) }
func (*Advert) ProtoMessage() {}
func (*Advert) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{8}
return fileDescriptor_c2b04f200fb3e806, []int{8}
}
func (m *Advert) XXX_Unmarshal(b []byte) error {
@ -459,7 +459,7 @@ func (m *ProcessResponse) Reset() { *m = ProcessResponse{} }
func (m *ProcessResponse) String() string { return proto.CompactTextString(m) }
func (*ProcessResponse) ProtoMessage() {}
func (*ProcessResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{9}
return fileDescriptor_c2b04f200fb3e806, []int{9}
}
func (m *ProcessResponse) XXX_Unmarshal(b []byte) error {
@ -491,7 +491,7 @@ func (m *CreateResponse) Reset() { *m = CreateResponse{} }
func (m *CreateResponse) String() string { return proto.CompactTextString(m) }
func (*CreateResponse) ProtoMessage() {}
func (*CreateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{10}
return fileDescriptor_c2b04f200fb3e806, []int{10}
}
func (m *CreateResponse) XXX_Unmarshal(b []byte) error {
@ -523,7 +523,7 @@ func (m *DeleteResponse) Reset() { *m = DeleteResponse{} }
func (m *DeleteResponse) String() string { return proto.CompactTextString(m) }
func (*DeleteResponse) ProtoMessage() {}
func (*DeleteResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{11}
return fileDescriptor_c2b04f200fb3e806, []int{11}
}
func (m *DeleteResponse) XXX_Unmarshal(b []byte) error {
@ -555,7 +555,7 @@ func (m *UpdateResponse) Reset() { *m = UpdateResponse{} }
func (m *UpdateResponse) String() string { return proto.CompactTextString(m) }
func (*UpdateResponse) ProtoMessage() {}
func (*UpdateResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{12}
return fileDescriptor_c2b04f200fb3e806, []int{12}
}
func (m *UpdateResponse) XXX_Unmarshal(b []byte) error {
@ -593,7 +593,7 @@ func (m *Event) Reset() { *m = Event{} }
func (m *Event) String() string { return proto.CompactTextString(m) }
func (*Event) ProtoMessage() {}
func (*Event) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{13}
return fileDescriptor_c2b04f200fb3e806, []int{13}
}
func (m *Event) XXX_Unmarshal(b []byte) error {
@ -652,7 +652,7 @@ func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{14}
return fileDescriptor_c2b04f200fb3e806, []int{14}
}
func (m *Query) XXX_Unmarshal(b []byte) error {
@ -719,7 +719,7 @@ func (m *Route) Reset() { *m = Route{} }
func (m *Route) String() string { return proto.CompactTextString(m) }
func (*Route) ProtoMessage() {}
func (*Route) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{15}
return fileDescriptor_c2b04f200fb3e806, []int{15}
}
func (m *Route) XXX_Unmarshal(b []byte) error {
@ -789,92 +789,6 @@ func (m *Route) GetMetric() int64 {
return 0
}
type Status struct {
Code string `protobuf:"bytes,1,opt,name=code,proto3" json:"code,omitempty"`
Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Status) Reset() { *m = Status{} }
func (m *Status) String() string { return proto.CompactTextString(m) }
func (*Status) ProtoMessage() {}
func (*Status) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{16}
}
func (m *Status) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Status.Unmarshal(m, b)
}
func (m *Status) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Status.Marshal(b, m, deterministic)
}
func (m *Status) XXX_Merge(src proto.Message) {
xxx_messageInfo_Status.Merge(m, src)
}
func (m *Status) XXX_Size() int {
return xxx_messageInfo_Status.Size(m)
}
func (m *Status) XXX_DiscardUnknown() {
xxx_messageInfo_Status.DiscardUnknown(m)
}
var xxx_messageInfo_Status proto.InternalMessageInfo
func (m *Status) GetCode() string {
if m != nil {
return m.Code
}
return ""
}
func (m *Status) GetError() string {
if m != nil {
return m.Error
}
return ""
}
type StatusResponse struct {
Status *Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *StatusResponse) Reset() { *m = StatusResponse{} }
func (m *StatusResponse) String() string { return proto.CompactTextString(m) }
func (*StatusResponse) ProtoMessage() {}
func (*StatusResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_367072455c71aedc, []int{17}
}
func (m *StatusResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StatusResponse.Unmarshal(m, b)
}
func (m *StatusResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_StatusResponse.Marshal(b, m, deterministic)
}
func (m *StatusResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_StatusResponse.Merge(m, src)
}
func (m *StatusResponse) XXX_Size() int {
return xxx_messageInfo_StatusResponse.Size(m)
}
func (m *StatusResponse) XXX_DiscardUnknown() {
xxx_messageInfo_StatusResponse.DiscardUnknown(m)
}
var xxx_messageInfo_StatusResponse proto.InternalMessageInfo
func (m *StatusResponse) GetStatus() *Status {
if m != nil {
return m.Status
}
return nil
}
func init() {
proto.RegisterEnum("go.micro.router.AdvertType", AdvertType_name, AdvertType_value)
proto.RegisterEnum("go.micro.router.EventType", EventType_name, EventType_value)
@ -894,56 +808,53 @@ func init() {
proto.RegisterType((*Event)(nil), "go.micro.router.Event")
proto.RegisterType((*Query)(nil), "go.micro.router.Query")
proto.RegisterType((*Route)(nil), "go.micro.router.Route")
proto.RegisterType((*Status)(nil), "go.micro.router.Status")
proto.RegisterType((*StatusResponse)(nil), "go.micro.router.StatusResponse")
}
func init() { proto.RegisterFile("router.proto", fileDescriptor_367072455c71aedc) }
var fileDescriptor_367072455c71aedc = []byte{
// 693 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0x4f, 0x4f, 0xdb, 0x4a,
0x10, 0xb7, 0x93, 0xd8, 0x79, 0x99, 0x17, 0x8c, 0xdf, 0xe8, 0x09, 0xac, 0xb4, 0x40, 0xe4, 0x13,
0x42, 0xc8, 0x54, 0xe9, 0xb5, 0xff, 0x02, 0xa5, 0xaa, 0x54, 0x0e, 0xad, 0x0b, 0xea, 0xd9, 0xd8,
0x23, 0x6a, 0x91, 0xd8, 0x66, 0x77, 0x03, 0xca, 0xb9, 0x9f, 0xa6, 0xe7, 0x7e, 0xa4, 0x5e, 0xfb,
0x21, 0x2a, 0xef, 0xae, 0x43, 0x88, 0x31, 0x12, 0x9c, 0xbc, 0xf3, 0xef, 0x37, 0xb3, 0x3b, 0xbf,
0x19, 0x43, 0x9f, 0xe5, 0x33, 0x41, 0x2c, 0x28, 0x58, 0x2e, 0x72, 0x5c, 0xbf, 0xc8, 0x83, 0x69,
0x1a, 0xb3, 0x3c, 0x50, 0x6a, 0xbf, 0x07, 0xdd, 0x90, 0xae, 0x66, 0xc4, 0x85, 0x0f, 0xf0, 0x4f,
0x48, 0xbc, 0xc8, 0x33, 0x4e, 0xfe, 0x1b, 0xe8, 0x9f, 0xa4, 0x5c, 0x54, 0x32, 0x06, 0x60, 0xcb,
0x00, 0xee, 0x99, 0xc3, 0xf6, 0xee, 0xbf, 0xa3, 0x8d, 0x60, 0x05, 0x28, 0x08, 0xcb, 0x4f, 0xa8,
0xbd, 0xfc, 0xd7, 0xb0, 0x76, 0x92, 0xe7, 0x97, 0xb3, 0x42, 0x83, 0xe3, 0x3e, 0x58, 0x57, 0x33,
0x62, 0x73, 0xcf, 0x1c, 0x9a, 0xf7, 0xc6, 0x7f, 0x29, 0xad, 0xa1, 0x72, 0xf2, 0xdf, 0x81, 0x53,
0x85, 0x3f, 0xb1, 0x80, 0x57, 0xd0, 0x57, 0x88, 0x4f, 0xca, 0xff, 0x16, 0xd6, 0x74, 0xf4, 0x13,
0xd3, 0x3b, 0xd0, 0xff, 0x16, 0x89, 0xf8, 0x7b, 0xf5, 0xb6, 0x3f, 0x4d, 0xb0, 0xc7, 0xc9, 0x35,
0x31, 0x81, 0x0e, 0xb4, 0xd2, 0x44, 0x96, 0xd1, 0x0b, 0x5b, 0x69, 0x82, 0x07, 0xd0, 0x11, 0xf3,
0x82, 0xbc, 0xd6, 0xd0, 0xdc, 0x75, 0x46, 0xcf, 0x6a, 0xc0, 0x2a, 0xec, 0x74, 0x5e, 0x50, 0x28,
0x1d, 0xf1, 0x39, 0xf4, 0x44, 0x3a, 0x25, 0x2e, 0xa2, 0x69, 0xe1, 0xb5, 0x87, 0xe6, 0x6e, 0x3b,
0xbc, 0x55, 0xa0, 0x0b, 0x6d, 0x21, 0x26, 0x5e, 0x47, 0xea, 0xcb, 0x63, 0x59, 0x3b, 0x5d, 0x53,
0x26, 0xb8, 0x67, 0x35, 0xd4, 0x7e, 0x5c, 0x9a, 0x43, 0xed, 0xe5, 0xff, 0x07, 0xeb, 0x9f, 0x59,
0x1e, 0x13, 0xe7, 0x0b, 0x3a, 0xb8, 0xe0, 0x1c, 0x31, 0x8a, 0x04, 0x2d, 0x6b, 0xde, 0xd3, 0x84,
0xee, 0x6a, 0xce, 0x8a, 0x64, 0xd9, 0xe7, 0x87, 0x09, 0x96, 0x84, 0xc6, 0x40, 0xdf, 0xd1, 0x94,
0x77, 0x1c, 0xdc, 0x5f, 0x40, 0xd3, 0x15, 0x5b, 0xab, 0x57, 0xdc, 0x07, 0x4b, 0xc6, 0xc9, 0xcb,
0x37, 0xf7, 0x42, 0x39, 0xf9, 0x67, 0x60, 0xc9, 0x5e, 0xa2, 0x07, 0x5d, 0x4e, 0xec, 0x3a, 0x8d,
0x49, 0xbf, 0x7e, 0x25, 0x96, 0x96, 0x8b, 0x48, 0xd0, 0x4d, 0x34, 0x97, 0xc9, 0x7a, 0x61, 0x25,
0x96, 0x96, 0x8c, 0xc4, 0x4d, 0xce, 0x2e, 0x65, 0xb2, 0x5e, 0x58, 0x89, 0xfe, 0x2f, 0x13, 0x2c,
0x99, 0xe7, 0x61, 0xdc, 0x28, 0x49, 0x18, 0x71, 0x5e, 0xe1, 0x6a, 0x71, 0x39, 0x63, 0xbb, 0x31,
0x63, 0xe7, 0x4e, 0x46, 0xdc, 0xd0, 0x1c, 0x64, 0x9e, 0x25, 0x0d, 0x5a, 0x42, 0x84, 0xce, 0x24,
0xcd, 0x2e, 0x3d, 0x5b, 0x6a, 0xe5, 0xb9, 0xf4, 0x9d, 0x92, 0x60, 0x69, 0xec, 0x75, 0xe5, 0xeb,
0x69, 0xc9, 0x1f, 0x81, 0xfd, 0x55, 0x44, 0x62, 0xc6, 0xcb, 0xa8, 0x38, 0x4f, 0xaa, 0x92, 0xe5,
0x19, 0xff, 0x07, 0x8b, 0x18, 0xcb, 0x99, 0xae, 0x56, 0x09, 0xfe, 0x18, 0x1c, 0x15, 0xb3, 0x98,
0x86, 0x03, 0xb0, 0xb9, 0xd4, 0xe8, 0x69, 0xda, 0xac, 0x75, 0x40, 0x07, 0x68, 0xb7, 0xbd, 0x11,
0xc0, 0x2d, 0x8d, 0x11, 0xc1, 0x51, 0xd2, 0x38, 0xcb, 0xf2, 0x59, 0x16, 0x93, 0x6b, 0xa0, 0x0b,
0x7d, 0xa5, 0x53, 0x1c, 0x72, 0xcd, 0xbd, 0x03, 0xe8, 0x2d, 0x68, 0x81, 0x00, 0xb6, 0x22, 0xa0,
0x6b, 0x94, 0x67, 0x45, 0x3d, 0xd7, 0x2c, 0xcf, 0x3a, 0xa0, 0x35, 0xfa, 0xd3, 0x02, 0x3b, 0x54,
0x4f, 0xf2, 0x09, 0x6c, 0xb5, 0x3f, 0x70, 0xbb, 0x56, 0xda, 0x9d, 0xbd, 0x34, 0xd8, 0x69, 0xb4,
0x6b, 0x12, 0x1b, 0x78, 0x08, 0x96, 0x9c, 0x65, 0xdc, 0xaa, 0xf9, 0x2e, 0xcf, 0xf8, 0xa0, 0x61,
0xae, 0x7c, 0xe3, 0x85, 0x89, 0x87, 0xd0, 0x53, 0xd7, 0x4b, 0x39, 0xa1, 0x57, 0x27, 0xac, 0x86,
0xd8, 0x6c, 0x98, 0x7e, 0x89, 0xf1, 0x01, 0xba, 0x7a, 0x2e, 0xb1, 0xc9, 0x6f, 0x30, 0xac, 0x19,
0x56, 0x47, 0xd9, 0xc0, 0xe3, 0x05, 0x07, 0x9a, 0x0b, 0xd9, 0x69, 0xea, 0xe8, 0x02, 0x66, 0xf4,
0xbb, 0x05, 0xd6, 0x69, 0x74, 0x3e, 0x21, 0x3c, 0xaa, 0x9a, 0x83, 0x0d, 0xa3, 0x78, 0x0f, 0xdc,
0xca, 0x3a, 0x31, 0x4a, 0x10, 0xd5, 0xd5, 0x47, 0x80, 0xac, 0x6c, 0x20, 0x09, 0xa2, 0xe8, 0xf0,
0x08, 0x90, 0x95, 0xa5, 0x65, 0xe0, 0x18, 0x3a, 0xe5, 0xbf, 0xef, 0x81, 0xd7, 0xa9, 0x13, 0x61,
0xf9, 0x67, 0xe9, 0x1b, 0xf8, 0xb1, 0xda, 0x39, 0x5b, 0x0d, 0xff, 0x19, 0x0d, 0xb4, 0xdd, 0x64,
0xae, 0x90, 0xce, 0x6d, 0xf9, 0xdf, 0x7e, 0xf9, 0x37, 0x00, 0x00, 0xff, 0xff, 0x86, 0x75, 0x28,
0x0b, 0xc7, 0x07, 0x00, 0x00,
func init() {
proto.RegisterFile("micro/go-micro/router/service/proto/router.proto", fileDescriptor_c2b04f200fb3e806)
}
var fileDescriptor_c2b04f200fb3e806 = []byte{
// 646 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x55, 0xcb, 0x4e, 0xdb, 0x4c,
0x14, 0xb6, 0x9d, 0xd8, 0xf9, 0x7d, 0xfe, 0x10, 0xdc, 0xb3, 0xa0, 0x56, 0x5a, 0x68, 0xe4, 0x15,
0x42, 0xd4, 0x41, 0xe9, 0xb6, 0x37, 0xa0, 0xad, 0x2a, 0x95, 0x45, 0x6b, 0x81, 0xba, 0x36, 0xc9,
0x11, 0xb5, 0x48, 0x6c, 0x33, 0x33, 0x01, 0x65, 0xdd, 0x67, 0xe9, 0xa2, 0xeb, 0x3e, 0x52, 0x5f,
0xa4, 0x9a, 0x8b, 0x21, 0xc4, 0x18, 0x09, 0x56, 0x99, 0x73, 0xfb, 0xce, 0xf5, 0x8b, 0x61, 0x6f,
0x96, 0x8d, 0x59, 0x31, 0x3c, 0x2b, 0x5e, 0xea, 0x07, 0x2b, 0xe6, 0x82, 0xd8, 0x90, 0x13, 0xbb,
0xcc, 0xc6, 0x34, 0x2c, 0x59, 0x21, 0x2a, 0x65, 0xac, 0x04, 0x5c, 0x3f, 0x2b, 0x62, 0xe5, 0x1b,
0x6b, 0x75, 0xe4, 0x43, 0x27, 0xa1, 0x8b, 0x39, 0x71, 0x11, 0x01, 0xfc, 0x97, 0x10, 0x2f, 0x8b,
0x9c, 0x53, 0xf4, 0x16, 0xba, 0x47, 0x19, 0x17, 0x95, 0x8c, 0x31, 0x78, 0x2a, 0x80, 0x87, 0xf6,
0xa0, 0xb5, 0xfd, 0xff, 0x68, 0x23, 0x5e, 0x01, 0x8a, 0x13, 0xf9, 0x93, 0x18, 0xaf, 0xe8, 0x0d,
0xac, 0x1d, 0x15, 0xc5, 0xf9, 0xbc, 0x34, 0xe0, 0xb8, 0x0b, 0xee, 0xc5, 0x9c, 0xd8, 0x22, 0xb4,
0x07, 0xf6, 0x9d, 0xf1, 0xdf, 0xa4, 0x35, 0xd1, 0x4e, 0xd1, 0x7b, 0xe8, 0x55, 0xe1, 0x8f, 0x2c,
0xe0, 0x35, 0x74, 0x35, 0xe2, 0xa3, 0xf2, 0xbf, 0x83, 0x35, 0x13, 0xfd, 0xc8, 0xf4, 0x3d, 0xe8,
0x7e, 0x4f, 0xc5, 0xf8, 0x47, 0x35, 0xdb, 0xdf, 0x36, 0x78, 0xfb, 0x93, 0x4b, 0x62, 0x02, 0x7b,
0xe0, 0x64, 0x13, 0x55, 0x86, 0x9f, 0x38, 0xd9, 0x04, 0x87, 0xd0, 0x16, 0x8b, 0x92, 0x42, 0x67,
0x60, 0x6f, 0xf7, 0x46, 0xcf, 0x6a, 0xc0, 0x3a, 0xec, 0x78, 0x51, 0x52, 0xa2, 0x1c, 0xf1, 0x39,
0xf8, 0x22, 0x9b, 0x11, 0x17, 0xe9, 0xac, 0x0c, 0x5b, 0x03, 0x7b, 0xbb, 0x95, 0xdc, 0x28, 0x30,
0x80, 0x96, 0x10, 0xd3, 0xb0, 0xad, 0xf4, 0xf2, 0x29, 0x6b, 0xa7, 0x4b, 0xca, 0x05, 0x0f, 0xdd,
0x86, 0xda, 0x3f, 0x4a, 0x73, 0x62, 0xbc, 0xa2, 0x27, 0xb0, 0xfe, 0x95, 0x15, 0x63, 0xe2, 0xfc,
0xfa, 0x1c, 0x02, 0xe8, 0x1d, 0x32, 0x4a, 0x05, 0x2d, 0x6b, 0x3e, 0xd0, 0x94, 0x6e, 0x6b, 0x4e,
0xca, 0xc9, 0xb2, 0xcf, 0x4f, 0x1b, 0x5c, 0x05, 0x8d, 0xb1, 0xe9, 0xd1, 0x56, 0x3d, 0xf6, 0xef,
0x2e, 0xa0, 0xa9, 0x45, 0x67, 0xb5, 0xc5, 0x5d, 0x70, 0x55, 0x9c, 0x6a, 0xbe, 0x79, 0x17, 0xda,
0x29, 0x3a, 0x01, 0x57, 0xed, 0x12, 0x43, 0xe8, 0x18, 0x66, 0x98, 0xe9, 0x57, 0xa2, 0xb4, 0x9c,
0xa5, 0x82, 0xae, 0xd2, 0x85, 0x4a, 0xe6, 0x27, 0x95, 0x28, 0x2d, 0x39, 0x89, 0xab, 0x82, 0x9d,
0xab, 0x64, 0x7e, 0x52, 0x89, 0xd1, 0x1f, 0x1b, 0x5c, 0x95, 0xe7, 0x7e, 0xdc, 0x74, 0x32, 0x61,
0xc4, 0x79, 0x85, 0x6b, 0xc4, 0xe5, 0x8c, 0xad, 0xc6, 0x8c, 0xed, 0x5b, 0x19, 0x71, 0xc3, 0xdc,
0x20, 0x0b, 0x5d, 0x65, 0x30, 0x12, 0x22, 0xb4, 0xa7, 0x59, 0x7e, 0x1e, 0x7a, 0x4a, 0xab, 0xde,
0xd2, 0x77, 0x46, 0x82, 0x65, 0xe3, 0xb0, 0xa3, 0xa6, 0x67, 0xa4, 0x9d, 0x11, 0xc0, 0xcd, 0x3d,
0x21, 0x42, 0x4f, 0x4b, 0xfb, 0x79, 0x5e, 0xcc, 0xf3, 0x31, 0x05, 0x16, 0x06, 0xd0, 0xd5, 0x3a,
0xbd, 0xcc, 0xc0, 0xde, 0x19, 0x82, 0x7f, 0xbd, 0x1f, 0x04, 0xf0, 0xf4, 0x25, 0x04, 0x96, 0x7c,
0xeb, 0x1b, 0x08, 0x6c, 0xf9, 0x36, 0x01, 0xce, 0xe8, 0x97, 0x03, 0x5e, 0xa2, 0x6b, 0xfb, 0x02,
0x9e, 0x26, 0x32, 0x6e, 0xd5, 0xb6, 0x74, 0xeb, 0x0f, 0xa2, 0xff, 0xa2, 0xd1, 0x6e, 0xae, 0xc9,
0xc2, 0x03, 0x70, 0x15, 0xa9, 0x70, 0xb3, 0xe6, 0xbb, 0x4c, 0xb6, 0x7e, 0xc3, 0x81, 0x47, 0xd6,
0x9e, 0x8d, 0x07, 0xe0, 0xeb, 0xf6, 0x32, 0x4e, 0x18, 0xd6, 0x2f, 0xc7, 0x40, 0x3c, 0x6d, 0xa0,
0xa1, 0xc2, 0xf8, 0x04, 0x1d, 0x43, 0x10, 0x6c, 0xf2, 0xeb, 0x0f, 0x6a, 0x86, 0x55, 0x4e, 0x59,
0xa3, 0xbf, 0x0e, 0xb8, 0xc7, 0xe9, 0xe9, 0x94, 0xf0, 0xb0, 0x9a, 0x2a, 0x36, 0x1c, 0xf3, 0x1d,
0xe3, 0x59, 0x21, 0xa4, 0x25, 0x41, 0xf4, 0x3a, 0x1e, 0x00, 0xb2, 0xc2, 0x61, 0x05, 0xa2, 0xf7,
0xf8, 0x00, 0x90, 0x15, 0xda, 0x5b, 0xb8, 0x0f, 0x6d, 0xf9, 0xf5, 0xb8, 0x67, 0xbe, 0xf5, 0x0d,
0x2e, 0x7f, 0x6e, 0x22, 0x0b, 0x3f, 0x57, 0xac, 0xdd, 0x6c, 0xf8, 0xa7, 0x36, 0x40, 0x5b, 0x4d,
0xe6, 0x0a, 0xe9, 0xd4, 0x53, 0x5f, 0xbe, 0x57, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x53, 0xd8,
0xc9, 0xf6, 0x2d, 0x07, 0x00, 0x00,
}

View File

@ -1,5 +1,5 @@
// Code generated by protoc-gen-micro. DO NOT EDIT.
// source: router.proto
// source: micro/go-micro/router/service/proto/router.proto
package go_micro_router
@ -38,7 +38,6 @@ type RouterService interface {
Watch(ctx context.Context, in *WatchRequest, opts ...client.CallOption) (Router_WatchService, error)
Advertise(ctx context.Context, in *Request, opts ...client.CallOption) (Router_AdvertiseService, error)
Process(ctx context.Context, in *Advert, opts ...client.CallOption) (*ProcessResponse, error)
Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error)
}
type routerService struct {
@ -167,16 +166,6 @@ func (c *routerService) Process(ctx context.Context, in *Advert, opts ...client.
return out, nil
}
func (c *routerService) Status(ctx context.Context, in *Request, opts ...client.CallOption) (*StatusResponse, error) {
req := c.c.NewRequest(c.name, "Router.Status", in)
out := new(StatusResponse)
err := c.c.Call(ctx, req, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Router service
type RouterHandler interface {
@ -184,7 +173,6 @@ type RouterHandler interface {
Watch(context.Context, *WatchRequest, Router_WatchStream) error
Advertise(context.Context, *Request, Router_AdvertiseStream) error
Process(context.Context, *Advert, *ProcessResponse) error
Status(context.Context, *Request, *StatusResponse) error
}
func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.HandlerOption) error {
@ -193,7 +181,6 @@ func RegisterRouterHandler(s server.Server, hdlr RouterHandler, opts ...server.H
Watch(ctx context.Context, stream server.Stream) error
Advertise(ctx context.Context, stream server.Stream) error
Process(ctx context.Context, in *Advert, out *ProcessResponse) error
Status(ctx context.Context, in *Request, out *StatusResponse) error
}
type Router struct {
router
@ -284,10 +271,6 @@ func (h *routerHandler) Process(ctx context.Context, in *Advert, out *ProcessRes
return h.RouterHandler.Process(ctx, in, out)
}
func (h *routerHandler) Status(ctx context.Context, in *Request, out *StatusResponse) error {
return h.RouterHandler.Status(ctx, in, out)
}
// Client API for Table service
type TableService interface {

View File

@ -8,7 +8,6 @@ service Router {
rpc Watch(WatchRequest) returns (stream Event) {};
rpc Advertise(Request) returns (stream Advert) {};
rpc Process(Advert) returns (ProcessResponse) {};
rpc Status(Request) returns (StatusResponse) {};
}
service Table {
@ -129,12 +128,3 @@ message Route {
// the metric / score of this route
int64 metric = 7;
}
message Status {
string code = 1;
string error = 2;
}
message StatusResponse {
Status status = 1;
}

View File

@ -2,7 +2,6 @@ package service
import (
"context"
"errors"
"fmt"
"io"
"sync"
@ -19,7 +18,6 @@ type svc struct {
callOpts []client.CallOption
router pb.RouterService
table *table
status *router.Status
exit chan bool
errChan chan error
advertChan chan *router.Advert
@ -43,16 +41,9 @@ func NewRouter(opts ...router.Option) router.Router {
cli = options.Client
}
// set the status to Stopped
status := &router.Status{
Code: router.Stopped,
Error: nil,
}
// NOTE: should we have Client/Service option in router.Options?
s := &svc{
opts: options,
status: status,
router: pb.NewRouterService(router.DefaultName, cli),
}
@ -98,12 +89,6 @@ func (s *svc) Table() router.Table {
func (s *svc) Start() error {
s.Lock()
defer s.Unlock()
s.status = &router.Status{
Code: router.Running,
Error: nil,
}
return nil
}
@ -169,21 +154,16 @@ func (s *svc) Advertise() (<-chan *router.Advert, error) {
s.Lock()
defer s.Unlock()
switch s.status.Code {
case router.Running, router.Advertising:
stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...)
if err != nil {
return nil, fmt.Errorf("failed getting advert stream: %s", err)
}
// create advertise and event channels
advertChan := make(chan *router.Advert)
go s.advertiseEvents(advertChan, stream)
return advertChan, nil
case router.Stopped:
return nil, fmt.Errorf("not running")
stream, err := s.router.Advertise(context.Background(), &pb.Request{}, s.callOpts...)
if err != nil {
return nil, fmt.Errorf("failed getting advert stream: %s", err)
}
return nil, fmt.Errorf("error: %s", s.status.Error)
// create advertise and event channels
advertChan := make(chan *router.Advert)
go s.advertiseEvents(advertChan, stream)
return advertChan, nil
}
// Process processes incoming adverts
@ -220,55 +200,6 @@ func (s *svc) Process(advert *router.Advert) error {
return nil
}
// Status returns router status
func (s *svc) Status() router.Status {
s.Lock()
defer s.Unlock()
// check if its stopped
select {
case <-s.exit:
return router.Status{
Code: router.Stopped,
Error: nil,
}
default:
// don't block
}
// check the remote router
rsp, err := s.router.Status(context.Background(), &pb.Request{}, s.callOpts...)
if err != nil {
return router.Status{
Code: router.Error,
Error: err,
}
}
code := router.Running
var serr error
switch rsp.Status.Code {
case "running":
code = router.Running
case "advertising":
code = router.Advertising
case "stopped":
code = router.Stopped
case "error":
code = router.Error
}
if len(rsp.Status.Error) > 0 {
serr = errors.New(rsp.Status.Error)
}
return router.Status{
Code: code,
Error: serr,
}
}
// Remote router cannot be stopped
func (s *svc) Stop() error {
s.Lock()