1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Improve aggregation concurrent safe tests (#8021)

I'm taking a stab at improving the ConcurrentSafe tests for aggregations
before taking on the lockless exponential histogram implementation
again.

Part of https://github.com/open-telemetry/opentelemetry-go/issues/7796

This PR includes a few improvements:

* All concurrent-safe tests now use 10 different attribute sets to make
sure we are testing concurrent increments that result in an overflow
(the cardinality limit of the test is 3).
* All concurrent-safe tests for floats now include decimal-valued
inputs.
* Improved the validation of the collected metrics:
    * Validate the total after multiple collects.
    * Validate that increments are made to the correct bucket for histograms.
    * Validate that the overflow attribute set has the correct total value.

This uncovered an apparent race condition where the last-value
aggregation can collect a value of zero even when no zero value is
recorded. I added a TODO, and will fix this in a follow-up.

I used AI to help me design and implement tests, but requested each of
the changes, and reviewed the output.

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
David Ashpole
2026-03-13 08:36:01 -04:00
committed by GitHub
parent 2ffde5a428
commit 65f85fc93a
5 changed files with 275 additions and 154 deletions
+72 -22
View File
@@ -139,42 +139,92 @@ func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []t
}
}
func testAggergationConcurrentSafe[N int64 | float64](
// getConcurrentVals returns the fixed sequence of nine measurements that each
// recording goroutine cycles through, typed as N.
//
// The float64 sequence contains fractional values; the int64 sequence does
// not. Both sequences add up to 32. The sequence length and its sum are
// hard-coded elsewhere (concurrentNumRecords must stay a multiple of the
// length, and expectedConcurrentSum divides by it), so update those whenever
// the values here change.
func getConcurrentVals[N int64 | float64]() []N {
	var zero N
	if _, isFloat := any(zero).(float64); isFloat {
		floats := []float64{2.5, 6.1, 4.4, 10.0, 22.0, -3.5, -6.5, 3.0, -6.0}
		// N is float64 here, so the dynamic type []float64 is exactly []N.
		return any(floats).([]N)
	}

	ints := []int64{2, 6, 4, 10, 22, -3, -6, 3, -6}
	// N is int64 here, so the dynamic type []int64 is exactly []N.
	return any(ints).([]N)
}
// Shared parameters for the concurrent-safety aggregation tests.
const (
	// concurrentNumGoroutines is the number of goroutines recording
	// measurements, each to its own attribute set.
	concurrentNumGoroutines = 10
	// concurrentNumRecords is the number of measurements each goroutine
	// records. It is a multiple of 9 (the length of the value sequences) so
	// that every goroutine replays the sequence a whole number of times.
	concurrentNumRecords = 90
	// concurrentValsSum is the sum of one full pass over a value sequence.
	concurrentValsSum = 32
	// expectedConcurrentCount is the total number of measurements recorded
	// across all goroutines.
	expectedConcurrentCount = uint64(concurrentNumRecords * concurrentNumGoroutines)
)
func expectedConcurrentSum[N int64 | float64]() N {
return N(int64(concurrentNumGoroutines) * int64(concurrentNumRecords/9) * concurrentValsSum)
}
// testAggregationConcurrentSafe provides a unified stress test for all generic aggregators
// by generating high contention, cardinality limit overflow, and validating exact results.
func testAggregationConcurrentSafe[N int64 | float64](
meas Measure[N],
comp ComputeAggregation,
validate func(t *testing.T, agg metricdata.Aggregation),
validate func(t *testing.T, aggs []metricdata.Aggregation),
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
got := new(metricdata.Aggregation)
ctx := t.Context()
var wg sync.WaitGroup
for _, args := range []arg[N]{
{ctx, 2, alice},
{ctx, 6, alice},
{ctx, 4, alice},
{ctx, 10, alice},
{ctx, 22, alice},
{ctx, -3, bob},
{ctx, -6, bob},
{ctx, 3, bob},
{ctx, 6, bob},
} {
wg.Go(func() {
meas(args.ctx, args.value, args.attr)
})
// Use 10 different attribute sets to force overflow on the AggregationLimit
// which is typically set to 3.
attrs := make([]attribute.Set, concurrentNumGoroutines)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.String(keyUser, strconv.Itoa(i)))
}
vals := getConcurrentVals[N]()
wg.Add(concurrentNumGoroutines)
for i := range concurrentNumGoroutines {
go func(id int) {
defer wg.Done()
// Each goroutine records to a distinct attribute set
attr := attrs[id]
for j := range concurrentNumRecords {
meas(ctx, vals[j%len(vals)], attr)
}
}(i)
}
var results []metricdata.Aggregation
// Run computation concurrently with measurements to stress hot/cold swaps
wg.Go(func() {
for range 2 {
for range concurrentNumRecords {
got := new(metricdata.Aggregation)
comp(got)
// We do not check expected output for each step because
// computeAggregation is run concurrently with steps. Instead,
// we validate that the output is a valid possible output.
validate(t, *got)
results = append(results, *got)
}
})
wg.Wait()
// Final flush to get final values
got := new(metricdata.Aggregation)
comp(got)
results = append(results, *got)
validate(t, results)
}
}
// assertSumEqual asserts that actual matches expected.
//
// For float64 the comparison tolerates an absolute difference of 0.0001,
// because a sum accumulated from fractional inputs may not be bit-identical
// to the expected total. For int64 the values must be exactly equal.
//
// A failed assertion is recorded on t and does not stop the test.
func assertSumEqual[N int64 | float64](t *testing.T, expected, actual N) {
	// Mark this function as a helper so the testing package attributes a
	// failure to the calling validator rather than to this function.
	t.Helper()

	if _, isFloat := any(*new(N)).(float64); isFloat {
		assert.InDelta(t, float64(expected), float64(actual), 0.0001)
		return
	}
	assert.Equal(t, expected, actual)
}
@@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -1052,7 +1053,7 @@ func testDeltaExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
Filter: attrFltr,
AggregationLimit: 3,
}.ExponentialBucketHistogram(4, 20, false, false)
return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
return testAggregationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
}
func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
@@ -1061,45 +1062,69 @@ func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T
Filter: attrFltr,
AggregationLimit: 3,
}.ExponentialBucketHistogram(4, 20, false, false)
return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
return testAggregationConcurrentSafe[N](in, out, validateExponentialHistogram[N])
}
func validateExponentialHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
s, ok := got.(metricdata.ExponentialHistogram[N])
if !ok {
t.Fatalf("wrong aggregation type: %+v", got)
func validateExponentialHistogram[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) {
sums := make(map[attribute.Set]N)
counts := make(map[attribute.Set]uint64)
var isDelta bool
for i, agg := range aggs {
s, ok := agg.(metricdata.ExponentialHistogram[N])
require.True(t, ok)
if s.Temporality == metricdata.DeltaTemporality {
isDelta = true
}
require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle")
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)
if s.Temporality == metricdata.DeltaTemporality {
sums[dp.Attributes] += dp.Sum
counts[dp.Attributes] += dp.Count
} else if i == len(aggs)-1 {
sums[dp.Attributes] = dp.Sum
counts[dp.Attributes] = dp.Count
}
var totalCount uint64
for _, bc := range dp.PositiveBucket.Counts {
totalCount += bc
}
for _, bc := range dp.NegativeBucket.Counts {
totalCount += bc
}
assert.Equal(t, totalCount, dp.Count)
}
}
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)
switch dp.Attributes {
case fltrAlice:
// alice observations are always a multiple of 2
assert.Equal(t, int64(0), int64(dp.Sum)%2)
case fltrBob:
// bob observations are always a multiple of 3
assert.Equal(t, int64(0), int64(dp.Sum)%3)
default:
t.Fatalf("wrong attributes %+v", dp.Attributes)
var totalSum N
var totalCount uint64
for attr, sum := range sums {
totalSum += sum
count := counts[attr]
totalCount += count
expectedSingleSum := expectedConcurrentSum[N]() / N(concurrentNumGoroutines)
expectedSingleCount := expectedConcurrentCount / uint64(concurrentNumGoroutines)
if !isDelta {
if attr == overflowSet {
// The overflow set contains all the goroutines that didn't make the limit of 3
assert.Equal(t, uint64(0), count%expectedSingleCount)
assert.Equal(t, count/expectedSingleCount*uint64(expectedSingleSum), uint64(sum))
} else {
// Individual attributes should have exactly one goroutine's worth of data
assert.Equal(t, expectedSingleSum, sum)
assert.Equal(t, expectedSingleCount, count)
}
}
avg := float64(dp.Sum) / float64(dp.Count)
if minVal, ok := dp.Min.Value(); ok {
assert.GreaterOrEqual(t, avg, float64(minVal))
}
if maxVal, ok := dp.Max.Value(); ok {
assert.LessOrEqual(t, avg, float64(maxVal))
}
var totalCount uint64
for _, bc := range dp.PositiveBucket.Counts {
totalCount += bc
}
for _, bc := range dp.NegativeBucket.Counts {
totalCount += bc
}
assert.Equal(t, totalCount, dp.Count)
}
assertSumEqual[N](t, expectedConcurrentSum[N](), totalSum)
assert.Equal(t, expectedConcurrentCount, totalCount)
}
func FuzzGetBin(f *testing.F) {
+81 -43
View File
@@ -230,57 +230,95 @@ func TestHistogramConcurrentSafe(t *testing.T) {
t.Run("Float64/Cumulative", testCumulativeHistConcurrentSafe[float64]())
}
func validateHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
s, ok := got.(metricdata.Histogram[N])
if !ok {
t.Fatalf("wrong aggregation type: %+v", got)
func validateHistogram[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) {
sums := make(map[attribute.Set]N)
counts := make(map[attribute.Set]uint64)
bucketCounts := make(map[attribute.Set][]uint64)
for i, agg := range aggs {
s, ok := agg.(metricdata.Histogram[N])
require.True(t, ok)
require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle")
for _, dp := range s.DataPoints {
if s.Temporality == metricdata.DeltaTemporality {
sums[dp.Attributes] += dp.Sum
counts[dp.Attributes] += dp.Count
if bucketCounts[dp.Attributes] == nil {
bucketCounts[dp.Attributes] = make([]uint64, len(dp.BucketCounts))
}
for idx, c := range dp.BucketCounts {
bucketCounts[dp.Attributes][idx] += c
}
} else if i == len(aggs)-1 {
sums[dp.Attributes] = dp.Sum
counts[dp.Attributes] = dp.Count
bucketCounts[dp.Attributes] = make([]uint64, len(dp.BucketCounts))
copy(bucketCounts[dp.Attributes], dp.BucketCounts)
}
}
}
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)
switch dp.Attributes {
case fltrAlice:
// alice observations are always a multiple of 2
assert.Equal(t, int64(0), int64(dp.Sum)%2)
case fltrBob:
// bob observations are always a multiple of 3
assert.Equal(t, int64(0), int64(dp.Sum)%3)
default:
t.Fatalf("wrong attributes %+v", dp.Attributes)
}
avg := float64(dp.Sum) / float64(dp.Count)
if minVal, ok := dp.Min.Value(); ok {
assert.GreaterOrEqual(t, avg, float64(minVal))
}
if maxVal, ok := dp.Max.Value(); ok {
assert.LessOrEqual(t, avg, float64(maxVal))
}
var totalCount uint64
for _, bc := range dp.BucketCounts {
totalCount += bc
}
assert.Equal(t, totalCount, dp.Count)
var totalSum N
var totalCount uint64
totalBuckets := make([]uint64, 4)
for _, val := range sums {
totalSum += val
}
for _, val := range counts {
totalCount += val
}
for _, bc := range bucketCounts {
for idx, c := range bc {
if idx < len(totalBuckets) {
totalBuckets[idx] += c
}
}
}
assertSumEqual[N](t, expectedConcurrentSum[N](), totalSum)
assert.Equal(t, expectedConcurrentCount, totalCount)
var expectedBuckets []uint64
switch any(*new(N)).(type) {
case float64:
// Float sequence: 2.5, 6.1, 4.4, 10.0, 22.0, -3.5, -6.5, 3.0, -6.0
// Bounds {0, 2, 4}:
// (-inf, 0]: -3.5, -6.5, -6.0 (3x)
// (0, 2]: none (0x)
// (2, 4]: 2.5, 3.0 (2x)
// (4, +inf): 6.1, 4.4, 10.0, 22.0 (4x)
// 10 full loops per goroutine * 10 goroutines = 100x
expectedBuckets = []uint64{300, 0, 200, 400}
default:
// Int sequence: 2, 6, 4, 10, 22, -3, -6, 3, -6
// Bounds {0, 2, 4}:
// (-inf, 0]: -3, -6, -6 (3x)
// (0, 2]: 2 (1x)
// (2, 4]: 4, 3 (2x)
// (4, +inf): 6, 10, 22 (3x)
// 10 full loops per goroutine * 10 goroutines = 100x
expectedBuckets = []uint64{300, 100, 200, 300}
}
assert.Equal(t, expectedBuckets, totalBuckets)
}
func testDeltaHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.ExplicitBucketHistogram(bounds, noMinMax, false)
return testAggergationConcurrentSafe[N](in, out, validateHistogram[N])
}
func testCumulativeHistConcurrentSafe[N int64 | float64]() func(t *testing.T) {
func testCumulativeHistConcurrentSafe[N int64 | float64]() func(*testing.T) {
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.ExplicitBucketHistogram(bounds, noMinMax, false)
return testAggergationConcurrentSafe[N](in, out, validateHistogram[N])
}.ExplicitBucketHistogram([]float64{0, 2, 4}, false, false)
return testAggregationConcurrentSafe[N](in, out, validateHistogram[N])
}
// testDeltaHistConcurrentSafe returns the concurrent-safety test for an
// explicit-bucket histogram with delta temporality. The histogram uses bucket
// bounds {0, 2, 4}, the attrFltr attribute filter and an aggregation limit of
// 3; the collected output is checked by validateHistogram.
func testDeltaHistConcurrentSafe[N int64 | float64]() func(*testing.T) {
	cfg := Builder[N]{
		Temporality:      metricdata.DeltaTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}
	bucketBounds := []float64{0, 2, 4}
	measure, compute := cfg.ExplicitBucketHistogram(bucketBounds, false, false)
	return testAggregationConcurrentSafe[N](measure, compute, validateHistogram[N])
}
// hPointSummed returns an HistogramDataPoint that started and ended now with
+22 -22
View File
@@ -8,6 +8,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -481,25 +482,24 @@ func TestLastValueConcurrentSafe(t *testing.T) {
t.Run("Float64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValueConcurrentSafe[float64]())
}
func validateGauge[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
s, ok := got.(metricdata.Gauge[N])
if !ok {
t.Fatalf("wrong aggregation type: %+v", got)
func validateGauge[N int64 | float64](t *testing.T, aggs []metricdata.Aggregation) {
// A gauge takes the *last* recorded value.
// During high concurrency, reading the Gauge can snap any value in the
// iteration cycle of the corresponding Goroutines.
valid := make(map[N]bool)
for _, v := range getConcurrentVals[N]() {
valid[v] = true
}
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)
switch dp.Attributes {
case fltrAlice:
// alice observations are always a multiple of 2
assert.Equal(t, int64(0), int64(dp.Value)%2)
case fltrBob:
// bob observations are always a multiple of 3
assert.Equal(t, int64(0), int64(dp.Value)%3)
default:
t.Fatalf("wrong attributes %+v", dp.Attributes)
// TODO(dashpole): Fix a concurrency bug where a gauge can be collected with
// the value zero even when no zero-value measurements have been recorded.
valid[0] = true
for _, agg := range aggs {
s, ok := agg.(metricdata.Gauge[N])
require.True(t, ok)
require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded")
for _, dp := range s.DataPoints {
assert.True(t, valid[dp.Value], "Unexpected gauge value: %v", dp.Value)
}
}
}
@@ -510,7 +510,7 @@ func testCumulativeLastValueConcurrentSafe[N int64 | float64]() func(*testing.T)
Filter: attrFltr,
AggregationLimit: 3,
}.LastValue()
return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
return testAggregationConcurrentSafe[N](in, out, validateGauge[N])
}
func testDeltaLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
@@ -519,7 +519,7 @@ func testDeltaLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
Filter: attrFltr,
AggregationLimit: 3,
}.LastValue()
return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
return testAggregationConcurrentSafe[N](in, out, validateGauge[N])
}
func testDeltaPrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
@@ -528,7 +528,7 @@ func testDeltaPrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*test
Filter: attrFltr,
AggregationLimit: 3,
}.PrecomputedLastValue()
return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
return testAggregationConcurrentSafe[N](in, out, validateGauge[N])
}
func testCumulativePrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) {
@@ -537,7 +537,7 @@ func testCumulativePrecomputedLastValueConcurrentSafe[N int64 | float64]() func(
Filter: attrFltr,
AggregationLimit: 3,
}.PrecomputedLastValue()
return testAggergationConcurrentSafe[N](in, out, validateGauge[N])
return testAggregationConcurrentSafe[N](in, out, validateGauge[N])
}
func BenchmarkLastValue(b *testing.B) {
+40 -32
View File
@@ -7,8 +7,9 @@ import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -551,67 +552,74 @@ func TestSumConcurrentSafe(t *testing.T) {
t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[float64]())
}
func validateSum[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
s, ok := got.(metricdata.Sum[N])
if !ok {
t.Fatalf("wrong aggregation type: %+v", got)
}
for _, dp := range s.DataPoints {
assert.False(t,
dp.Time.Before(dp.StartTime),
"Timestamp %v must not be before start time %v", dp.Time, dp.StartTime,
)
switch dp.Attributes {
case fltrAlice:
// alice observations are always a multiple of 2
assert.Equal(t, int64(0), int64(dp.Value)%2)
case fltrBob:
// bob observations are always a multiple of 3
assert.Equal(t, int64(0), int64(dp.Value)%3)
default:
t.Fatalf("wrong attributes %+v", dp.Attributes)
//nolint:revive // isPrecomputed is used for configuring validation
func validateSum[N int64 | float64](isPrecomputed bool) func(t *testing.T, aggs []metricdata.Aggregation) {
return func(t *testing.T, aggs []metricdata.Aggregation) {
sums := make(map[attribute.Set]N)
for i, agg := range aggs {
s, ok := agg.(metricdata.Sum[N])
require.True(t, ok)
require.LessOrEqual(t, len(s.DataPoints), 3, "AggregationLimit of 3 exceeded in a single cycle")
for _, dp := range s.DataPoints {
if s.Temporality == metricdata.DeltaTemporality {
sums[dp.Attributes] += dp.Value
} else if i == len(aggs)-1 {
sums[dp.Attributes] = dp.Value
}
}
}
if isPrecomputed {
// Precomputed Sums clear the state when collected concurrently. Due to hot/cold overlap
// during flush, the sum drops intermediate updates, so the final calculation won't cleanly
// add up to the total number of operations performed by the workers. Therefore, skip exact
// invariant check, verifying only that limits and map updates occurred safely.
return
}
var total N
for _, val := range sums {
total += val
}
assertSumEqual[N](t, expectedConcurrentSum[N](), total)
}
}
func testDeltaSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
mono := false
in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.Sum(mono)
return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}.Sum(false)
return testAggregationConcurrentSafe[N](in, out, validateSum[N](false))
}
func testCumulativeSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
mono := false
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.Sum(mono)
return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}.Sum(false)
return testAggregationConcurrentSafe[N](in, out, validateSum[N](false))
}
func testDeltaPrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
mono := false
in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.PrecomputedSum(mono)
return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}.PrecomputedSum(false)
return testAggregationConcurrentSafe[N](in, out, validateSum[N](true))
}
func testCumulativePrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
mono := false
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 3,
}.PrecomputedSum(mono)
return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}.PrecomputedSum(false)
return testAggregationConcurrentSafe[N](in, out, validateSum[N](true))
}
func BenchmarkSum(b *testing.B) {