You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Add tests for exponential histogram concurrent-safety edge-cases (#8024)
Part of https://github.com/open-telemetry/opentelemetry-go/issues/7796 Improve test coverage of the concurrent-safety of exponential histogram aggregations. This adds two tests: * ZeroValues, which checks that the handling of zero values is correct in concurrent scenarios * RescalingStress, which generates inputs that intentionally cause lots of concurrent rescales. The RescalingStress test inputs the same values into a serial version and parallel version of the exponential histogram aggregation and verifies that the output buckets match afterwards. I used AI to identify where we were missing coverage, and to design and help implement the tests. I've reviewed and modified the tests as needed.
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -1187,3 +1188,121 @@ func lowerBound(index, scale int32) float64 {
|
||||
// 2 ^ (index * 2 ^ (-scale))
|
||||
return math.Exp2(math.Ldexp(float64(index), -int(scale)))
|
||||
}
|
||||
|
||||
func TestExponentialHistogramConcurrentSafeEdgeCases(t *testing.T) {
|
||||
t.Run("Int64/Delta", testExpoHistConcurrentSafeEdgeCases[int64](metricdata.DeltaTemporality))
|
||||
t.Run("Float64/Delta", testExpoHistConcurrentSafeEdgeCases[float64](metricdata.DeltaTemporality))
|
||||
t.Run("Int64/Cumulative", testExpoHistConcurrentSafeEdgeCases[int64](metricdata.CumulativeTemporality))
|
||||
t.Run("Float64/Cumulative", testExpoHistConcurrentSafeEdgeCases[float64](metricdata.CumulativeTemporality))
|
||||
}
|
||||
|
||||
func testExpoHistConcurrentSafeEdgeCases[N int64 | float64](temporality metricdata.Temporality) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
t.Run("ZeroValues", func(t *testing.T) {
|
||||
meas, comp := Builder[N]{
|
||||
Temporality: temporality,
|
||||
Filter: attrFltr,
|
||||
AggregationLimit: 3,
|
||||
}.ExponentialBucketHistogram(160, 20, false, false)
|
||||
|
||||
ctx := t.Context()
|
||||
var wg sync.WaitGroup
|
||||
const numGoroutines = 10
|
||||
const numRecords = 100
|
||||
wg.Add(numGoroutines)
|
||||
for range numGoroutines {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for range numRecords {
|
||||
meas(ctx, 0, alice)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
dest := new(metricdata.Aggregation)
|
||||
comp(dest)
|
||||
h := (*dest).(metricdata.ExponentialHistogram[N])
|
||||
require.Len(t, h.DataPoints, 1)
|
||||
assert.Equal(t, uint64(numGoroutines*numRecords), h.DataPoints[0].ZeroCount)
|
||||
assert.Equal(t, uint64(numGoroutines*numRecords), h.DataPoints[0].Count)
|
||||
})
|
||||
|
||||
t.Run("RescalingStress", func(t *testing.T) {
|
||||
meas, comp := Builder[N]{
|
||||
Temporality: temporality,
|
||||
Filter: attrFltr,
|
||||
AggregationLimit: 3,
|
||||
}.ExponentialBucketHistogram(160, 20, false, false)
|
||||
|
||||
ctx := t.Context()
|
||||
var wg sync.WaitGroup
|
||||
const numGoroutines = 10
|
||||
const numRecords = 100
|
||||
|
||||
// To verify exact outcome, we sequentially record the same to a reference.
|
||||
refMeas, refComp := Builder[N]{
|
||||
Temporality: temporality,
|
||||
Filter: attrFltr,
|
||||
AggregationLimit: 3,
|
||||
}.ExponentialBucketHistogram(160, 20, false, false)
|
||||
var m sync.Mutex
|
||||
|
||||
wg.Add(numGoroutines)
|
||||
for i := range numGoroutines {
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
for j := range numRecords {
|
||||
// generate a mix of very large and very small powers of 2
|
||||
valFloat := math.Exp2((float64(j) / float64(numRecords)) * 60.0)
|
||||
if id%2 == 0 {
|
||||
valFloat = -valFloat
|
||||
}
|
||||
val := N(valFloat)
|
||||
// For integers, values less than 1 will truncate to 0. Mix things up.
|
||||
if id%3 == 0 {
|
||||
val = N(float64(id+1) * 100.0)
|
||||
}
|
||||
|
||||
meas(ctx, val, alice)
|
||||
|
||||
m.Lock()
|
||||
refMeas(ctx, val, alice)
|
||||
m.Unlock()
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
dest := new(metricdata.Aggregation)
|
||||
comp(dest)
|
||||
h := (*dest).(metricdata.ExponentialHistogram[N])
|
||||
require.Len(t, h.DataPoints, 1)
|
||||
|
||||
refDest := new(metricdata.Aggregation)
|
||||
refComp(refDest)
|
||||
refH := (*refDest).(metricdata.ExponentialHistogram[N])
|
||||
require.Len(t, refH.DataPoints, 1)
|
||||
|
||||
// StartTime/Time will differ slightly.
|
||||
h.DataPoints[0].StartTime = refH.DataPoints[0].StartTime
|
||||
h.DataPoints[0].Time = refH.DataPoints[0].Time
|
||||
|
||||
// Float sums might be slightly slightly off due to summing wildly different magnitudes
|
||||
// in different orders concurrently versus sequentially.
|
||||
actualSum := float64(h.DataPoints[0].Sum)
|
||||
expectedSum := float64(refH.DataPoints[0].Sum)
|
||||
if actualSum != expectedSum {
|
||||
assert.InEpsilon(t, expectedSum, actualSum, 0.5, "Sum")
|
||||
// Force equality for the deep struct comparison below
|
||||
h.DataPoints[0].Sum = refH.DataPoints[0].Sum
|
||||
}
|
||||
|
||||
// Normalize Exemplars to avoid nil vs empty slice comparison failures
|
||||
h.DataPoints[0].Exemplars = nil
|
||||
refH.DataPoints[0].Exemplars = nil
|
||||
|
||||
assert.Equal(t, refH, h)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user