From 3daca16dc520fcac73ca3bb6a07047f0a07fce39 Mon Sep 17 00:00:00 2001
From: HeXiaoliang <hel13140302@126.com>
Date: Wed, 17 Feb 2021 16:41:38 +0800
Subject: [PATCH] fix: bbr limiter maxPass minRt cache problem (#690)

---
 pkg/ratelimit/bbr/bbr.go      | 62 +++++++++++++++++++++++++----------
 pkg/ratelimit/bbr/bbr_test.go | 41 +++++++++++++++++++++--
 2 files changed, 83 insertions(+), 20 deletions(-)

diff --git a/pkg/ratelimit/bbr/bbr.go b/pkg/ratelimit/bbr/bbr.go
index 7bc898b3c..738336e9d 100644
--- a/pkg/ratelimit/bbr/bbr.go
+++ b/pkg/ratelimit/bbr/bbr.go
@@ -71,11 +71,20 @@ type BBR struct {
 	rtStat          metric.RollingCounter
 	inFlight        int64
 	winBucketPerSec int64
+	bucketDuration  time.Duration
+	winSize         int
 	conf            *Config
 	prevDrop        atomic.Value
-	prevDropHit     int32
-	rawMaxPASS      int64
-	rawMinRt        int64
+	maxPASSCache    atomic.Value
+	minRtCache      atomic.Value
+}
+
+// CounterCache is used to cache maxPASS and minRt result.
+// Value of current bucket is not counted in real time.
+// Cache time is equal to a bucket duration.
+type CounterCache struct {
+	val  int64
+	time time.Time
 }
 
 // Config contains configs of bbr limiter.
@@ -89,11 +98,14 @@ type Config struct {
 }
 
 func (l *BBR) maxPASS() int64 {
-	rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS)
-	if rawMaxPass > 0 && l.passStat.Timespan() < 1 {
-		return rawMaxPass
+	passCache := l.maxPASSCache.Load()
+	if passCache != nil {
+		ps := passCache.(*CounterCache)
+		if l.timespan(ps.time) < 1 {
+			return ps.val
+		}
 	}
-	rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
+	rawMaxPass := int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
 		var result = 1.0
 		for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
 			bucket := iterator.Bucket()
@@ -108,16 +120,30 @@ func (l *BBR) maxPASS() int64 {
 	if rawMaxPass == 0 {
 		rawMaxPass = 1
 	}
-	atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass)
+	l.maxPASSCache.Store(&CounterCache{
+		val:  rawMaxPass,
+		time: time.Now(),
+	})
 	return rawMaxPass
 }
 
-func (l *BBR) minRT() int64 {
-	rawMinRT := atomic.LoadInt64(&l.rawMinRt)
-	if rawMinRT > 0 && l.rtStat.Timespan() < 1 {
-		return rawMinRT
+func (l *BBR) timespan(lastTime time.Time) int {
+	v := int(time.Since(lastTime) / l.bucketDuration)
+	if v > -1 {
+		return v
 	}
-	rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
+	return l.winSize
+}
+
+func (l *BBR) minRT() int64 {
+	rtCache := l.minRtCache.Load()
+	if rtCache != nil {
+		rc := rtCache.(*CounterCache)
+		if l.timespan(rc.time) < 1 {
+			return rc.val
+		}
+	}
+	rawMinRT := int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
 		var result = math.MaxFloat64
 		for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
 			bucket := iterator.Bucket()
@@ -136,7 +162,10 @@ func (l *BBR) minRT() int64 {
 	if rawMinRT <= 0 {
 		rawMinRT = 1
 	}
-	atomic.StoreInt64(&l.rawMinRt, rawMinRT)
+	l.minRtCache.Store(&CounterCache{
+		val:  rawMinRT,
+		time: time.Now(),
+	})
 	return rawMinRT
 }
 
@@ -151,9 +180,6 @@ func (l *BBR) shouldDrop() bool {
 			return false
 		}
 		if time.Since(initTime)-prevDrop <= time.Second {
-			if atomic.LoadInt32(&l.prevDropHit) == 0 {
-				atomic.StoreInt32(&l.prevDropHit, 1)
-			}
 			inFlight := atomic.LoadInt64(&l.inFlight)
 			return inFlight > 1 && inFlight > l.maxFlight()
 		}
@@ -226,6 +252,8 @@ func newLimiter(conf *Config) limit.Limiter {
 		passStat:        passStat,
 		rtStat:          rtStat,
 		winBucketPerSec: int64(time.Second) / (int64(conf.Window) / int64(conf.WinBucket)),
+		bucketDuration:  bucketDuration,
+		winSize:         conf.WinBucket,
 	}
 	return limiter
 }
diff --git a/pkg/ratelimit/bbr/bbr_test.go b/pkg/ratelimit/bbr/bbr_test.go
index a807d8525..93345e5ff 100644
--- a/pkg/ratelimit/bbr/bbr_test.go
+++ b/pkg/ratelimit/bbr/bbr_test.go
@@ -86,19 +86,33 @@ func TestBBRMaxPass(t *testing.T) {
 	assert.Equal(t, int64(1), bbr.maxPASS())
 }
 
+func TestBBRMaxPassWithCache(t *testing.T) {
+	bucketDuration := time.Millisecond * 100
+	bbr := newLimiter(confForTest()).(*BBR)
+	// witch cache, value of latest bucket is not counted instently.
+	// after a bucket duration time, this bucket will be fullly counted.
+	for i := 1; i <= 11; i++ {
+		bbr.passStat.Add(int64(i * 50))
+		time.Sleep(bucketDuration / 2)
+		_ = bbr.maxPASS()
+		bbr.passStat.Add(int64(i * 50))
+		time.Sleep(bucketDuration / 2)
+	}
+	bbr.passStat.Add(int64(1))
+	assert.Equal(t, int64(1000), bbr.maxPASS())
+}
+
 func TestBBRMinRt(t *testing.T) {
 	bucketDuration := time.Millisecond * 100
 	bbr := newLimiter(confForTest()).(*BBR)
-	rtStat := metric.NewRollingCounter(metric.RollingCounterOpts{Size: 10, BucketDuration: bucketDuration})
 	for i := 0; i < 10; i++ {
 		for j := i*10 + 1; j <= i*10+10; j++ {
-			rtStat.Add(int64(j))
+			bbr.rtStat.Add(int64(j))
 		}
 		if i != 9 {
 			time.Sleep(bucketDuration)
 		}
 	}
-	bbr.rtStat = rtStat
 	assert.Equal(t, int64(6), bbr.minRT())
 
 	// default max min rt is equal to maxFloat64.
@@ -108,6 +122,27 @@ func TestBBRMinRt(t *testing.T) {
 	assert.Equal(t, int64(1), bbr.minRT())
 }
 
+func TestBBRMinRtWithCache(t *testing.T) {
+	bucketDuration := time.Millisecond * 100
+	bbr := newLimiter(confForTest()).(*BBR)
+	for i := 0; i < 10; i++ {
+		for j := i*10 + 1; j <= i*10+5; j++ {
+			bbr.rtStat.Add(int64(j))
+		}
+		if i != 9 {
+			time.Sleep(bucketDuration / 2)
+		}
+		_ = bbr.minRT()
+		for j := i*10 + 6; j <= i*10+10; j++ {
+			bbr.rtStat.Add(int64(j))
+		}
+		if i != 9 {
+			time.Sleep(bucketDuration / 2)
+		}
+	}
+	assert.Equal(t, int64(6), bbr.minRT())
+}
+
 func TestBBRMaxQps(t *testing.T) {
 	bbr := newLimiter(confForTest()).(*BBR)
 	bucketDuration := time.Millisecond * 100