1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-24 03:46:37 +02:00

Merge pull request #130 from bilibili/warden/p2c_filter

add filter for p2c
This commit is contained in:
Tony 2019-06-03 10:28:24 +08:00 committed by GitHub
commit d0d18804b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 12 deletions

View File

@ -1,6 +1,9 @@
### net/rpc/warden ### net/rpc/warden
##### Version 1.1.12 ##### Version 1.1.14
1. p2c balancer增加filter保护
##### Version 1.1.13
1. 设置 caller 为 no_user 如果 user 不存在 1. 设置 caller 为 no_user 如果 user 不存在
##### Version 1.1.12 ##### Version 1.1.12

View File

@ -67,10 +67,24 @@ type subConn struct {
reqs int64 reqs int64
} }
func (sc *subConn) valid() bool {
return sc.health() > 500 && atomic.LoadUint64(&sc.svrCPU) < 900
}
func (sc *subConn) health() uint64 { func (sc *subConn) health() uint64 {
return atomic.LoadUint64(&sc.success) return atomic.LoadUint64(&sc.success)
} }
func (sc *subConn) load() uint64 {
lag := uint64(math.Sqrt(float64(atomic.LoadUint64(&sc.lag))) + 1)
load := atomic.LoadUint64(&sc.svrCPU) * lag * uint64(atomic.LoadInt64(&sc.inflight))
if load == 0 {
// penalty是初始化没有数据时的惩罚值,默认为1e9 * 250
load = penalty
}
return load
}
func (sc *subConn) cost() uint64 { func (sc *subConn) cost() uint64 {
load := atomic.LoadUint64(&sc.svrCPU) * atomic.LoadUint64(&sc.lag) * uint64(atomic.LoadInt64(&sc.inflight)) load := atomic.LoadUint64(&sc.svrCPU) * atomic.LoadUint64(&sc.lag) * uint64(atomic.LoadInt64(&sc.inflight))
if load == 0 { if load == 0 {
@ -155,6 +169,24 @@ func (p *p2cPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balanc
return p.pick(ctx, opts) return p.pick(ctx, opts)
} }
// choose two distinct nodes
func (p *p2cPicker) prePick() (nodeA *subConn, nodeB *subConn) {
for i := 0; i < 3; i++ {
p.lk.Lock()
a := p.r.Intn(len(p.subConns))
b := p.r.Intn(len(p.subConns) - 1)
p.lk.Unlock()
if b >= a {
b = b + 1
}
nodeA, nodeB = p.subConns[a], p.subConns[b]
if nodeA.valid() || nodeB.valid() {
break
}
}
return
}
func (p *p2cPicker) pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { func (p *p2cPicker) pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
var pc, upc *subConn var pc, upc *subConn
start := time.Now().UnixNano() start := time.Now().UnixNano()
@ -164,17 +196,9 @@ func (p *p2cPicker) pick(ctx context.Context, opts balancer.PickOptions) (balanc
} else if len(p.subConns) == 1 { } else if len(p.subConns) == 1 {
pc = p.subConns[0] pc = p.subConns[0]
} else { } else {
// choose two distinct nodes nodeA, nodeB := p.prePick()
p.lk.Lock()
a := p.r.Intn(len(p.subConns))
b := p.r.Intn(len(p.subConns) - 1)
p.lk.Unlock()
if b >= a {
b = b + 1
}
nodeA, nodeB := p.subConns[a], p.subConns[b]
// meta.Weight为服务发布者在disocvery中设置的权重 // meta.Weight为服务发布者在disocvery中设置的权重
if nodeA.cost()*nodeB.health()*nodeB.meta.Weight > nodeB.cost()*nodeA.health()*nodeA.meta.Weight { if nodeA.load()*nodeB.health()*nodeB.meta.Weight > nodeB.load()*nodeA.health()*nodeA.meta.Weight {
pc, upc = nodeB, nodeA pc, upc = nodeB, nodeA
} else { } else {
pc, upc = nodeA, nodeB pc, upc = nodeA, nodeB
@ -258,7 +282,7 @@ func (p *p2cPicker) printStats() {
stat.inflight = atomic.LoadInt64(&conn.inflight) stat.inflight = atomic.LoadInt64(&conn.inflight)
stat.lantency = atomic.LoadUint64(&conn.lag) stat.lantency = atomic.LoadUint64(&conn.lag)
stat.reqs = atomic.SwapInt64(&conn.reqs, 0) stat.reqs = atomic.SwapInt64(&conn.reqs, 0)
load := stat.cpu * uint64(stat.inflight) * stat.lantency load := conn.load()
if load != 0 { if load != 0 {
stat.score = float64(stat.cs*conn.meta.Weight*1e8) / float64(load) stat.score = float64(stat.cs*conn.meta.Weight*1e8) / float64(load)
} }