1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

Merge pull request #775 from micro/proxy-watcher

Fix the proxy watcher
This commit is contained in:
Asim Aslam 2019-09-20 16:36:00 +01:00 committed by GitHub
commit 95e4ed8ee9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 18 additions and 8 deletions

View File

@ -8,6 +8,7 @@ import (
"sort" "sort"
"strings" "strings"
"sync" "sync"
"time"
"github.com/micro/go-micro/client" "github.com/micro/go-micro/client"
"github.com/micro/go-micro/codec" "github.com/micro/go-micro/codec"
@ -440,7 +441,16 @@ func NewProxy(opts ...options.Option) proxy.Proxy {
// watch router service routes // watch router service routes
p.errChan = make(chan error, 1) p.errChan = make(chan error, 1)
go p.watchRoutes()
go func() {
// continuously attempt to watch routes
for {
// watch the routes
p.watchRoutes()
// in case of failure just wait a second
time.Sleep(time.Second)
}
}()
return p return p
} }

View File

@ -164,7 +164,7 @@ func (r *Router) Watch(ctx context.Context, req *pb.WatchRequest, stream pb.Rout
for { for {
event, err := watcher.Next() event, err := watcher.Next()
if err == router.ErrWatcherStopped { if err == router.ErrWatcherStopped {
break return errors.InternalServerError("go.micro.router", "watcher stopped")
} }
if err != nil { if err != nil {

View File

@ -357,7 +357,9 @@ func (s *svc) Watch(opts ...router.WatchOption) (router.Watcher, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
var options router.WatchOptions options := router.WatchOptions{
Service: "*",
}
for _, o := range opts { for _, o := range opts {
o(&options) o(&options)
} }

View File

@ -70,11 +70,9 @@ func (w *watcher) watch(stream pb.Router_WatchService) error {
Route: route, Route: route,
} }
for { select {
select { case w.resChan <- event:
case w.resChan <- event: case <-w.done:
case <-w.done:
}
} }
} }