// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"

	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestSum(t *testing.T) {
	c := new(clock)
	t.Cleanup(c.Register())

	t.Run("Int64/DeltaSum", testDeltaSum[int64]())
	c.Reset()
	t.Run("Float64/DeltaSum", testDeltaSum[float64]())
	c.Reset()

	t.Run("Int64/CumulativeSum", testCumulativeSum[int64]())
	c.Reset()
	t.Run("Float64/CumulativeSum", testCumulativeSum[float64]())
	c.Reset()

	t.Run("Int64/DeltaPrecomputedSum", testDeltaPrecomputedSum[int64]())
	c.Reset()
	t.Run("Float64/DeltaPrecomputedSum", testDeltaPrecomputedSum[float64]())
	c.Reset()

	t.Run("Int64/CumulativePrecomputedSum", testCumulativePrecomputedSum[int64]())
	c.Reset()
	t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSum[float64]())
}

func testDeltaSum[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.DeltaTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.Sum(mono)
	ctx := context.Background()
	return test[N](in, out, []teststep[N]{
		{
			input: []arg[N]{},
			expect: output{
				n: 0,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints:  []metricdata.DataPoint[N]{},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, alice},
				{ctx, -1, bob},
				{ctx, 1, alice},
				{ctx, 2, alice},
				{ctx, -10, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(1),
							Time:       y2kPlus(2),
							Value:      4,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(1),
							Time:       y2kPlus(2),
							Value:      -11,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 10, alice},
				{ctx, 3, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(2),
							Time:       y2kPlus(3),
							Value:      10,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(2),
							Time:       y2kPlus(3),
							Value:      3,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{}, // Delta sums are expected to reset.
			expect: output{
				n: 0,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints:  []metricdata.DataPoint[N]{},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, alice},
				{ctx, 1, bob},
				// These will exceed the cardinality limit.
				{ctx, 1, carol},
				{ctx, 1, dave},
			},
			expect: output{
				n: 3,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(4),
							Time:       y2kPlus(5),
							Value:      1,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(4),
							Time:       y2kPlus(5),
							Value:      1,
						},
						{
							Attributes: overflowSet,
							StartTime:  y2kPlus(4),
							Time:       y2kPlus(5),
							Value:      2,
						},
					},
				},
			},
		},
	})
}
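// testCumulativeSum mirrors testDeltaSum with cumulative temporality: every
// data point keeps StartTime pinned at y2kPlus(0) and values accumulate
// across collection cycles instead of resetting. Tracing the steps below,
// alice's first-cycle total of 4 (1+1+2) carries forward, so the second
// cycle's +10 reports 14 where the delta variant above reports only 10.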
func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.CumulativeTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.Sum(mono)
	ctx := context.Background()
	return test[N](in, out, []teststep[N]{
		{
			input: []arg[N]{},
			expect: output{
				n: 0,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints:  []metricdata.DataPoint[N]{},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, alice},
				{ctx, -1, bob},
				{ctx, 1, alice},
				{ctx, 2, alice},
				{ctx, -10, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(2),
							Value:      4,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(2),
							Value:      -11,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 10, alice},
				{ctx, 3, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(3),
							Value:      14,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(3),
							Value:      -8,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{
				// These will exceed the cardinality limit.
				{ctx, 1, carol},
				{ctx, 1, dave},
			},
			expect: output{
				n: 3,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(4),
							Value:      14,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(4),
							Value:      -8,
						},
						{
							Attributes: overflowSet,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(4),
							Value:      2,
						},
					},
				},
			},
		},
	})
}

func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.DeltaTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.PrecomputedSum(mono)
	ctx := context.Background()
	return test[N](in, out, []teststep[N]{
		{
			input: []arg[N]{},
			expect: output{
				n: 0,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints:  []metricdata.DataPoint[N]{},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, alice},
				{ctx, -1, bob},
				{ctx, 1, fltrAlice},
				{ctx, 2, alice},
				{ctx, -10, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(1),
							Time:       y2kPlus(2),
							Value:      4,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(1),
							Time:       y2kPlus(2),
							Value:      -11,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, fltrAlice},
				{ctx, 10, alice},
				{ctx, 3, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(2),
							Time:       y2kPlus(3),
							Value:      7,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(2),
							Time:       y2kPlus(3),
							Value:      14,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{}, // Precomputed sums are expected to reset.
			expect: output{
				n: 0,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints:  []metricdata.DataPoint[N]{},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, alice},
				{ctx, 1, bob},
				// These will exceed the cardinality limit.
				{ctx, 1, carol},
				{ctx, 1, dave},
			},
			expect: output{
				n: 3,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.DeltaTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(4),
							Time:       y2kPlus(5),
							Value:      1,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(4),
							Time:       y2kPlus(5),
							Value:      1,
						},
						{
							Attributes: overflowSet,
							StartTime:  y2kPlus(4),
							Time:       y2kPlus(5),
							Value:      2,
						},
					},
				},
			},
		},
	})
}
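// testCumulativePrecomputedSum covers the remaining temporality and
// measurement-type combination. A precomputed sum records caller-supplied
// running totals, and measurements whose filtered attribute sets collide are
// summed: in the second step below, 1 (recorded against fltrAlice directly)
// and 10 (recorded against alice, which filters to fltrAlice) report as 11,
// with StartTime fixed at y2kPlus(0) under cumulative temporality.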
func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.CumulativeTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.PrecomputedSum(mono)
	ctx := context.Background()
	return test[N](in, out, []teststep[N]{
		{
			input: []arg[N]{},
			expect: output{
				n: 0,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints:  []metricdata.DataPoint[N]{},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, alice},
				{ctx, -1, bob},
				{ctx, 1, fltrAlice},
				{ctx, 2, alice},
				{ctx, -10, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(2),
							Value:      4,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(2),
							Value:      -11,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, fltrAlice},
				{ctx, 10, alice},
				{ctx, 3, bob},
			},
			expect: output{
				n: 2,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(3),
							Value:      11,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(3),
							Value:      3,
						},
					},
				},
			},
		},
		{
			input: []arg[N]{}, // Precomputed sums are expected to reset.
			expect: output{
				n: 0,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints:  []metricdata.DataPoint[N]{},
				},
			},
		},
		{
			input: []arg[N]{
				{ctx, 1, alice},
				{ctx, 1, bob},
				// These will exceed the cardinality limit.
				{ctx, 1, carol},
				{ctx, 1, dave},
			},
			expect: output{
				n: 3,
				agg: metricdata.Sum[N]{
					IsMonotonic: mono,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[N]{
						{
							Attributes: fltrAlice,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(5),
							Value:      1,
						},
						{
							Attributes: fltrBob,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(5),
							Value:      1,
						},
						{
							Attributes: overflowSet,
							StartTime:  y2kPlus(0),
							Time:       y2kPlus(5),
							Value:      2,
						},
					},
				},
			},
		},
	})
}
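// The concurrency tests below reuse the testAggergationConcurrentSafe harness
// (defined alongside this file in the package) to drive each sum variant from
// multiple goroutines. validateSum leans on an invariant assumed from that
// harness: alice observations are multiples of 2 and bob observations are
// multiples of 3, so a torn or lost concurrent update surfaces as a nonzero
// remainder without the test needing to predict exact totals.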
func TestSumConcurrentSafe(t *testing.T) {
	t.Run("Int64/DeltaSum", testDeltaSumConcurrentSafe[int64]())
	t.Run("Float64/DeltaSum", testDeltaSumConcurrentSafe[float64]())
	t.Run("Int64/CumulativeSum", testCumulativeSumConcurrentSafe[int64]())
	t.Run("Float64/CumulativeSum", testCumulativeSumConcurrentSafe[float64]())
	t.Run("Int64/DeltaPrecomputedSum", testDeltaPrecomputedSumConcurrentSafe[int64]())
	t.Run("Float64/DeltaPrecomputedSum", testDeltaPrecomputedSumConcurrentSafe[float64]())
	t.Run("Int64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[int64]())
	t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSumConcurrentSafe[float64]())
}

func validateSum[N int64 | float64](t *testing.T, got metricdata.Aggregation) {
	s, ok := got.(metricdata.Sum[N])
	if !ok {
		t.Fatalf("wrong aggregation type: %+v", got)
	}
	for _, dp := range s.DataPoints {
		assert.False(t, dp.Time.Before(dp.StartTime),
			"Timestamp %v must not be before start time %v",
			dp.Time, dp.StartTime,
		)
		switch dp.Attributes {
		case fltrAlice:
			// alice observations are always a multiple of 2.
			assert.Equal(t, int64(0), int64(dp.Value)%2)
		case fltrBob:
			// bob observations are always a multiple of 3.
			assert.Equal(t, int64(0), int64(dp.Value)%3)
		default:
			t.Fatalf("wrong attributes %+v", dp.Attributes)
		}
	}
}

func testDeltaSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.DeltaTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.Sum(mono)
	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}

func testCumulativeSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.CumulativeTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.Sum(mono)
	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}

func testDeltaPrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.DeltaTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.PrecomputedSum(mono)
	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}

func testCumulativePrecomputedSumConcurrentSafe[N int64 | float64]() func(t *testing.T) {
	mono := false
	in, out := Builder[N]{
		Temporality:      metricdata.CumulativeTemporality,
		Filter:           attrFltr,
		AggregationLimit: 3,
	}.PrecomputedSum(mono)
	return testAggergationConcurrentSafe[N](in, out, validateSum[N])
}

func BenchmarkSum(b *testing.B) {
	// The monotonic argument is only used to annotate the Sum returned from
	// the Aggregation method. It should not affect operational performance;
	// therefore, only monotonic=false is benchmarked here.
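	//
	// As a usage sketch (the package path is assumed from this file's import
	// comment), these benchmarks can be run without the tests via:
	//
	//	go test -run='^$' -bench=BenchmarkSum ./sdk/metric/internal/aggregate/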
	b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
		return Builder[int64]{
			Temporality: metricdata.CumulativeTemporality,
		}.Sum(false)
	}))
	b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
		return Builder[int64]{
			Temporality: metricdata.DeltaTemporality,
		}.Sum(false)
	}))
	b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
		return Builder[float64]{
			Temporality: metricdata.CumulativeTemporality,
		}.Sum(false)
	}))
	b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
		return Builder[float64]{
			Temporality: metricdata.DeltaTemporality,
		}.Sum(false)
	}))

	b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
		return Builder[int64]{
			Temporality: metricdata.CumulativeTemporality,
		}.PrecomputedSum(false)
	}))
	b.Run("Precomputed/Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
		return Builder[int64]{
			Temporality: metricdata.DeltaTemporality,
		}.PrecomputedSum(false)
	}))
	b.Run("Precomputed/Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
		return Builder[float64]{
			Temporality: metricdata.CumulativeTemporality,
		}.PrecomputedSum(false)
	}))
	b.Run("Precomputed/Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
		return Builder[float64]{
			Temporality: metricdata.DeltaTemporality,
		}.PrecomputedSum(false)
	}))
}
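// The pattern exercised throughout this file, reduced to a minimal sketch
// (not an additional test; alice, attrFltr, and the Builder fields are the
// package-level fixtures used above):
//
//	in, out := Builder[int64]{
//		Temporality:      metricdata.DeltaTemporality,
//		Filter:           attrFltr,
//		AggregationLimit: 3,
//	}.Sum(false)
//	in(context.Background(), 1, alice) // Record a measurement.
//	var dest metricdata.Aggregation
//	n := out(&dest) // Collect; n is the number of data points in dest.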