diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index 9b0725a1a..7dbc7bd81 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -6,6 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" "strconv" + "sync" "sync/atomic" "testing" "time" @@ -138,6 +139,49 @@ func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []t } } +func testAggergationConcurrentSafe[N int64 | float64]( + meas Measure[N], + comp ComputeAggregation, + validate func(t *testing.T, agg metricdata.Aggregation), +) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + + got := new(metricdata.Aggregation) + ctx := t.Context() + var wg sync.WaitGroup + for _, args := range []arg[N]{ + {ctx, 2, alice}, + {ctx, 6, alice}, + {ctx, 4, alice}, + {ctx, 10, alice}, + {ctx, 22, alice}, + {ctx, -3, bob}, + {ctx, -6, bob}, + {ctx, 3, bob}, + {ctx, 6, bob}, + } { + wg.Add(1) + go func() { + defer wg.Done() + meas(args.ctx, args.value, args.attr) + }() + } + wg.Add(1) + go func() { + defer wg.Done() + for range 2 { + comp(got) + // We do not check expected output for each step because + // computeAggregation is run concurrently with steps. Instead, + // we validate that the output is a valid possible output. + validate(t, *got) + } + }() + wg.Wait() + } +} + func benchmarkAggregate[N int64 | float64](factory func() (Measure[N], ComputeAggregation)) func(*testing.B) { counts := []int{1, 10, 100} return func(b *testing.B) { diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index 5d743be56..a27bdc73a 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -1040,6 +1040,69 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { }) } +func TestExponentialHistogramAggregationConcurrentSafe(t *testing.T) { + t.Run("Int64/Delta", testDeltaExpoHistConcurrentSafe[int64]()) + t.Run("Float64/Delta", testDeltaExpoHistConcurrentSafe[float64]()) + t.Run("Int64/Cumulative", testCumulativeExpoHistConcurrentSafe[int64]()) + t.Run("Float64/Cumulative", testCumulativeExpoHistConcurrentSafe[float64]()) +} + +func testDeltaExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.ExponentialBucketHistogram(4, 20, false, false) + return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N]) +} + +func testCumulativeExpoHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.ExponentialBucketHistogram(4, 20, false, false) + return testAggergationConcurrentSafe[N](in, out, validateExponentialHistogram[N]) +} + +func validateExponentialHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) { + s, ok := got.(metricdata.ExponentialHistogram[N]) + if !ok { + t.Fatalf("wrong aggregation type: %+v", got) + } + for _, dp := range s.DataPoints { + assert.False(t, + dp.Time.Before(dp.StartTime), + "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, + ) + switch dp.Attributes { + case fltrAlice: + // alice observations are always a multiple of 2 + assert.Equal(t, int64(0), int64(dp.Sum)%2) + case fltrBob: + // bob observations are always a multiple of 3 + assert.Equal(t, int64(0), int64(dp.Sum)%3) + default: + t.Fatalf("wrong attributes %+v", dp.Attributes) + } + avg := float64(dp.Sum) / float64(dp.Count) + if minVal, ok := dp.Min.Value(); ok { + assert.GreaterOrEqual(t, avg, float64(minVal)) + } + if maxVal, ok := dp.Max.Value(); ok { + assert.LessOrEqual(t, avg, float64(maxVal)) + } + var totalCount uint64 + for _, bc := range dp.PositiveBucket.Counts { + totalCount += bc + } + for _, bc := range dp.NegativeBucket.Counts { + totalCount += bc + } + assert.Equal(t, totalCount, dp.Count) + } +} + func FuzzGetBin(f *testing.F) { values := []float64{ 2.0, diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index a8e68730d..6e0f3948d 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -223,6 +223,66 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { }) } +func TestHistogramConcurrentSafe(t *testing.T) { + t.Run("Int64/Delta", testDeltaHistConcurrentSafe[int64]()) + t.Run("Float64/Delta", testDeltaHistConcurrentSafe[float64]()) + t.Run("Int64/Cumulative", testCumulativeHistConcurrentSafe[int64]()) + t.Run("Float64/Cumulative", testCumulativeHistConcurrentSafe[float64]()) +} + +func validateHistogram[N int64 | float64](t *testing.T, got metricdata.Aggregation) { + s, ok := got.(metricdata.Histogram[N]) + if !ok { + t.Fatalf("wrong aggregation type: %+v", got) + } + for _, dp := range s.DataPoints { + assert.False(t, + dp.Time.Before(dp.StartTime), + "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, + ) + switch dp.Attributes { + case fltrAlice: + // alice observations are always a multiple of 2 + assert.Equal(t, int64(0), int64(dp.Sum)%2) + case fltrBob: + // bob observations are always a multiple of 3 + assert.Equal(t, int64(0), int64(dp.Sum)%3) + default: + t.Fatalf("wrong attributes %+v", dp.Attributes) + } + avg := float64(dp.Sum) / float64(dp.Count) + if minVal, ok := dp.Min.Value(); ok { + assert.GreaterOrEqual(t, avg, float64(minVal)) + } + if maxVal, ok := dp.Max.Value(); ok { + assert.LessOrEqual(t, avg, float64(maxVal)) + } + var totalCount uint64 + for _, bc := range dp.BucketCounts { + totalCount += bc + } + assert.Equal(t, totalCount, dp.Count) + } +} + +func testDeltaHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.ExplicitBucketHistogram(bounds, noMinMax, false) + return testAggergationConcurrentSafe[N](in, out, validateHistogram[N]) +} + +func testCumulativeHistConcurrentSafe[N int64 | float64]() func(t *testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.ExplicitBucketHistogram(bounds, noMinMax, false) + return testAggergationConcurrentSafe[N](in, out, validateHistogram[N]) +} + // hPointSummed returns an HistogramDataPoint that started and ended now with // multi number of measurements values v. It includes a min and max (set to v). func hPointSummed[N int64 | float64]( diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 77e0d283b..f7c2199f5 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -7,6 +7,8 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -468,6 +470,76 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { }) } +func TestLastValueConcurrentSafe(t *testing.T) { + t.Run("Int64/DeltaLastValue", testDeltaLastValueConcurrentSafe[int64]()) + t.Run("Float64/DeltaLastValue", testDeltaLastValueConcurrentSafe[float64]()) + t.Run("Int64/CumulativeLastValue", testCumulativeLastValueConcurrentSafe[int64]()) + t.Run("Float64/CumulativeLastValue", testCumulativeLastValueConcurrentSafe[float64]()) + t.Run("Int64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValueConcurrentSafe[int64]()) + t.Run("Float64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValueConcurrentSafe[float64]()) + t.Run("Int64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValueConcurrentSafe[int64]()) + t.Run("Float64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValueConcurrentSafe[float64]()) +} + +func validateGauge[N int64 | float64](t *testing.T, got metricdata.Aggregation) { + s, ok := got.(metricdata.Gauge[N]) + if !ok { + t.Fatalf("wrong aggregation type: %+v", got) + } + for _, dp := range s.DataPoints { + assert.False(t, + dp.Time.Before(dp.StartTime), + "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, + ) + switch dp.Attributes { + case fltrAlice: + // alice observations are always a multiple of 2 + assert.Equal(t, int64(0), int64(dp.Value)%2) + case fltrBob: + // bob observations are always a multiple of 3 + assert.Equal(t, int64(0), int64(dp.Value)%3) + default: + t.Fatalf("wrong attributes %+v", dp.Attributes) + } + } +} + +func testCumulativeLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.LastValue() + return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) +} + +func testDeltaLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.LastValue() + return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) +} + +func testDeltaPrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedLastValue() + return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) +} + +func testCumulativePrecomputedLastValueConcurrentSafe[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedLastValue() + return testAggergationConcurrentSafe[N](in, out, validateGauge[N]) +} + func BenchmarkLastValue(b *testing.B) { b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue)) b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue)) diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index bb825e183..1b6c896f8 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -7,6 +7,8 @@ import ( "context" "testing" + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -538,6 +540,80 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { }) } +func TestSumConcurrentSafe(t *testing.T) { + t.Run("Int64/DeltaSum", testDeltaSumConcurrentSafe[int64]()) + t.Run("Float64/DeltaSum", testDeltaSumConcurrentSafe[float64]()) + t.Run("Int64/CumulativeSum", testCumulativeSumConcurrentSafe[int64]()) + t.Run("Float64/CumulativeSum", testCumulativeSumConcurrentSafe[float64]()) + t.Run("Int64/DeltaPrecomputedSum", testDeltaPrecomputedSumConcurrentSafe[int64]()) + t.Run("Float64/DeltaPrecomputedSum", testDeltaPrecomputedSumConcurrentSafe[float64]()) + t.Run("Int64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[int64]()) + t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[float64]()) +} + +func validateSum[N int64 | float64](t *testing.T, got metricdata.Aggregation) { + s, ok := got.(metricdata.Sum[N]) + if !ok { + t.Fatalf("wrong aggregation type: %+v", got) + } + for _, dp := range s.DataPoints { + assert.False(t, + dp.Time.Before(dp.StartTime), + "Timestamp %v must not be before start time %v", dp.Time, dp.StartTime, + ) + switch dp.Attributes { + case fltrAlice: + // alice observations are always a multiple of 2 + assert.Equal(t, int64(0), int64(dp.Value)%2) + case fltrBob: + // bob observations are always a multiple of 3 + assert.Equal(t, int64(0), int64(dp.Value)%3) + default: + t.Fatalf("wrong attributes %+v", dp.Attributes) + } + } +} + +func testDeltaSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.Sum(mono) + return testAggergationConcurrentSafe[N](in, out, validateSum[N]) +} + +func testCumulativeSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.Sum(mono) + return testAggergationConcurrentSafe[N](in, out, validateSum[N]) +} + +func testDeltaPrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedSum(mono) + return testAggergationConcurrentSafe[N](in, out, validateSum[N]) +} + +func testCumulativePrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { + mono := false + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedSum(mono) + return testAggergationConcurrentSafe[N](in, out, validateSum[N]) +} + func BenchmarkSum(b *testing.B) { // The monotonic argument is only used to annotate the Sum returned from // the Aggregation method. It should not have an effect on operational