1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-09-16 09:26:25 +02:00

Revert Cleanup interaction of exemplar and aggregation (#5913)

Topic: #5249

This reverts commit 8041156518 (PR: #5899)
due to the performance degradation found by Benchmarks CI
https://github.com/open-telemetry/opentelemetry-go/actions/runs/11447364022/job/31848519243

Here is the benchmark test on my machine:

```
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/sdk/metric
                                       │   old.txt   │                new.txt                 │
                                       │   sec/op    │    sec/op     vs base                  │
Instrument/instrumentImpl/aggregate-10   3.378µ ± 3%   49.366µ ± 1%  +1361.40% (p=0.000 n=10)
Instrument/observable/observe-10         2.288µ ± 2%   37.791µ ± 1%  +1551.73% (p=0.000 n=10)
geomean                                  2.780µ         43.19µ       +1453.65%

                                       │   old.txt    │                 new.txt                 │
                                       │     B/op     │     B/op       vs base                  │
Instrument/instrumentImpl/aggregate-10   1.245Ki ± 1%   22.363Ki ± 0%  +1696.08% (p=0.000 n=10)
Instrument/observable/observe-10           823.0 ± 1%    17432.5 ± 0%  +2018.17% (p=0.000 n=10)
geomean                                  1.000Ki         19.51Ki       +1850.48%

                                       │  old.txt   │                new.txt                │
                                       │ allocs/op  │  allocs/op   vs base                  │
Instrument/instrumentImpl/aggregate-10   1.000 ± 0%   21.000 ± 0%  +2000.00% (p=0.000 n=10)
Instrument/observable/observe-10         1.000 ± 0%   16.000 ± 0%  +1500.00% (p=0.000 n=10)
```
This commit is contained in:
Sam Xie
2024-10-23 10:48:07 -07:00
committed by GitHub
parent 7a153a01c6
commit 3429e15b9a
16 changed files with 169 additions and 165 deletions

View File

@@ -6,7 +6,9 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import ( import (
"runtime" "runtime"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
) )
// ExemplarReservoirProviderSelector selects the // ExemplarReservoirProviderSelector selects the
@@ -14,6 +16,14 @@ import (
// based on the [Aggregation] of the metric. // based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider
// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
}
// DefaultExemplarReservoirProviderSelector returns the default // DefaultExemplarReservoirProviderSelector returns the default
// [exemplar.ReservoirProvider] for the // [exemplar.ReservoirProvider] for the
// provided [Aggregation]. // provided [Aggregation].

View File

@@ -8,7 +8,6 @@ import (
"testing" "testing"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
) )
@@ -23,7 +22,7 @@ func BenchmarkInstrument(b *testing.B) {
} }
b.Run("instrumentImpl/aggregate", func(b *testing.B) { b.Run("instrumentImpl/aggregate", func(b *testing.B) {
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)} build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64] var meas []aggregate.Measure[int64]
build.Temporality = metricdata.CumulativeTemporality build.Temporality = metricdata.CumulativeTemporality
@@ -53,7 +52,7 @@ func BenchmarkInstrument(b *testing.B) {
}) })
b.Run("observable/observe", func(b *testing.B) { b.Run("observable/observe", func(b *testing.B) {
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)} build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64] var meas []aggregate.Measure[int64]
in, _ := build.PrecomputedLastValue() in, _ := build.PrecomputedLastValue()

View File

@@ -8,7 +8,6 @@ import (
"time" "time"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
) )
@@ -34,12 +33,12 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the // Filter is the attribute filter the aggregate function will use on the
// input of measurements. // input of measurements.
Filter attribute.Filter Filter attribute.Filter
// ExemplarFilter is the filter applied to measurements before offering // ReservoirFunc is the factory function used by aggregate functions to
// them to the exemplar Reservoir. // create new exemplar reservoirs for a new seen attribute set.
ExemplarFilter exemplar.Filter //
// ExemplarReservoirProvider is the factory function used to create a new // If this is not provided a default factory function that returns an
// exemplar Reservoir for a given attribute set. // dropReservoir reservoir will be used.
ExemplarReservoirProvider exemplar.ReservoirProvider ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any // AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be // measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow" // aggregated into a single aggregate for the "otel.metric.overflow"
@@ -50,10 +49,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int AggregationLimit int
} }
func (b Builder[N]) resFunc() func(attribute.Set) *filteredExemplarReservoir[N] { func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) *filteredExemplarReservoir[N] { if b.ReservoirFunc != nil {
return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs)) return b.ReservoirFunc
} }
return dropReservoir
} }
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

