1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-14 02:33:03 +02:00

fix(registry/consul):fix can't get service instance in async mode (#1731)

* fix(registry/consul):fix can't get service instance in async mode

* fix(registry/consul): return error if find service failed
This commit is contained in:
letian 2022-01-01 11:48:38 +08:00 committed by GitHub
parent 17201cd284
commit 1e749de5db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -144,33 +144,42 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
}
if !ok {
go r.resolve(set)
err := r.resolve(set)
if err != nil {
return nil, err
}
}
return w, nil
}
func (r *Registry) resolve(ss *serviceSet) {
func (r *Registry) resolve(ss *serviceSet) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
services, idx, err := r.cli.Service(ctx, ss.serviceName, 0, true)
cancel()
if err == nil && len(services) > 0 {
if err != nil {
return err
} else if len(services) > 0 {
ss.broadcast(services)
}
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
<-ticker.C
ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
tmpService, tmpIdx, err := r.cli.Service(ctx, ss.serviceName, idx, true)
cancel()
if err != nil {
time.Sleep(time.Second)
continue
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
<-ticker.C
ctx, cancel := context.WithTimeout(context.Background(), time.Second*120)
tmpService, tmpIdx, err := r.cli.Service(ctx, ss.serviceName, idx, true)
cancel()
if err != nil {
time.Sleep(time.Second)
continue
}
if len(tmpService) != 0 && tmpIdx != idx {
services = tmpService
ss.broadcast(services)
}
idx = tmpIdx
}
if len(tmpService) != 0 && tmpIdx != idx {
services = tmpService
ss.broadcast(services)
}
idx = tmpIdx
}
}()
return nil
}