From 75348b05dbbdf3e3f2846b7934fe5322a77fa1b0 Mon Sep 17 00:00:00 2001
From: mikellxy <37923310+mikellxy@users.noreply.github.com>
Date: Tue, 26 Nov 2019 10:07:24 +0800
Subject: [PATCH] opt last append time of rolling policy (#430)

* opt last append time of rolling policy

* add test of rolling policy add

* modify test cases
---
 pkg/stat/metric/rolling_policy.go      |  7 ++-
 pkg/stat/metric/rolling_policy_test.go | 67 ++++++++++++++++++++++++++
 2 files changed, 72 insertions(+), 2 deletions(-)
 create mode 100644 pkg/stat/metric/rolling_policy_test.go

diff --git a/pkg/stat/metric/rolling_policy.go b/pkg/stat/metric/rolling_policy.go
index a474e0fa5..13e5516b2 100644
--- a/pkg/stat/metric/rolling_policy.go
+++ b/pkg/stat/metric/rolling_policy.go
@@ -38,7 +38,7 @@ func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy {
 
 func (r *RollingPolicy) timespan() int {
 	v := int(time.Since(r.lastAppendTime) / r.bucketDuration)
-	if v < r.size && v > -1 { // maybe time backwards
+	if v > -1 { // maybe time backwards
 		return v
 	}
 	return r.size
@@ -48,9 +48,13 @@ func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
 	r.mu.Lock()
 	timespan := r.timespan()
 	if timespan > 0 {
+		r.lastAppendTime = r.lastAppendTime.Add(time.Duration(timespan * int(r.bucketDuration)))
 		offset := r.offset
 		// reset the expired buckets
 		s := offset + 1
+		if timespan > r.size {
+			timespan = r.size
+		}
 		e, e1 := s+timespan, 0 // e: reset offset must start from offset+1
 		if e > r.size {
 			e1 = e - r.size
@@ -65,7 +69,6 @@ func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
 			offset = i
 		}
 		r.offset = offset
-		r.lastAppendTime = time.Now()
 	}
 	f(r.offset, val)
 	r.mu.Unlock()
diff --git a/pkg/stat/metric/rolling_policy_test.go b/pkg/stat/metric/rolling_policy_test.go
new file mode 100644
index 000000000..2792605f6
--- /dev/null
+++ b/pkg/stat/metric/rolling_policy_test.go
@@ -0,0 +1,67 @@
+package metric
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+)
+
+func GetRollingPolicy() *RollingPolicy {
+	w := NewWindow(WindowOpts{Size: 10})
+	return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: 300 * time.Millisecond})
+}
+
+func Handler(t *testing.T, table []map[string][]int) {
+	for _, hm := range table {
+		var totalTs, lastOffset int
+		offsetAndPoints := hm["offsetAndPoints"]
+		timeSleep := hm["timeSleep"]
+		policy := GetRollingPolicy()
+		for i, n := range timeSleep {
+			totalTs += n
+			time.Sleep(time.Duration(n) * time.Millisecond)
+			policy.Add(1)
+			offset, points := offsetAndPoints[2*i], offsetAndPoints[2*i+1]
+
+			if int(policy.window.window[offset].Points[0]) != points {
+				t.Errorf("error, time since last append: %vms, last offset: %v", totalTs, lastOffset)
+			}
+			lastOffset = offset
+		}
+	}
+}
+
+func TestRollingPolicy_Add(t *testing.T) {
+	rand.Seed(time.Now().Unix())
+
+	// test add after 400ms and 601ms relative to the policy created time
+	policy := GetRollingPolicy()
+	time.Sleep(400 * time.Millisecond)
+	policy.Add(1)
+	time.Sleep(201 * time.Millisecond)
+	policy.Add(1)
+	for _, b := range policy.window.window {
+		fmt.Println(b.Points)
+	}
+	if int(policy.window.window[1].Points[0]) != 1 {
+		t.Errorf("error, time since last append: %vms, last offset: %v", 300, 0)
+	}
+	if int(policy.window.window[2].Points[0]) != 1 {
+		t.Errorf("error, time since last append: %vms, last offset: %v", 301, 0)
+	}
+
+	// test func timespan return real span
+	table := []map[string][]int{
+		{
+			"timeSleep":       []int{294, 3200},
+			"offsetAndPoints": []int{0, 1, 0, 1},
+		},
+		{
+			"timeSleep":       []int{305, 3200, 6400},
+			"offsetAndPoints": []int{1, 1, 1, 1, 1, 1},
+		},
+	}
+
+	Handler(t, table)
+}