View File

@@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
) )
@@ -73,12 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig } return func() { now = orig }
} }
func newNoopReservoir(attribute.Set) exemplar.Reservoir { func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
return exemplar.NewFixedSizeReservoir(0) return dropReservoir[N](attr)
}
func dropExemplars[N int64 | float64](attr attribute.Set) *filteredExemplarReservoir[N] {
return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, newNoopReservoir(attr))
} }
func TestBuilderFilter(t *testing.T) { func TestBuilderFilter(t *testing.T) {
@@ -104,8 +99,8 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
} }
} }
t.Run("NoFilter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir}, attr, nil)) t.Run("NoFilter", run(Builder[N]{}, attr, nil))
t.Run("Filter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir, Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue})) t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
} }
} }

View File

@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}
type dropRes[N int64 | float64] struct{}
// Offer does nothing, all measurements offered will be dropped.
func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
// Collect resets dest. No exemplars will ever be returned.
func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
*dest = (*dest)[:0]
}

View File

@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate
import (
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
func TestDrop(t *testing.T) {
t.Run("Int64", testDropFiltered[int64])
t.Run("Float64", testDropFiltered[float64])
}
func testDropFiltered[N int64 | float64](t *testing.T) {
r := dropReservoir[N](*attribute.EmptySet())
var dest []exemplar.Exemplar
r.Collect(&dest)
assert.Empty(t, dest, "non-sampled context should not be offered")
}

View File

@@ -30,7 +30,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram. // expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct { type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set attrs attribute.Set
res *filteredExemplarReservoir[N] res FilteredExemplarReservoir[N]
count uint64 count uint64
min N min N
@@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of // newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes // measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in. // and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *expoHistogram[N] { func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{ return &expoHistogram[N]{
noSum: noSum, noSum: noSum,
noMinMax: noMinMax, noMinMax: noMinMax,
@@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int maxSize int
maxScale int32 maxScale int32
newRes func(attribute.Set) *filteredExemplarReservoir[N] newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]] limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N] values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex valuesMu sync.Mutex

View File

