1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-07-17 01:12:45 +02:00

Simplify the sum aggregators (#4357)

* Simplify the sum aggregators

* Comment how memory reuse misses are handled
This commit is contained in:
Tyler Yahn
2023-07-26 13:32:45 -07:00
committed by GitHub
parent 830fae8d70
commit b4264c53bc
4 changed files with 512 additions and 423 deletions

View File

@ -88,39 +88,23 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// PrecomputedSum returns a sum aggregate function input and output. The // PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values. // arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) { func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
var s aggregator[N] s := newPrecomputedSum[N](monotonic)
switch b.Temporality { switch b.Temporality {
case metricdata.DeltaTemporality: case metricdata.DeltaTemporality:
s = newPrecomputedDeltaSum[N](monotonic) return b.filter(s.measure), s.delta
default: default:
s = newPrecomputedCumulativeSum[N](monotonic) return b.filter(s.measure), s.cumulative
}
return b.input(s), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = s.Aggregation()
sData, _ := (*dest).(metricdata.Sum[N])
return len(sData.DataPoints)
} }
} }
// Sum returns a sum aggregate function input and output. // Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
var s aggregator[N] s := newSum[N](monotonic)
switch b.Temporality { switch b.Temporality {
case metricdata.DeltaTemporality: case metricdata.DeltaTemporality:
s = newDeltaSum[N](monotonic) return b.filter(s.measure), s.delta
default: default:
s = newCumulativeSum[N](monotonic) return b.filter(s.measure), s.cumulative
}
return b.input(s), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = s.Aggregation()
sData, _ := (*dest).(metricdata.Sum[N])
return len(sData.DataPoints)
} }
} }

View File

@ -38,10 +38,6 @@ func monoIncr[N int64 | float64]() setMap[N] {
return setMap[N]{alice: 1, bob: 10, carol: 2} return setMap[N]{alice: 1, bob: 10, carol: 2}
} }
func nonMonoIncr[N int64 | float64]() setMap[N] {
return setMap[N]{alice: 1, bob: -1, carol: 2}
}
// setMap maps attribute sets to a number. // setMap maps attribute sets to a number.
type setMap[N int64 | float64] map[attribute.Set]N type setMap[N int64 | float64] map[attribute.Set]N

View File

