diff --git a/pkg/stat/metric/rolling_policy.go b/pkg/stat/metric/rolling_policy.go index a474e0fa5..13e5516b2 100644 --- a/pkg/stat/metric/rolling_policy.go +++ b/pkg/stat/metric/rolling_policy.go @@ -38,7 +38,7 @@ func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy { func (r *RollingPolicy) timespan() int { v := int(time.Since(r.lastAppendTime) / r.bucketDuration) - if v < r.size && v > -1 { // maybe time backwards + if v > -1 { // maybe time backwards return v } return r.size @@ -48,9 +48,13 @@ func (r *RollingPolicy) add(f func(offset int, val float64), val float64) { r.mu.Lock() timespan := r.timespan() if timespan > 0 { + r.lastAppendTime = r.lastAppendTime.Add(time.Duration(timespan * int(r.bucketDuration))) offset := r.offset // reset the expired buckets s := offset + 1 + if timespan > r.size { + timespan = r.size + } e, e1 := s+timespan, 0 // e: reset offset must start from offset+1 if e > r.size { e1 = e - r.size @@ -65,7 +69,6 @@ func (r *RollingPolicy) add(f func(offset int, val float64), val float64) { offset = i } r.offset = offset - r.lastAppendTime = time.Now() } f(r.offset, val) r.mu.Unlock() diff --git a/pkg/stat/metric/rolling_policy_test.go b/pkg/stat/metric/rolling_policy_test.go new file mode 100644 index 000000000..2792605f6 --- /dev/null +++ b/pkg/stat/metric/rolling_policy_test.go @@ -0,0 +1,67 @@ +package metric + +import ( + "fmt" + "math/rand" + "testing" + "time" +) + +func GetRollingPolicy() *RollingPolicy { + w := NewWindow(WindowOpts{Size: 10}) + return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: 300 * time.Millisecond}) +} + +func Handler(t *testing.T, table []map[string][]int) { + for _, hm := range table { + var totalTs, lastOffset int + offsetAndPoints := hm["offsetAndPoints"] + timeSleep := hm["timeSleep"] + policy := GetRollingPolicy() + for i, n := range timeSleep { + totalTs += n + time.Sleep(time.Duration(n) * time.Millisecond) + policy.Add(1) + offset, points := offsetAndPoints[2*i], offsetAndPoints[2*i+1] + + if int(policy.window.window[offset].Points[0]) != points { + t.Errorf("error, time since last append: %vms, last offset: %v", totalTs, lastOffset) + } + lastOffset = offset + } + } +} + +func TestRollingPolicy_Add(t *testing.T) { + rand.Seed(time.Now().Unix()) + + // test add after 400ms and 601ms relative to the policy created time + policy := GetRollingPolicy() + time.Sleep(400 * time.Millisecond) + policy.Add(1) + time.Sleep(201 * time.Millisecond) + policy.Add(1) + for _, b := range policy.window.window { + fmt.Println(b.Points) + } + if int(policy.window.window[1].Points[0]) != 1 { + t.Errorf("error, time since last append: %vms, last offset: %v", 300, 0) + } + if int(policy.window.window[2].Points[0]) != 1 { + t.Errorf("error, time since last append: %vms, last offset: %v", 301, 0) + } + + // test func timespan return real span + table := []map[string][]int{ + { + "timeSleep": []int{294, 3200}, + "offsetAndPoints": []int{0, 1, 0, 1}, + }, + { + "timeSleep": []int{305, 3200, 6400}, + "offsetAndPoints": []int{1, 1, 1, 1, 1, 1}, + }, + } + + Handler(t, table) +}