@@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
) )
@@ -683,30 +682,22 @@ func BenchmarkExponentialHistogram(b *testing.B) {
b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
})) }))
b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
})) }))
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
})) }))
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
})) }))
} }
@@ -753,11 +744,9 @@ func TestExponentialHistogramAggregation(t *testing.T) {
func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 2, AggregationLimit: 2,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(4, 20, false, false) }.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -882,11 +871,9 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 2, AggregationLimit: 2,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(4, 20, false, false) }.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{

View File

@@ -11,16 +11,29 @@ import (
"go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/exemplar"
) )
// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made. // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct { type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter filter exemplar.Filter
reservoir exemplar.Reservoir reservoir exemplar.Reservoir
} }
// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which // NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// only offers values that are allowed by the filter. If the provided filter is // that are allowed by the filter.
// nil, all measurements are dropped.. func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) *filteredExemplarReservoir[N] {
return &filteredExemplarReservoir[N]{ return &filteredExemplarReservoir[N]{
filter: f, filter: f,
reservoir: r, reservoir: r,

View File

@@ -16,7 +16,7 @@ import (
type buckets[N int64 | float64] struct { type buckets[N int64 | float64] struct {
attrs attribute.Set attrs attribute.Set
res *filteredExemplarReservoir[N] res FilteredExemplarReservoir[N]
counts []uint64 counts []uint64
count uint64 count uint64
@@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool noSum bool
bounds []float64 bounds []float64
newRes func(attribute.Set) *filteredExemplarReservoir[N] newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*buckets[N]] limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N] values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex valuesMu sync.Mutex
} }
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histValues[N] { func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the // The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy // passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have // here so we can always guarantee this. Or, in the case of failure, have
@@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// newHistogram returns an Aggregator that summarizes a set of measurements as // newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram. // an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histogram[N] { func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{ return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r), histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax, noMinMax: noMinMax,

View File

@@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
) )
@@ -52,11 +51,9 @@ type conf[N int64 | float64] struct {
func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -141,11 +138,9 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -380,30 +375,22 @@ func TestDeltaHistogramReset(t *testing.T) {
func BenchmarkHistogram(b *testing.B) { func BenchmarkHistogram(b *testing.B) {
b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExplicitBucketHistogram(bounds, noMinMax, false) }.ExplicitBucketHistogram(bounds, noMinMax, false)
})) }))
b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExplicitBucketHistogram(bounds, noMinMax, false) }.ExplicitBucketHistogram(bounds, noMinMax, false)
})) }))
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExplicitBucketHistogram(bounds, noMinMax, false) }.ExplicitBucketHistogram(bounds, noMinMax, false)
})) }))
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExplicitBucketHistogram(bounds, noMinMax, false) }.ExplicitBucketHistogram(bounds, noMinMax, false)
})) }))
} }

View File

@@ -16,10 +16,10 @@ import (
type datapoint[N int64 | float64] struct { type datapoint[N int64 | float64] struct {
attrs attribute.Set attrs attribute.Set
value N value N
res *filteredExemplarReservoir[N] res FilteredExemplarReservoir[N]
} }
func newLastValue[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *lastValue[N] { func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{ return &lastValue[N]{
newRes: r, newRes: r,
limit: newLimiter[datapoint[N]](limit), limit: newLimiter[datapoint[N]](limit),
@@ -32,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func(attribute.Set) *filteredE
type lastValue[N int64 | float64] struct { type lastValue[N int64 | float64] struct {
sync.Mutex sync.Mutex
newRes func(attribute.Set) *filteredExemplarReservoir[N] newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[datapoint[N]] limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N] values map[attribute.Distinct]datapoint[N]
start time.Time start time.Time
@@ -114,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
// newPrecomputedLastValue returns an aggregator that summarizes a set of // newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made. // observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *precomputedLastValue[N] { func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
} }

View File

@@ -7,7 +7,6 @@ import (
"context" "context"
"testing" "testing"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
) )
@@ -37,11 +36,9 @@ func TestLastValue(t *testing.T) {
func testDeltaLastValue[N int64 | float64]() func(*testing.T) { func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.LastValue() }.LastValue()
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -143,11 +140,9 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.LastValue() }.LastValue()
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -267,11 +262,9 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedLastValue() }.PrecomputedLastValue()
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -373,11 +366,9 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedLastValue() }.PrecomputedLastValue()
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -478,12 +469,6 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
} }
func BenchmarkLastValue(b *testing.B) { func BenchmarkLastValue(b *testing.B) {
b.Run("Int64", benchmarkAggregate(Builder[int64]{ b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue))
ExemplarFilter: exemplar.AlwaysOffFilter, b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue))
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedLastValue))
b.Run("Float64", benchmarkAggregate(Builder[float64]{
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedLastValue))
} }

View File