@ -15,6 +15,7 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import ( import (
"context"
"sync" "sync"
"time" "time"
@ -32,257 +33,190 @@ func newValueMap[N int64 | float64]() *valueMap[N] {
return &valueMap[N]{values: make(map[attribute.Set]N)} return &valueMap[N]{values: make(map[attribute.Set]N)}
} }
func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) { func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
s.Lock() s.Lock()
s.values[attr] += value s.values[attr] += value
s.Unlock() s.Unlock()
} }
// newDeltaSum returns an Aggregator that summarizes a set of measurements as // newSum returns an aggregator that summarizes a set of measurements as their
// their arithmetic sum. Each sum is scoped by attributes and the aggregation // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// cycle the measurements were made in. // the measurements were made in.
// func newSum[N int64 | float64](monotonic bool) *sum[N] {
// The monotonic value is used to communicate the produced Aggregation is return &sum[N]{
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregation method is called it will reset all sums to zero.
func newDeltaSum[N int64 | float64](monotonic bool) aggregator[N] {
return &deltaSum[N]{
valueMap: newValueMap[N](), valueMap: newValueMap[N](),
monotonic: monotonic, monotonic: monotonic,
start: now(), start: now(),
} }
} }
// deltaSum summarizes a set of measurements made in a single aggregation // sum summarizes a set of measurements made as their arithmetic sum.
// cycle as their arithmetic sum. type sum[N int64 | float64] struct {
type deltaSum[N int64 | float64] struct {
*valueMap[N] *valueMap[N]
monotonic bool monotonic bool
start time.Time start time.Time
} }
func (s *deltaSum[N]) Aggregation() metricdata.Aggregation { func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.DeltaTemporality
sData.IsMonotonic = s.monotonic
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if len(s.values) == 0 { n := len(s.values)
return nil dPts := reset(sData.DataPoints, n, n)
}
t := now() var i int
out := metricdata.Sum[N]{
Temporality: metricdata.DeltaTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values { for attr, value := range s.values {
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ dPts[i].Attributes = attr
Attributes: attr, dPts[i].StartTime = s.start
StartTime: s.start, dPts[i].Time = t
Time: t, dPts[i].Value = value
Value: value, // Do not report stale values.
})
// Unused attribute sets do not report.
delete(s.values, attr) delete(s.values, attr)
i++
} }
// The delta collection cycle resets. // The delta collection cycle resets.
s.start = t s.start = t
return out
sData.DataPoints = dPts
*dest = sData
return n
} }
// newCumulativeSum returns an Aggregator that summarizes a set of func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// measurements as their arithmetic sum. Each sum is scoped by attributes and t := now()
// the aggregation cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregation method is called it will reset all sums to zero.
func newCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] {
return &cumulativeSum[N]{
valueMap: newValueMap[N](),
monotonic: monotonic,
start: now(),
}
}
// cumulativeSum summarizes a set of measurements made over all aggregation // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// cycles as their arithmetic sum. // use the zero-value sData and hope for better alignment next cycle.
type cumulativeSum[N int64 | float64] struct { sData, _ := (*dest).(metricdata.Sum[N])
*valueMap[N] sData.Temporality = metricdata.CumulativeTemporality
sData.IsMonotonic = s.monotonic
monotonic bool
start time.Time
}
func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if len(s.values) == 0 { n := len(s.values)
return nil dPts := reset(sData.DataPoints, n, n)
}
t := now() var i int
out := metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values { for attr, value := range s.values {
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ dPts[i].Attributes = attr
Attributes: attr, dPts[i].StartTime = s.start
StartTime: s.start, dPts[i].Time = t
Time: t, dPts[i].Value = value
Value: value,
})
// TODO (#3006): This will use an unbounded amount of memory if there // TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute // are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not // sets that become "stale" need to be forgotten so this will not
// overload the system. // overload the system.
i++
} }
return out
sData.DataPoints = dPts
*dest = sData
return n
} }
// newPrecomputedDeltaSum returns an Aggregator that summarizes a set of // newPrecomputedSum returns an aggregator that summarizes a set of
// pre-computed sums. Each sum is scoped by attributes and the aggregation // observatrions as their arithmetic sum. Each sum is scoped by attributes and
// cycle the measurements were made in. // the aggregation cycle the measurements were made in.
// func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] {
// The monotonic value is used to communicate the produced Aggregation is return &precomputedSum[N]{
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// The output Aggregation will report recorded values as delta temporality.
func newPrecomputedDeltaSum[N int64 | float64](monotonic bool) aggregator[N] {
return &precomputedDeltaSum[N]{
valueMap: newValueMap[N](), valueMap: newValueMap[N](),
reported: make(map[attribute.Set]N),
monotonic: monotonic, monotonic: monotonic,
start: now(), start: now(),
} }
} }
// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all // precomputedSum summarizes a set of observatrions as their arithmetic sum.
// aggregation cycles as the delta of these sums. type precomputedSum[N int64 | float64] struct {
type precomputedDeltaSum[N int64 | float64] struct {
*valueMap[N] *valueMap[N]
reported map[attribute.Set]N
monotonic bool monotonic bool
start time.Time start time.Time
reported map[attribute.Set]N
} }
// Aggregation returns the recorded pre-computed sums as an Aggregation. The func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
// sum values are expressed as the delta between what was measured this t := now()
// collection cycle and the previous.
//
// All pre-computed sums that were recorded for attributes sets reduced by an
// attribute filter (filtered-sums) are summed together and added to any
// pre-computed sum value recorded directly for the resulting attribute set
// (unfiltered-sum). The filtered-sums are reset to zero for the next
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
newReported := make(map[attribute.Set]N) newReported := make(map[attribute.Set]N)
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
// use the zero-value sData and hope for better alignment next cycle.
sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.DeltaTemporality
sData.IsMonotonic = s.monotonic
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if len(s.values) == 0 { n := len(s.values)
s.reported = newReported dPts := reset(sData.DataPoints, n, n)
return nil
}
t := now() var i int
out := metricdata.Sum[N]{
Temporality: metricdata.DeltaTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values { for attr, value := range s.values {
delta := value - s.reported[attr] delta := value - s.reported[attr]
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr, dPts[i].Attributes = attr
StartTime: s.start, dPts[i].StartTime = s.start
Time: t, dPts[i].Time = t
Value: delta, dPts[i].Value = delta
})
newReported[attr] = value newReported[attr] = value
// Unused attribute sets do not report. // Unused attribute sets do not report.
delete(s.values, attr) delete(s.values, attr)
i++
} }
// Unused attribute sets are forgotten. // Unused attribute sets are forgotten.
s.reported = newReported s.reported = newReported
// The delta collection cycle resets. // The delta collection cycle resets.
s.start = t s.start = t
return out
sData.DataPoints = dPts
*dest = sData
return n
} }
// newPrecomputedCumulativeSum returns an Aggregator that summarizes a set of func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
// pre-computed sums. Each sum is scoped by attributes and the aggregation t := now()
// cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// The output Aggregation will report recorded values as cumulative
// temporality.
func newPrecomputedCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] {
return &precomputedCumulativeSum[N]{
valueMap: newValueMap[N](),
monotonic: monotonic,
start: now(),
}
}
// precomputedCumulativeSum directly records and reports a set of pre-computed sums. // If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
type precomputedCumulativeSum[N int64 | float64] struct { // use the zero-value sData and hope for better alignment next cycle.
*valueMap[N] sData, _ := (*dest).(metricdata.Sum[N])
sData.Temporality = metricdata.CumulativeTemporality
sData.IsMonotonic = s.monotonic
monotonic bool
start time.Time
}
// Aggregation returns the recorded pre-computed sums as an Aggregation. The
// sum values are expressed directly as they are assumed to be recorded as the
// cumulative sum of a some measured phenomena.
//
// All pre-computed sums that were recorded for attributes sets reduced by an
// attribute filter (filtered-sums) are summed together and added to any
// pre-computed sum value recorded directly for the resulting attribute set
// (unfiltered-sum). The filtered-sums are reset to zero for the next
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
if len(s.values) == 0 { n := len(s.values)
return nil dPts := reset(sData.DataPoints, n, n)
}
t := now() var i int
out := metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values { for attr, value := range s.values {
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ dPts[i].Attributes = attr
Attributes: attr, dPts[i].StartTime = s.start
StartTime: s.start, dPts[i].Time = t
Time: t, dPts[i].Value = value
Value: value,
})
// Unused attribute sets do not report. // Unused attribute sets do not report.
delete(s.values, attr) delete(s.values, attr)
i++
} }
return out
sData.DataPoints = dPts
*dest = sData
return n
} }

