diff --git a/selector/node/ewma/node.go b/selector/node/ewma/node.go index c718f2525..540a0bb3c 100644 --- a/selector/node/ewma/node.go +++ b/selector/node/ewma/node.go @@ -1,11 +1,9 @@ package ewma import ( - "container/list" "context" "math" "net" - "sync" "sync/atomic" "time" @@ -17,7 +15,7 @@ const ( // The mean lifetime of `cost`, it reaches its half-life after Tau*ln(2). tau = int64(time.Millisecond * 600) // if statistic not collected,we add a big lag penalty to endpoint - penalty = uint64(time.Second * 10) + penalty = uint64(time.Microsecond * 100) ) var ( @@ -33,18 +31,21 @@ type Node struct { lag int64 success uint64 inflight int64 - inflights *list.List + inflights [200]int64 // last collected timestamp - stamp int64 - predictTs int64 - predict int64 + stamp int64 // request number in a period time reqs int64 // last lastPick timestamp lastPick int64 - errHandler func(err error) (isErr bool) - lk sync.RWMutex + errHandler func(err error) (isErr bool) + cachedWeight *atomic.Value +} + +type nodeWeight struct { + value float64 + updateAt int64 } // Builder is ewma node builder. @@ -55,12 +56,12 @@ type Builder struct { // Build create a weighted node. 
func (b *Builder) Build(n selector.Node) selector.WeightedNode { s := &Node{ - Node: n, - lag: 0, - success: 1000, - inflight: 1, - inflights: list.New(), - errHandler: b.ErrHandler, + Node: n, + lag: 0, + success: 1000, + inflight: 1, + errHandler: b.ErrHandler, + cachedWeight: &atomic.Value{}, } return s } @@ -72,64 +73,58 @@ func (n *Node) health() uint64 { func (n *Node) load() (load uint64) { now := time.Now().UnixNano() avgLag := atomic.LoadInt64(&n.lag) - lastPredictTs := atomic.LoadInt64(&n.predictTs) - predictInterval := avgLag / 5 - if predictInterval < int64(time.Millisecond*5) { - predictInterval = int64(time.Millisecond * 5) - } - if predictInterval > int64(time.Millisecond*200) { - predictInterval = int64(time.Millisecond * 200) - } - if now-lastPredictTs > predictInterval && atomic.CompareAndSwapInt64(&n.predictTs, lastPredictTs, now) { - var ( - total int64 - count int - predict int64 - ) - n.lk.RLock() - first := n.inflights.Front() - for first != nil { - lag := now - first.Value.(int64) - if lag > avgLag { - count++ - total += lag - } - first = first.Next() - } - if count > (n.inflights.Len()/2 + 1) { - predict = total / int64(count) - } - n.lk.RUnlock() - atomic.StoreInt64(&n.predict, predict) - } + predict := n.predict(avgLag, now) if avgLag == 0 { // penalty is the penalty value when there is no data when the node is just started. 
- // The default value is 1e9 * 10 load = penalty * uint64(atomic.LoadInt64(&n.inflight)) return } - predict := atomic.LoadInt64(&n.predict) if predict > avgLag { avgLag = predict } + // add 5ms to eliminate the latency gap between different zones + avgLag += int64(time.Millisecond * 5) + avgLag = int64(math.Sqrt(float64(avgLag))) load = uint64(avgLag) * uint64(atomic.LoadInt64(&n.inflight)) + return load +} + +func (n *Node) predict(avgLag int64, now int64) (predict int64) { + var ( + total int64 + slowNum int + totalNum int + ) + for i := range n.inflights { + start := atomic.LoadInt64(&n.inflights[i]) + if start != 0 { + totalNum++ + lag := now - start + if lag > avgLag { + slowNum++ + total += lag + } + } + } + if slowNum >= (totalNum/2 + 1) { + predict = total / int64(slowNum) + } return } // Pick pick a node. func (n *Node) Pick() selector.DoneFunc { - now := time.Now().UnixNano() - atomic.StoreInt64(&n.lastPick, now) + start := time.Now().UnixNano() + atomic.StoreInt64(&n.lastPick, start) atomic.AddInt64(&n.inflight, 1) - atomic.AddInt64(&n.reqs, 1) - n.lk.Lock() - e := n.inflights.PushBack(now) - n.lk.Unlock() + reqs := atomic.AddInt64(&n.reqs, 1) + slot := reqs % 200 + swapped := atomic.CompareAndSwapInt64(&n.inflights[slot], 0, start) return func(ctx context.Context, di selector.DoneInfo) { - n.lk.Lock() - n.inflights.Remove(e) - n.lk.Unlock() + if swapped { + atomic.CompareAndSwapInt64(&n.inflights[slot], start, 0) + } atomic.AddInt64(&n.inflight, -1) now := time.Now().UnixNano() @@ -141,7 +136,6 @@ func (n *Node) Pick() selector.DoneFunc { } w := math.Exp(float64(-td) / float64(tau)) - start := e.Value.(int64) lag := now - start if lag < 0 { lag = 0 @@ -172,7 +166,19 @@ func (n *Node) Pick() selector.DoneFunc { // Weight is node effective weight. 
func (n *Node) Weight() (weight float64) { - weight = float64(n.health()*uint64(time.Second)) / float64(n.load()) + w, ok := n.cachedWeight.Load().(*nodeWeight) + now := time.Now().UnixNano() + if !ok || time.Duration(now-w.updateAt) > (time.Millisecond*5) { + health := n.health() + load := n.load() + weight = float64(health*uint64(time.Microsecond)*10) / float64(load) + n.cachedWeight.Store(&nodeWeight{ + value: weight, + updateAt: now, + }) + } else { + weight = w.value + } return } diff --git a/selector/node/ewma/node_test.go b/selector/node/ewma/node_test.go index d930495c3..7f4d59b8f 100644 --- a/selector/node/ewma/node_test.go +++ b/selector/node/ewma/node_test.go @@ -36,16 +36,16 @@ func TestDirect(t *testing.T) { t.Errorf("done2 is equal to nil") } - time.Sleep(time.Millisecond * 10) + time.Sleep(time.Millisecond * 15) done(context.Background(), selector.DoneInfo{}) - if float64(30000) >= wn.Weight() { + if float64(70) >= wn.Weight() { t.Errorf("float64(30000) >= wn.Weight()(%v)", wn.Weight()) } - if float64(60000) <= wn.Weight() { - t.Errorf("float64(60000) <= wn.Weight()(%v)", wn.Weight()) + if float64(1200) <= wn.Weight() { + t.Errorf("float64(1000) <= wn.Weight()(%v)", wn.Weight()) } - if time.Millisecond*15 <= wn.PickElapsed() { - t.Errorf("time.Millisecond*15 <= wn.PickElapsed()(%v)", wn.PickElapsed()) + if time.Millisecond*30 <= wn.PickElapsed() { + t.Errorf("time.Millisecond*30 <= wn.PickElapsed()(%v)", wn.PickElapsed()) } if time.Millisecond*5 >= wn.PickElapsed() { t.Errorf("time.Millisecond*5 >= wn.PickElapsed()(%v)", wn.PickElapsed()) @@ -77,11 +77,11 @@ func TestDirectError(t *testing.T) { time.Sleep(time.Millisecond * 20) done(context.Background(), selector.DoneInfo{Err: err}) } - if float64(30000) >= wn.Weight() { - t.Errorf("float64(30000) >= wn.Weight()(%v)", wn.Weight()) + if float64(1000) >= wn.Weight() { + t.Errorf("float64(1000) >= wn.Weight()(%v)", wn.Weight()) } - if float64(60000) <= wn.Weight() { - t.Errorf("float64(60000) <= 
wn.Weight()(%v)", wn.Weight()) + if float64(2000) <= wn.Weight() { + t.Errorf("float64(2000) <= wn.Weight()(%v)", wn.Weight()) } } @@ -118,10 +118,32 @@ func TestDirectErrorHandler(t *testing.T) { time.Sleep(time.Millisecond * 20) done(context.Background(), selector.DoneInfo{Err: err}) } - if float64(30000) >= wn.Weight() { - t.Errorf("float64(30000) >= wn.Weight()(%v)", wn.Weight()) + if float64(1000) >= wn.Weight() { + t.Errorf("float64(100) >= wn.Weight()(%v)", wn.Weight()) } - if float64(60000) <= wn.Weight() { - t.Errorf("float64(60000) <= wn.Weight()(%v)", wn.Weight()) + if float64(2000) <= wn.Weight() { + t.Errorf("float64(200) <= wn.Weight()(%v)", wn.Weight()) } } + +func BenchmarkPickAndWeight(b *testing.B) { + bu := &Builder{} + node := bu.Build(selector.NewNode( + "http", + "127.0.0.1:9090", + &registry.ServiceInstance{ + ID: "127.0.0.1:9090", + Name: "helloworld", + Version: "v1.0.0", + Endpoints: []string{"http://127.0.0.1:9090"}, + Metadata: map[string]string{"weight": "10"}, + })) + di := selector.DoneInfo{} + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + done := node.Pick() + node.Weight() + done(context.Background(), di) + } + }) +}