@@ -14,19 +14,19 @@ import (
type sumValue[N int64 | float64] struct { type sumValue[N int64 | float64] struct {
n N n N
res *filteredExemplarReservoir[N] res FilteredExemplarReservoir[N]
attrs attribute.Set attrs attribute.Set
} }
// valueMap is the storage for sums. // valueMap is the storage for sums.
type valueMap[N int64 | float64] struct { type valueMap[N int64 | float64] struct {
sync.Mutex sync.Mutex
newRes func(attribute.Set) *filteredExemplarReservoir[N] newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[sumValue[N]] limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N] values map[attribute.Distinct]sumValue[N]
} }
func newValueMap[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *valueMap[N] { func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] {
return &valueMap[N]{ return &valueMap[N]{
newRes: r, newRes: r,
limit: newLimiter[sumValue[N]](limit), limit: newLimiter[sumValue[N]](limit),
@@ -54,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
// newSum returns an aggregator that summarizes a set of measurements as their // newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in. // the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *sum[N] { func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] {
return &sum[N]{ return &sum[N]{
valueMap: newValueMap[N](limit, r), valueMap: newValueMap[N](limit, r),
monotonic: monotonic, monotonic: monotonic,
@@ -143,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// newPrecomputedSum returns an aggregator that summarizes a set of // newPrecomputedSum returns an aggregator that summarizes a set of
// observations as their arithmetic sum. Each sum is scoped by attributes and // observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in. // the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *precomputedSum[N] { func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{ return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r), valueMap: newValueMap[N](limit, r),
monotonic: monotonic, monotonic: monotonic,

View File

@@ -7,7 +7,6 @@ import (
"context" "context"
"testing" "testing"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
) )
@@ -42,11 +41,9 @@ func TestSum(t *testing.T) {
func testDeltaSum[N int64 | float64]() func(t *testing.T) { func testDeltaSum[N int64 | float64]() func(t *testing.T) {
mono := false mono := false
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.Sum(mono) }.Sum(mono)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -172,11 +169,9 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) {
func testCumulativeSum[N int64 | float64]() func(t *testing.T) { func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
mono := false mono := false
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.Sum(mono) }.Sum(mono)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -288,11 +283,9 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
mono := false mono := false
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedSum(mono) }.PrecomputedSum(mono)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -419,11 +412,9 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
mono := false mono := false
in, out := Builder[N]{ in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr, Filter: attrFltr,
AggregationLimit: 3, AggregationLimit: 3,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedSum(mono) }.PrecomputedSum(mono)
ctx := context.Background() ctx := context.Background()
return test[N](in, out, []teststep[N]{ return test[N](in, out, []teststep[N]{
@@ -553,59 +544,43 @@ func BenchmarkSum(b *testing.B) {
// performance, therefore, only monotonic=false is benchmarked here. // performance, therefore, only monotonic=false is benchmarked here.
b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.Sum(false) }.Sum(false)
})) }))
b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.Sum(false) }.Sum(false)
})) }))
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.Sum(false) }.Sum(false)
})) }))
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.Sum(false) }.Sum(false)
})) }))
b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedSum(false) }.PrecomputedSum(false)
})) }))
b.Run("Precomputed/Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { b.Run("Precomputed/Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{ return Builder[int64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedSum(false) }.PrecomputedSum(false)
})) }))
b.Run("Precomputed/Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Precomputed/Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedSum(false) }.PrecomputedSum(false)
})) }))
b.Run("Precomputed/Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { b.Run("Precomputed/Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{ return Builder[float64]{
Temporality: metricdata.DeltaTemporality, Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.PrecomputedSum(false) }.PrecomputedSum(false)
})) }))
} }

View File

@@ -360,9 +360,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
normID := id.normalize() normID := id.normalize()
cv := i.aggregators.Lookup(normID, func() aggVal[N] { cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{ b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind), Temporality: i.pipeline.reader.temporality(kind),
ExemplarReservoirProvider: stream.ExemplarReservoirProviderSelector(stream.Aggregation), ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter),
ExemplarFilter: i.pipeline.exemplarFilter,
} }
b.Filter = stream.AttributeFilter b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation // A value less than or equal to zero will disable the aggregation