From 69800ee189281df90523a97e8e5e624b282163d6 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 10 May 2024 07:19:46 -0700 Subject: [PATCH] Support Delta & Cumulative temporality for LastValue aggregates (#5305) * Add delta/cumulative/precomputed LastValue agg * Add cumulative testing * Add precomputed testing * Add changelog entry --- CHANGELOG.md | 1 + sdk/metric/instrument_test.go | 4 +- sdk/metric/internal/aggregate/aggregate.go | 29 +- sdk/metric/internal/aggregate/lastvalue.go | 88 ++++- .../internal/aggregate/lastvalue_test.go | 374 +++++++++++++++++- sdk/metric/pipeline.go | 5 +- 6 files changed, 472 insertions(+), 29 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a3bda374..d83413dd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - De-duplicate map attributes added to a `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5230) - The `go.opentelemetry.io/otel/exporters/stdout/stdoutlog` exporter won't print `AttributeValueLengthLimit` and `AttributeCountLimit` fields now, instead it prints the `DroppedAttributes` field. (#5272) - Improved performance in the `Stringer` implementation of `go.opentelemetry.io/otel/baggage.Member` by reducing the number of allocations. (#5286) +- Set the start time for last-value aggregates in `go.opentelemetry.io/otel/sdk/metric`. (#5305) - The `Span` in `go.opentelemetry.io/otel/sdk/trace` will record links without span context if either non-empty `TraceState` or attributes are provided. (#5315) ### Fixed diff --git a/sdk/metric/instrument_test.go b/sdk/metric/instrument_test.go index 712fddc45..60066f425 100644 --- a/sdk/metric/instrument_test.go +++ b/sdk/metric/instrument_test.go @@ -25,7 +25,7 @@ func BenchmarkInstrument(b *testing.B) { build := aggregate.Builder[int64]{} var meas []aggregate.Measure[int64] - in, _ := build.LastValue() + in, _ := build.PrecomputedLastValue() meas = append(meas, in) build.Temporality = metricdata.CumulativeTemporality @@ -50,7 +50,7 @@ func BenchmarkInstrument(b *testing.B) { build := aggregate.Builder[int64]{} var meas []aggregate.Measure[int64] - in, _ := build.LastValue() + in, _ := build.PrecomputedLastValue() meas = append(meas, in) build.Temporality = metricdata.CumulativeTemporality diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 0a97444a4..c9976de6c 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -74,21 +74,26 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] { } // LastValue returns a last-value aggregate function input and output. -// -// The Builder.Temporality is ignored and delta is use always. func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { - // Delta temporality is the only temporality that makes semantic sense for - // a last-value aggregate. lv := newLastValue[N](b.AggregationLimit, b.resFunc()) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(lv.measure), lv.delta + default: + return b.filter(lv.measure), lv.cumulative + } +} - return b.filter(lv.measure), func(dest *metricdata.Aggregation) int { - // Ignore if dest is not a metricdata.Gauge. The chance for memory - // reuse of the DataPoints is missed (better luck next time). - gData, _ := (*dest).(metricdata.Gauge[N]) - lv.computeAggregation(&gData.DataPoints) - *dest = gData - - return len(gData.DataPoints) +// PrecomputedLastValue returns a last-value aggregate function input and +// output. The aggregation returned from the returned ComputeAggregation +// function will always only return values from the previous collection cycle. +func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) { + lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc()) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(lv.measure), lv.delta + default: + return b.filter(lv.measure), lv.cumulative } } diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index f3238974c..8f406dd2b 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -26,6 +26,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *la newRes: r, limit: newLimiter[datapoint[N]](limit), values: make(map[attribute.Distinct]datapoint[N]), + start: now(), } } @@ -36,6 +37,7 @@ type lastValue[N int64 | float64] struct { newRes func() exemplar.Reservoir limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] + start time.Time } func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { @@ -58,23 +60,103 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. s.values[attr.Equivalent()] = d } -func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { +func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + s.Lock() defer s.Unlock() + n := s.copyDpts(&gData.DataPoints) + // Do not report stale values. + clear(s.values) + // Update start time for delta temporality. + s.start = now() + + *dest = gData + + return n +} + +func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints) + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + *dest = gData + + return n +} + +// copyDpts copies the datapoints held by s into dest. The number of datapoints +// copied is returned. +func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int { n := len(s.values) *dest = reset(*dest, n, n) var i int for _, v := range s.values { (*dest)[i].Attributes = v.attrs - // The event time is the only meaningful timestamp, StartTime is - // ignored. + (*dest)[i].StartTime = s.start (*dest)[i].Time = v.timestamp (*dest)[i].Value = v.value collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ } + return n +} + +// newPrecomputedLastValue returns an aggregator that summarizes a set of +// observations as the last one made. +func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *precomputedLastValue[N] { + return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} +} + +// precomputedLastValue summarizes a set of observations as the last one made. +type precomputedLastValue[N int64 | float64] struct { + *lastValue[N] +} + +func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints) // Do not report stale values. clear(s.values) + // Update start time for delta temporality. + s.start = now() + + *dest = gData + + return n +} + +func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints) + // Do not report stale values. + clear(s.values) + *dest = gData + + return n } diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 4aae11e7d..8504e3b19 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -14,14 +14,29 @@ func TestLastValue(t *testing.T) { c := new(clock) t.Cleanup(c.Register()) - t.Run("Int64", testLastValue[int64]()) + t.Run("Int64/DeltaLastValue", testDeltaLastValue[int64]()) + c.Reset() + t.Run("Float64/DeltaLastValue", testDeltaLastValue[float64]()) c.Reset() - t.Run("Float64", testLastValue[float64]()) + t.Run("Int64/CumulativeLastValue", testCumulativeLastValue[int64]()) + c.Reset() + t.Run("Float64/CumulativeLastValue", testCumulativeLastValue[float64]()) + c.Reset() + + t.Run("Int64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValue[int64]()) + c.Reset() + t.Run("Float64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValue[float64]()) + c.Reset() + + t.Run("Int64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValue[int64]()) + c.Reset() + t.Run("Float64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValue[float64]()) } -func testLastValue[N int64 | float64]() func(*testing.T) { +func testDeltaLastValue[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, }.LastValue() @@ -45,12 +60,14 @@ func testLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - Time: y2kPlus(3), + StartTime: y2kPlus(1), + Time: y2kPlus(5), Value: 2, }, { Attributes: fltrBob, - Time: y2kPlus(4), + StartTime: y2kPlus(1), + Time: y2kPlus(6), Value: -10, }, }, @@ -71,12 +88,14 @@ func testLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - Time: y2kPlus(5), + StartTime: y2kPlus(8), + Time: y2kPlus(9), Value: 10, }, { Attributes: fltrBob, - Time: y2kPlus(6), + StartTime: y2kPlus(8), + Time: y2kPlus(10), Value: 3, }, }, @@ -96,17 +115,350 @@ func testLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, - Time: y2kPlus(7), + StartTime: y2kPlus(11), + Time: y2kPlus(12), Value: 1, }, { Attributes: fltrBob, - Time: y2kPlus(8), + StartTime: y2kPlus(11), + Time: y2kPlus(13), Value: 1, }, { Attributes: overflowSet, + StartTime: y2kPlus(11), + Time: y2kPlus(15), + Value: 1, + }, + }, + }, + }, + }, + }) +} + +func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.LastValue() + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + // Empty output if nothing is measured. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(0), + Time: y2kPlus(4), + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(0), + Time: y2kPlus(5), + Value: -10, + }, + }, + }, + }, + }, { + // Cumulative temporality means no resets. + input: []arg[N]{}, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(0), + Time: y2kPlus(4), + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(0), + Time: y2kPlus(5), + Value: -10, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(0), + Time: y2kPlus(6), + Value: 10, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(0), + Time: y2kPlus(7), + Value: 3, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(0), + Time: y2kPlus(8), + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(0), + Time: y2kPlus(9), + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: y2kPlus(0), + Time: y2kPlus(11), + Value: 1, + }, + }, + }, + }, + }, + }) +} + +func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedLastValue() + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + // Empty output if nothing is measured. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(1), + Time: y2kPlus(5), + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(1), + Time: y2kPlus(6), + Value: -10, + }, + }, + }, + }, + }, { + // Everything resets, do not report old measurements. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(8), + Time: y2kPlus(9), + Value: 10, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(8), Time: y2kPlus(10), + Value: 3, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(11), + Time: y2kPlus(12), + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(11), + Time: y2kPlus(13), + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: y2kPlus(11), + Time: y2kPlus(15), + Value: 1, + }, + }, + }, + }, + }, + }) +} + +func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedLastValue() + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + // Empty output if nothing is measured. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(0), + Time: y2kPlus(4), + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(0), + Time: y2kPlus(5), + Value: -10, + }, + }, + }, + }, + }, { + // Everything resets, do not report old measurements. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(0), + Time: y2kPlus(6), + Value: 10, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(0), + Time: y2kPlus(7), + Value: 3, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: y2kPlus(0), + Time: y2kPlus(8), + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: y2kPlus(0), + Time: y2kPlus(9), + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: y2kPlus(0), + Time: y2kPlus(11), Value: 1, }, }, @@ -117,6 +469,6 @@ func testLastValue[N int64 | float64]() func(*testing.T) { } func BenchmarkLastValue(b *testing.B) { - b.Run("Int64", benchmarkAggregate(Builder[int64]{}.LastValue)) - b.Run("Float64", benchmarkAggregate(Builder[float64]{}.LastValue)) + b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue)) + b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue)) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index f21679746..45dab6619 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -447,7 +447,10 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg Aggregation, kin case AggregationDrop: // Return nil in and out to signify the drop aggregator. case AggregationLastValue: - meas, comp = b.LastValue() + if kind == InstrumentKindObservableGauge { + meas, comp = b.PrecomputedLastValue() + } + // TODO (#5304): Support synchronous gauges. case AggregationSum: switch kind { case InstrumentKindObservableCounter: