mirror of
https://github.com/go-kratos/kratos.git
synced 2025-02-13 13:48:51 +02:00
fix resolver subset bug
This commit is contained in:
parent
a6cc978bdf
commit
7f56c6bed2
@ -1,5 +1,8 @@
|
||||
### net/rpc/warden
|
||||
|
||||
##### Version 1.1.18
|
||||
1. 修复resolver过滤导致的子集bug
|
||||
|
||||
##### Version 1.1.17
|
||||
1. 移除 bbr feature flag,默认开启自适应限流
|
||||
|
||||
|
@ -135,16 +135,43 @@ func (r *Resolver) updateproc() {
|
||||
instances = append(instances, value...)
|
||||
}
|
||||
}
|
||||
if r.subsetSize > 0 && len(instances) > 0 {
|
||||
instances = r.subset(instances, env.Hostname, r.subsetSize)
|
||||
}
|
||||
if len(instances) > 0 {
|
||||
r.newAddress(instances)
|
||||
}
|
||||
r.newAddress(r.filter(instances))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Resolver) filter(backends []*naming.Instance) (instances []*naming.Instance) {
|
||||
for _, ins := range backends {
|
||||
//如果r.clusters的长度大于0说明需要进行集群选择
|
||||
if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok && len(r.clusters) > 0 {
|
||||
continue
|
||||
}
|
||||
var addr string
|
||||
for _, a := range ins.Addrs {
|
||||
u, err := url.Parse(a)
|
||||
if err == nil && u.Scheme == Scheme {
|
||||
addr = u.Host
|
||||
}
|
||||
}
|
||||
if addr == "" {
|
||||
fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
|
||||
log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
|
||||
continue
|
||||
}
|
||||
instances = append(instances, ins)
|
||||
}
|
||||
if len(instances) == 0 {
|
||||
for _, bkend := range backends {
|
||||
log.Warn("resolver: backends(%d) invalid instance:%v", len(backends), bkend)
|
||||
}
|
||||
return
|
||||
}
|
||||
if r.subsetSize > 0 {
|
||||
instances = r.subset(instances, env.Hostname, r.subsetSize)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Resolver) subset(backends []*naming.Instance, clientID string, size int64) []*naming.Instance {
|
||||
if len(backends) <= int(size) {
|
||||
return backends
|
||||
@ -172,12 +199,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) {
|
||||
}
|
||||
addrs := make([]resolver.Address, 0, len(instances))
|
||||
for _, ins := range instances {
|
||||
if len(r.clusters) > 0 {
|
||||
if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
var weight int64
|
||||
if weight, _ = strconv.ParseInt(ins.Metadata[naming.MetaWeight], 10, 64); weight <= 0 {
|
||||
weight = 10
|
||||
@ -189,11 +210,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) {
|
||||
rpc = u.Host
|
||||
}
|
||||
}
|
||||
if rpc == "" {
|
||||
fmt.Fprintf(os.Stderr, "warden/resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
|
||||
log.Warn("warden/resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
|
||||
continue
|
||||
}
|
||||
addr := resolver.Address{
|
||||
Addr: rpc,
|
||||
Type: resolver.Backend,
|
||||
@ -202,5 +218,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) {
|
||||
}
|
||||
addrs = append(addrs, addr)
|
||||
}
|
||||
log.Info("resolver: finally get %d instances", len(addrs))
|
||||
r.cc.NewAddress(addrs)
|
||||
}
|
||||
|
125
pkg/net/rpc/warden/resolver/resolver_test.go
Normal file
125
pkg/net/rpc/warden/resolver/resolver_test.go
Normal file
@ -0,0 +1,125 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bilibili/kratos/pkg/naming"
|
||||
)
|
||||
|
||||
func Test_FilterLittle(t *testing.T) {
|
||||
var backs []*naming.Instance
|
||||
for i := 0; i < 3; i++ {
|
||||
backs = append(backs, &naming.Instance{
|
||||
Zone: "sh1",
|
||||
Env: "prod",
|
||||
AppID: "2233",
|
||||
Hostname: fmt.Sprintf("linux-%d", i),
|
||||
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
|
||||
LastTs: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
r := &Resolver{
|
||||
quit: make(chan struct{}, 1),
|
||||
zone: "sh1",
|
||||
subsetSize: 50,
|
||||
}
|
||||
|
||||
if len(r.filter(backs)) != 3 {
|
||||
t.Fatalf("backends length must be 3")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_FilterBig(t *testing.T) {
|
||||
var backs []*naming.Instance
|
||||
for i := 0; i < 100; i++ {
|
||||
backs = append(backs, &naming.Instance{
|
||||
Zone: "sh1",
|
||||
Env: "prod",
|
||||
AppID: "2233",
|
||||
Hostname: fmt.Sprintf("linux-%d", i),
|
||||
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
|
||||
LastTs: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
r := &Resolver{
|
||||
quit: make(chan struct{}, 1),
|
||||
zone: "sh1",
|
||||
subsetSize: 50,
|
||||
}
|
||||
|
||||
if len(r.filter(backs)) != 50 {
|
||||
t.Fatalf("backends length must be 50")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_FilterNone(t *testing.T) {
|
||||
var backs []*naming.Instance
|
||||
for i := 0; i < 100; i++ {
|
||||
backs = append(backs, &naming.Instance{
|
||||
Zone: "sh1",
|
||||
Env: "prod",
|
||||
AppID: "2233",
|
||||
Metadata: map[string]string{naming.MetaCluster: "c1"},
|
||||
Hostname: fmt.Sprintf("linux-%d", i),
|
||||
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
|
||||
LastTs: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
r := &Resolver{
|
||||
quit: make(chan struct{}, 1),
|
||||
zone: "sh1",
|
||||
subsetSize: 50,
|
||||
clusters: map[string]struct{}{"c2": struct{}{}},
|
||||
}
|
||||
|
||||
if len(r.filter(backs)) != 0 {
|
||||
t.Fatalf("backends length must be 0")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_FilterSome(t *testing.T) {
|
||||
var backs []*naming.Instance
|
||||
for i := 0; i < 40; i++ {
|
||||
backs = append(backs, &naming.Instance{
|
||||
Zone: "sh1",
|
||||
Env: "prod",
|
||||
AppID: "2233",
|
||||
Metadata: map[string]string{naming.MetaCluster: "c1"},
|
||||
Hostname: fmt.Sprintf("linux-%d", i),
|
||||
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
|
||||
LastTs: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
for i := 50; i < 150; i++ {
|
||||
backs = append(backs, &naming.Instance{
|
||||
Zone: "sh1",
|
||||
Env: "prod",
|
||||
AppID: "2233",
|
||||
Metadata: map[string]string{naming.MetaCluster: "c2"},
|
||||
Hostname: fmt.Sprintf("linux-%d", i),
|
||||
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
|
||||
LastTs: time.Now().Unix(),
|
||||
})
|
||||
}
|
||||
r := &Resolver{
|
||||
quit: make(chan struct{}, 1),
|
||||
zone: "sh1",
|
||||
subsetSize: 50,
|
||||
clusters: map[string]struct{}{"c2": struct{}{}},
|
||||
}
|
||||
if len(r.filter(backs)) != 50 {
|
||||
t.Fatalf("backends length must be 0")
|
||||
}
|
||||
|
||||
r2 := &Resolver{
|
||||
quit: make(chan struct{}, 1),
|
||||
zone: "sh1",
|
||||
subsetSize: 50,
|
||||
clusters: map[string]struct{}{"c1": struct{}{}},
|
||||
}
|
||||
if len(r2.filter(backs)) != 40 {
|
||||
t.Fatalf("backends length must be 0")
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user