View File

@ -15,250 +15,425 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
) )
func TestSum(t *testing.T) { func TestSum(t *testing.T) {
t.Cleanup(mockTime(now)) t.Cleanup(mockTime(now))
t.Run("Int64", testSum[int64])
t.Run("Float64", testSum[float64]) t.Run("Int64/DeltaSum", testDeltaSum[int64]())
t.Run("Float64/DeltaSum", testDeltaSum[float64]())
t.Run("Int64/CumulativeSum", testCumulativeSum[int64]())
t.Run("Float64/CumulativeSum", testCumulativeSum[float64]())
t.Run("Int64/DeltaPrecomputedSum", testDeltaPrecomputedSum[int64]())
t.Run("Float64/DeltaPrecomputedSum", testDeltaPrecomputedSum[float64]())
t.Run("Int64/CumulativePrecomputedSum", testCumulativePrecomputedSum[int64]())
t.Run("Float64/CumulativePrecomputedSum", testCumulativePrecomputedSum[float64]())
} }
func testSum[N int64 | float64](t *testing.T) { func testDeltaSum[N int64 | float64]() func(t *testing.T) {
tester := &aggregatorTester[N]{ mono := false
GoroutineN: defaultGoroutines, in, out := Builder[N]{
MeasurementN: defaultMeasurements,
CycleN: defaultCycles,
}
totalMeasurements := defaultGoroutines * defaultMeasurements
t.Run("Delta", func(t *testing.T) {
incr, mono := monoIncr[N](), true
eFunc := deltaExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(newDeltaSum[N](mono), incr, eFunc))
incr, mono = nonMonoIncr[N](), false
eFunc = deltaExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(newDeltaSum[N](mono), incr, eFunc))
})
t.Run("Cumulative", func(t *testing.T) {
incr, mono := monoIncr[N](), true
eFunc := cumuExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(newCumulativeSum[N](mono), incr, eFunc))
incr, mono = nonMonoIncr[N](), false
eFunc = cumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(newCumulativeSum[N](mono), incr, eFunc))
})
t.Run("PreComputedDelta", func(t *testing.T) {
incr, mono := monoIncr[N](), true
eFunc := preDeltaExpecter[N](incr, mono, N(totalMeasurements))
t.Run("Monotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc))
incr, mono = nonMonoIncr[N](), false
eFunc = preDeltaExpecter[N](incr, mono, N(totalMeasurements))
t.Run("NonMonotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc))
})
t.Run("PreComputedCumulative", func(t *testing.T) {
incr, mono := monoIncr[N](), true
eFunc := preCumuExpecter[N](incr, mono, N(totalMeasurements))
t.Run("Monotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc))
incr, mono = nonMonoIncr[N](), false
eFunc = preCumuExpecter[N](incr, mono, N(totalMeasurements))
t.Run("NonMonotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc))
})
}
func deltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, v*N(m)))
}
return sum
}
}
func cumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
var cycle N
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
cycle++
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, v*cycle*N(m)))
}
return sum
}
}
func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
last := make(map[attribute.Set]N)
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
l := last[a]
sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*(N(v)-l)))
last[a] = N(v)
}
return sum
}
}
func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*N(v)))
}
return sum
}
}
// point returns a DataPoint that started and ended now.
func point[N int64 | float64](a attribute.Set, v N) metricdata.DataPoint[N] {
return metricdata.DataPoint[N]{
Attributes: a,
StartTime: now(),
Time: now(),
Value: N(v),
}
}
func testDeltaSumReset[N int64 | float64](t *testing.T) {
t.Cleanup(mockTime(now))
a := newDeltaSum[N](false)
assert.Nil(t, a.Aggregation())
a.Aggregate(1, alice)
expect := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality}
expect.DataPoints = []metricdata.DataPoint[N]{point[N](alice, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
// The attr set should be forgotten once Aggregations is called.
expect.DataPoints = nil
assert.Nil(t, a.Aggregation())
// Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob)
expect.DataPoints = []metricdata.DataPoint[N]{point[N](bob, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
}
func TestDeltaSumReset(t *testing.T) {
t.Run("Int64", testDeltaSumReset[int64])
t.Run("Float64", testDeltaSumReset[float64])
}
func TestPreComputedDeltaSum(t *testing.T) {
var mono bool
agg := newPrecomputedDeltaSum[int64](mono)
require.Implements(t, (*aggregator[int64])(nil), agg)
attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
want := metricdata.Sum[int64]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, Filter: attrFltr,
} }.Sum(mono)
opt := metricdatatest.IgnoreTimestamp() ctx := context.Background()
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) return test[N](in, out, []teststep[N]{
{
// No observation results in an empty aggregation, and causes previous input: []arg[N]{},
// observations to be forgotten. expect: output{
metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) n: 0,
agg: metricdata.Sum[N]{
agg.Aggregate(1, attrs) IsMonotonic: mono,
// measured(+): 1, previous(-): 0 Temporality: metricdata.DeltaTemporality,
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} DataPoints: []metricdata.DataPoint[N]{},
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) },
},
// Duplicate observations add },
agg.Aggregate(2, attrs) {
agg.Aggregate(5, attrs) input: []arg[N]{
agg.Aggregate(3, attrs) {ctx, 1, alice},
agg.Aggregate(10, attrs) {ctx, -1, bob},
// measured(+): 20, previous(-): 1 {ctx, 1, alice},
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 19)} {ctx, 2, alice},
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) {ctx, -10, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 4,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: -11,
},
},
},
},
},
{
input: []arg[N]{
{ctx, 10, alice},
{ctx, 3, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 10,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: 3,
},
},
},
},
},
{
input: []arg[N]{},
// Delta sums are expected to reset.
expect: output{
n: 0,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[N]{},
},
},
},
})
} }
func TestPreComputedCumulativeSum(t *testing.T) { func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
var mono bool mono := false
agg := newPrecomputedCumulativeSum[int64](mono) in, out := Builder[N]{
require.Implements(t, (*aggregator[int64])(nil), agg)
attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
want := metricdata.Sum[int64]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, Filter: attrFltr,
} }.Sum(mono)
opt := metricdatatest.IgnoreTimestamp() ctx := context.Background()
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) return test[N](in, out, []teststep[N]{
{
// Cumulative values should not persist. input: []arg[N]{},
metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) expect: output{
n: 0,
agg.Aggregate(1, attrs) agg: metricdata.Sum[N]{
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} IsMonotonic: mono,
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[N]{},
// Duplicate measurements add },
agg.Aggregate(5, attrs) },
agg.Aggregate(3, attrs) },
agg.Aggregate(10, attrs) {
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} input: []arg[N]{
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) {ctx, 1, alice},
{ctx, -1, bob},
{ctx, 1, alice},
{ctx, 2, alice},
{ctx, -10, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 4,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: -11,
},
},
},
},
},
{
input: []arg[N]{
{ctx, 10, alice},
{ctx, 3, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 14,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: -8,
},
},
},
},
},
})
} }
func TestEmptySumNilAggregation(t *testing.T) { func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
assert.Nil(t, newCumulativeSum[int64](true).Aggregation()) mono := false
assert.Nil(t, newCumulativeSum[int64](false).Aggregation()) in, out := Builder[N]{
assert.Nil(t, newCumulativeSum[float64](true).Aggregation()) Temporality: metricdata.DeltaTemporality,
assert.Nil(t, newCumulativeSum[float64](false).Aggregation()) Filter: attrFltr,
assert.Nil(t, newDeltaSum[int64](true).Aggregation()) }.PrecomputedSum(mono)
assert.Nil(t, newDeltaSum[int64](false).Aggregation()) ctx := context.Background()
assert.Nil(t, newDeltaSum[float64](true).Aggregation()) return test[N](in, out, []teststep[N]{
assert.Nil(t, newDeltaSum[float64](false).Aggregation()) {
assert.Nil(t, newPrecomputedCumulativeSum[int64](true).Aggregation()) input: []arg[N]{},
assert.Nil(t, newPrecomputedCumulativeSum[int64](false).Aggregation()) expect: output{
assert.Nil(t, newPrecomputedCumulativeSum[float64](true).Aggregation()) n: 0,
assert.Nil(t, newPrecomputedCumulativeSum[float64](false).Aggregation()) agg: metricdata.Sum[N]{
assert.Nil(t, newPrecomputedDeltaSum[int64](true).Aggregation()) IsMonotonic: mono,
assert.Nil(t, newPrecomputedDeltaSum[int64](false).Aggregation()) Temporality: metricdata.DeltaTemporality,
assert.Nil(t, newPrecomputedDeltaSum[float64](true).Aggregation()) DataPoints: []metricdata.DataPoint[N]{},
assert.Nil(t, newPrecomputedDeltaSum[float64](false).Aggregation()) },
},
},
{
input: []arg[N]{
{ctx, 1, alice},
{ctx, -1, bob},
{ctx, 1, fltrAlice},
{ctx, 2, alice},
{ctx, -10, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 4,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: -11,
},
},
},
},
},
{
input: []arg[N]{
{ctx, 1, fltrAlice},
{ctx, 10, alice},
{ctx, 3, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 7,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: 14,
},
},
},
},
},
{
input: []arg[N]{},
// Precomputed sums are expected to reset.
expect: output{
n: 0,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[N]{},
},
},
},
})
}
func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
mono := false
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
}.PrecomputedSum(mono)
ctx := context.Background()
return test[N](in, out, []teststep[N]{
{
input: []arg[N]{},
expect: output{
n: 0,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[N]{},
},
},
},
{
input: []arg[N]{
{ctx, 1, alice},
{ctx, -1, bob},
{ctx, 1, fltrAlice},
{ctx, 2, alice},
{ctx, -10, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 4,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: -11,
},
},
},
},
},
{
input: []arg[N]{
{ctx, 1, fltrAlice},
{ctx, 10, alice},
{ctx, 3, bob},
},
expect: output{
n: 2,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: staticTime,
Time: staticTime,
Value: 11,
},
{
Attributes: fltrBob,
StartTime: staticTime,
Time: staticTime,
Value: 3,
},
},
},
},
},
{
input: []arg[N]{},
// Precomputed sums are expected to reset.
expect: output{
n: 0,
agg: metricdata.Sum[N]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[N]{},
},
},
},
})
} }
func BenchmarkSum(b *testing.B) { func BenchmarkSum(b *testing.B) {
b.Run("Int64", benchmarkSum[int64])
b.Run("Float64", benchmarkSum[float64])
}
func benchmarkSum[N int64 | float64](b *testing.B) {
// The monotonic argument is only used to annotate the Sum returned from // The monotonic argument is only used to annotate the Sum returned from
// the Aggregation method. It should not have an effect on operational // the Aggregation method. It should not have an effect on operational
// performance, therefore, only monotonic=false is benchmarked here. // performance, therefore, only monotonic=false is benchmarked here.
factory := func() aggregator[N] { return newDeltaSum[N](false) } b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
b.Run("Delta", benchmarkAggregator(factory)) return Builder[int64]{
factory = func() aggregator[N] { return newCumulativeSum[N](false) } Temporality: metricdata.CumulativeTemporality,
b.Run("Cumulative", benchmarkAggregator(factory)) }.Sum(false)
}))
b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{
Temporality: metricdata.DeltaTemporality,
}.Sum(false)
}))
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.CumulativeTemporality,
}.Sum(false)
}))
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.DeltaTemporality,
}.Sum(false)
}))
b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{
Temporality: metricdata.CumulativeTemporality,
}.PrecomputedSum(false)
}))
b.Run("Precomputed/Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{
Temporality: metricdata.DeltaTemporality,
}.PrecomputedSum(false)
}))
b.Run("Precomputed/Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.CumulativeTemporality,
}.PrecomputedSum(false)
}))
b.Run("Precomputed/Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.DeltaTemporality,
}.PrecomputedSum(false)
}))
} }