1
0
mirror of https://github.com/go-kratos/kratos.git synced 2024-12-26 20:54:38 +02:00

feat: improve node alg (#3015)

* improve node alg

* fix node

* add weight

* fix lint

---------

Co-authored-by: caoguoliang01 <caoguoliang01@bilibili.com>
This commit is contained in:
longxboy 2023-12-19 19:15:44 +08:00 committed by GitHub
parent 856bc9a17b
commit 85740b179b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 100 additions and 72 deletions

View File

@ -1,11 +1,9 @@
package ewma
import (
"container/list"
"context"
"math"
"net"
"sync"
"sync/atomic"
"time"
@ -17,7 +15,7 @@ const (
// The mean lifetime of `cost`, it reaches its half-life after Tau*ln(2).
tau = int64(time.Millisecond * 600)
// If no statistics have been collected yet, we add a big lag penalty to the endpoint.
penalty = uint64(time.Second * 10)
penalty = uint64(time.Microsecond * 100)
)
var (
@ -33,18 +31,21 @@ type Node struct {
lag int64
success uint64
inflight int64
inflights *list.List
inflights [200]int64
// last collected timestamp
stamp int64
predictTs int64
predict int64
stamp int64
// number of requests in a period of time
reqs int64
// timestamp of the last Pick call
lastPick int64
errHandler func(err error) (isErr bool)
lk sync.RWMutex
errHandler func(err error) (isErr bool)
cachedWeight *atomic.Value
}
// nodeWeight pairs a computed weight with the moment it was computed, so the
// pair can be stored and loaded as a single value.
type nodeWeight struct {
	// value is the computed weight.
	value float64
	// updateAt is the time the weight was computed, in Unix nanoseconds.
	updateAt int64
}
// Builder is ewma node builder.
@ -55,12 +56,12 @@ type Builder struct {
// Build creates a weighted node wrapping n.
//
// The node starts with no recorded lag, a success value of 1000 and an
// in-flight count of 1. It uses the builder's ErrHandler and gets an empty
// weight cache. The inflights slots are left at their zero value, which
// predict treats as "no pending request".
func (b *Builder) Build(n selector.Node) selector.WeightedNode {
	s := &Node{
		Node:         n,
		lag:          0,
		success:      1000,
		inflight:     1,
		errHandler:   b.ErrHandler,
		cachedWeight: &atomic.Value{},
	}
	return s
}
@ -72,64 +73,58 @@ func (n *Node) health() uint64 {
// load estimates how loaded the node currently is: a latency cost multiplied
// by the number of in-flight requests. A larger result means a slower or
// busier node.
func (n *Node) load() (load uint64) {
	avgLag := atomic.LoadInt64(&n.lag)
	if avgLag == 0 {
		// penalty is the per-request cost used when there is no data yet,
		// i.e. when the node has just started.
		return penalty * uint64(atomic.LoadInt64(&n.inflight))
	}
	// When most of the tracked in-flight requests have already been pending
	// longer than the average lag, use their mean pending time instead.
	if predict := n.predict(avgLag, time.Now().UnixNano()); predict > avgLag {
		avgLag = predict
	}
	// add 5ms to eliminate the latency gap between different zones
	avgLag += int64(time.Millisecond * 5)
	// The square root makes the cost grow sub-linearly with latency.
	avgLag = int64(math.Sqrt(float64(avgLag)))
	return uint64(avgLag) * uint64(atomic.LoadInt64(&n.inflight))
}
// predict estimates the lag from requests that are still in flight.
//
// Every non-zero entry of n.inflights is read as the start timestamp of a
// pending request. A pending request is "slow" when it has been waiting longer
// than avgLag at time now. If the slow requests are a strict majority of the
// pending ones, their mean waiting time is returned; otherwise the result is 0.
func (n *Node) predict(avgLag int64, now int64) (predict int64) {
	var slowSum, slowCnt, pending int64
	for idx := 0; idx < len(n.inflights); idx++ {
		startedAt := atomic.LoadInt64(&n.inflights[idx])
		if startedAt == 0 {
			// empty slot
			continue
		}
		pending++
		if waited := now - startedAt; waited > avgLag {
			slowCnt++
			slowSum += waited
		}
	}
	if slowCnt > pending/2 {
		return slowSum / slowCnt
	}
	return 0
}
// Pick pick a node.
func (n *Node) Pick() selector.DoneFunc {
now := time.Now().UnixNano()
atomic.StoreInt64(&n.lastPick, now)
start := time.Now().UnixNano()
atomic.StoreInt64(&n.lastPick, start)
atomic.AddInt64(&n.inflight, 1)
atomic.AddInt64(&n.reqs, 1)
n.lk.Lock()
e := n.inflights.PushBack(now)
n.lk.Unlock()
reqs := atomic.AddInt64(&n.reqs, 1)
slot := reqs % 200
swapped := atomic.CompareAndSwapInt64(&n.inflights[slot], 0, start)
return func(ctx context.Context, di selector.DoneInfo) {
n.lk.Lock()
n.inflights.Remove(e)
n.lk.Unlock()
if swapped {
atomic.CompareAndSwapInt64(&n.inflights[slot], start, 0)
}
atomic.AddInt64(&n.inflight, -1)
now := time.Now().UnixNano()
@ -141,7 +136,6 @@ func (n *Node) Pick() selector.DoneFunc {
}
w := math.Exp(float64(-td) / float64(tau))
start := e.Value.(int64)
lag := now - start
if lag < 0 {
lag = 0
@ -172,7 +166,19 @@ func (n *Node) Pick() selector.DoneFunc {
// Weight is node effective weight.
//
// The weight is health divided by load. The result is cached together with
// the time it was computed and reused for calls made within the next 5ms, so
// health and load are only recomputed when the cache is empty or stale.
func (n *Node) Weight() (weight float64) {
	now := time.Now().UnixNano()
	if w, ok := n.cachedWeight.Load().(*nodeWeight); ok && time.Duration(now-w.updateAt) <= time.Millisecond*5 {
		return w.value
	}
	health := n.health()
	load := n.load()
	weight = float64(health*uint64(time.Microsecond)*10) / float64(load)
	n.cachedWeight.Store(&nodeWeight{
		value:    weight,
		updateAt: now,
	})
	return weight
}

View File

@ -36,16 +36,16 @@ func TestDirect(t *testing.T) {
t.Errorf("done2 is equal to nil")
}
time.Sleep(time.Millisecond * 10)
time.Sleep(time.Millisecond * 15)
done(context.Background(), selector.DoneInfo{})
if float64(30000) >= wn.Weight() {
if float64(70) >= wn.Weight() {
t.Errorf("float64(30000) >= wn.Weight()(%v)", wn.Weight())
}
if float64(60000) <= wn.Weight() {
t.Errorf("float64(60000) <= wn.Weight()(%v)", wn.Weight())
if float64(1200) <= wn.Weight() {
t.Errorf("float64(1000) <= wn.Weight()(%v)", wn.Weight())
}
if time.Millisecond*15 <= wn.PickElapsed() {
t.Errorf("time.Millisecond*15 <= wn.PickElapsed()(%v)", wn.PickElapsed())
if time.Millisecond*30 <= wn.PickElapsed() {
t.Errorf("time.Millisecond*30 <= wn.PickElapsed()(%v)", wn.PickElapsed())
}
if time.Millisecond*5 >= wn.PickElapsed() {
t.Errorf("time.Millisecond*5 >= wn.PickElapsed()(%v)", wn.PickElapsed())
@ -77,11 +77,11 @@ func TestDirectError(t *testing.T) {
time.Sleep(time.Millisecond * 20)
done(context.Background(), selector.DoneInfo{Err: err})
}
if float64(30000) >= wn.Weight() {
t.Errorf("float64(30000) >= wn.Weight()(%v)", wn.Weight())
if float64(1000) >= wn.Weight() {
t.Errorf("float64(1000) >= wn.Weight()(%v)", wn.Weight())
}
if float64(60000) <= wn.Weight() {
t.Errorf("float64(60000) <= wn.Weight()(%v)", wn.Weight())
if float64(2000) <= wn.Weight() {
t.Errorf("float64(2000) <= wn.Weight()(%v)", wn.Weight())
}
}
@ -118,10 +118,32 @@ func TestDirectErrorHandler(t *testing.T) {
time.Sleep(time.Millisecond * 20)
done(context.Background(), selector.DoneInfo{Err: err})
}
if float64(30000) >= wn.Weight() {
t.Errorf("float64(30000) >= wn.Weight()(%v)", wn.Weight())
if float64(1000) >= wn.Weight() {
t.Errorf("float64(100) >= wn.Weight()(%v)", wn.Weight())
}
if float64(60000) <= wn.Weight() {
t.Errorf("float64(60000) <= wn.Weight()(%v)", wn.Weight())
if float64(2000) <= wn.Weight() {
t.Errorf("float64(200) <= wn.Weight()(%v)", wn.Weight())
}
}
func BenchmarkPickAndWeight(b *testing.B) {
bu := &Builder{}
node := bu.Build(selector.NewNode(
"http",
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
Metadata: map[string]string{"weight": "10"},
}))
di := selector.DoneInfo{}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
done := node.Pick()
node.Weight()
done(context.Background(), di)
}
})
}