diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8635a86f7..5cab19a25 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - The `go.opentelemetry.io/otel/semconv/v1.24.0` package.
   The package contains semantic conventions from the `v1.24.0` version of the OpenTelemetry Semantic Conventions. (#4770)
 - Add `WithResourceAsConstantLabels` option to apply resource attributes for every metric emitted by the Prometheus exporter. (#4733)
+- Experimental cardinality limiting is added to the metric SDK.
+  See [metric documentation](./sdk/metric/EXPERIMENTAL.md#cardinality-limit) for more information about this feature and how to enable it. (#4457)
 
 ### Changed
 
diff --git a/sdk/metric/EXPERIMENTAL.md b/sdk/metric/EXPERIMENTAL.md
new file mode 100644
index 000000000..d2a5a0470
--- /dev/null
+++ b/sdk/metric/EXPERIMENTAL.md
@@ -0,0 +1,50 @@
+# Experimental Features
+
+The metric SDK contains features that have not yet stabilized in the OpenTelemetry specification.
+These features are added to the OpenTelemetry Go metric SDK prior to stabilization in the specification so that users can start experimenting with them and provide feedback.
+
+These features may change in backwards-incompatible ways as feedback is applied.
+See the [Compatibility and Stability](#compatibility-and-stability) section for more information.
+
+## Features
+
+- [Cardinality Limit](#cardinality-limit)
+
+### Cardinality Limit
+
+The cardinality limit is the hard limit on the number of metric streams that can be collected for a single instrument.
+
+This experimental feature can be enabled by setting the `OTEL_GO_X_CARDINALITY_LIMIT` environment variable.
+The value must be an integer.
+All other values are ignored.
+
+If the value set is less than or equal to `0`, no limit will be applied.
+
+#### Examples
+
+Set the cardinality limit to 2000.
+
+```console
+export OTEL_GO_X_CARDINALITY_LIMIT=2000
+```
+
+Set an infinite cardinality limit (functionally equivalent to disabling the feature).
+
+```console
+export OTEL_GO_X_CARDINALITY_LIMIT=-1
+```
+
+Disable the cardinality limit.
+
+```console
+unset OTEL_GO_X_CARDINALITY_LIMIT
+```
+
+## Compatibility and Stability
+
+Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).
+These features may be removed or modified in successive version releases, including patch versions.
+
+When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
+There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version.
+If they are supported, they may be accompanied by a deprecation notice stating a timeline for the removal of that support.
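For readers who want to try the feature, the following is a minimal end-to-end sketch of what the documentation above describes, assuming an SDK build that includes this change (the meter, instrument, and attribute names are illustrative). With the limit set to 3, at most two distinct attribute sets are kept per instrument; everything else is folded into the `otel.metric.overflow` stream:

```go
package main

import (
	"context"
	"fmt"
	"os"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// The limit is read when aggregates are built, so set it before any
	// instruments are created.
	os.Setenv("OTEL_GO_X_CARDINALITY_LIMIT", "3")

	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	defer func() { _ = provider.Shutdown(context.Background()) }()

	counter, _ := provider.Meter("example").Int64Counter("requests")
	for i := 0; i < 10; i++ {
		// Ten distinct attribute sets; only the first two are kept as
		// their own streams, the rest share the overflow stream.
		counter.Add(context.Background(), 1,
			metric.WithAttributes(attribute.Int("user_id", i)))
	}

	var rm metricdata.ResourceMetrics
	_ = reader.Collect(context.Background(), &rm)
	sum := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64])
	for _, dp := range sum.DataPoints {
		// Expect user_id=0, user_id=1, and otel.metric.overflow=true.
		fmt.Println(dp.Attributes.Encoded(attribute.DefaultEncoder()), dp.Value)
	}
}
```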
diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go
index 8dec14237..c61f85137 100644
--- a/sdk/metric/internal/aggregate/aggregate.go
+++ b/sdk/metric/internal/aggregate/aggregate.go
@@ -44,6 +44,14 @@ type Builder[N int64 | float64] struct {
 	// Filter is the attribute filter the aggregate function will use on the
 	// input of measurements.
 	Filter attribute.Filter
+	// AggregationLimit is the cardinality limit of measurement attributes. Any
+	// measurement made with new attributes once the limit has been reached will
+	// be aggregated into a single aggregate for the "otel.metric.overflow"
+	// attribute.
+	//
+	// If AggregationLimit is less than or equal to zero, there will not be an
+	// aggregation limit imposed (i.e. unlimited attribute sets).
+	AggregationLimit int
 }
 
 func (b Builder[N]) filter(f Measure[N]) Measure[N] {
@@ -63,7 +71,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
 func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
 	// Delta temporality is the only temporality that makes semantic sense for
 	// a last-value aggregate.
-	lv := newLastValue[N]()
+	lv := newLastValue[N](b.AggregationLimit)
 
 	return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
 		// Ignore if dest is not a metricdata.Gauge. The chance for memory
@@ -79,7 +87,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
 // PrecomputedSum returns a sum aggregate function input and output. The
 // arguments passed to the input are expected to be the precomputed sum values.
 func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
-	s := newPrecomputedSum[N](monotonic)
+	s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(s.measure), s.delta
@@ -90,7 +98,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati
 
 // Sum returns a sum aggregate function input and output.
 func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
-	s := newSum[N](monotonic)
+	s := newSum[N](monotonic, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(s.measure), s.delta
@@ -102,7 +110,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregati
 
 // ExplicitBucketHistogram returns a histogram aggregate function input and
 // output.
 func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
-	h := newHistogram[N](boundaries, noMinMax, noSum)
+	h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(h.measure), h.delta
@@ -114,7 +122,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
 
 // ExponentialBucketHistogram returns a histogram aggregate function input and
 // output.
 func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
-	h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
+	h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(h.measure), h.delta
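The test changes below exercise this plumbing directly. As a quick orientation to the internal API, here is a sketch of the `Builder` usage those tests rely on (this fragment lives inside the `aggregate` package; `alice`, `bob`, `carol`, and `dave` are the test fixtures added next):

```go
// A delta sum that keeps at most 3 streams, the third being reserved
// for the overflow attribute set.
in, out := Builder[int64]{
	Temporality:      metricdata.DeltaTemporality,
	AggregationLimit: 3,
}.Sum(false)

ctx := context.Background()
in(ctx, 1, alice) // 1st distinct attribute set: kept.
in(ctx, 1, bob)   // 2nd distinct attribute set: kept.
in(ctx, 1, carol) // At the limit: folded into the overflow stream.
in(ctx, 1, dave)  // Also folded into the overflow stream (value 2 total).

var dest metricdata.Aggregation
_ = out(&dest) // Produces 3 data points: alice, bob, and overflowSet.
```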
diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go
index 79031b402..384ca51c8 100644
--- a/sdk/metric/internal/aggregate/aggregate_test.go
+++ b/sdk/metric/internal/aggregate/aggregate_test.go
@@ -31,11 +31,15 @@ var (
 	keyUser    = "user"
 	userAlice  = attribute.String(keyUser, "Alice")
 	userBob    = attribute.String(keyUser, "Bob")
+	userCarol  = attribute.String(keyUser, "Carol")
+	userDave   = attribute.String(keyUser, "Dave")
 	adminTrue  = attribute.Bool("admin", true)
 	adminFalse = attribute.Bool("admin", false)
 
 	alice = attribute.NewSet(userAlice, adminTrue)
 	bob   = attribute.NewSet(userBob, adminFalse)
+	carol = attribute.NewSet(userCarol, adminFalse)
+	dave  = attribute.NewSet(userDave, adminFalse)
 
 	// Filtered.
 	attrFltr = func(kv attribute.KeyValue) bool {
diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go
index 98b7dc1e0..e9c25980a 100644
--- a/sdk/metric/internal/aggregate/exponential_histogram.go
+++ b/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -288,13 +288,14 @@ func (b *expoBuckets) downscale(delta int) {
 // newExponentialHistogram returns an Aggregator that summarizes a set of
 // measurements as an exponential histogram. Each histogram is scoped by attributes
 // and the aggregation cycle the measurements were made in.
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int) *expoHistogram[N] {
 	return &expoHistogram[N]{
 		noSum:    noSum,
 		noMinMax: noMinMax,
 		maxSize:  int(maxSize),
 		maxScale: int(maxScale),
 
+		limit:  newLimiter[*expoHistogramDataPoint[N]](limit),
 		values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
 
 		start: now(),
@@ -309,6 +310,7 @@ type expoHistogram[N int64 | float64] struct {
 	maxSize  int
 	maxScale int
 
+	limit    limiter[*expoHistogramDataPoint[N]]
 	values   map[attribute.Set]*expoHistogramDataPoint[N]
 	valuesMu sync.Mutex
 
@@ -324,6 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se
 	e.valuesMu.Lock()
 	defer e.valuesMu.Unlock()
 
+	attr = e.limit.Attributes(attr, e.values)
 	v, ok := e.values[attr]
 	if !ok {
 		v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go
index 44a8328bb..40af40263 100644
--- a/sdk/metric/internal/aggregate/exponential_histogram_test.go
+++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go
@@ -183,7 +183,7 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) {
 			restore := withHandler(t)
 			defer restore()
 
-			h := newExponentialHistogram[int64](4, 20, false, false)
+			h := newExponentialHistogram[int64](4, 20, false, false, 0)
 			for _, v := range tt.values {
 				h.measure(context.Background(), v, alice)
 			}
@@ -225,7 +225,7 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) {
 			restore := withHandler(t)
 			defer restore()
 
-			h := newExponentialHistogram[float64](4, 20, false, false)
+			h := newExponentialHistogram[float64](4, 20, false, false, 0)
 			for _, v := range tt.values {
 				h.measure(context.Background(), v, alice)
 			}
@@ -747,8 +747,9 @@ func TestExponentialHistogramAggregation(t *testing.T) {
 
 func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 2,
 	}.ExponentialBucketHistogram(4, 20, false, false)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -805,13 +806,67 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 4, alice},
+				{ctx, 4, alice},
+				{ctx, 4, alice},
+				{ctx, 2, alice},
+				{ctx, 16, alice},
+				{ctx, 1, alice},
+				// These will exceed the cardinality limit.
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 2, carol},
+				{ctx, 16, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 2,
+				agg: metricdata.ExponentialHistogram[N]{
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      6,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        31,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 4, 1},
+							},
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      6,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        31,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 4, 1},
+							},
+						},
+					},
+				},
+			},
+		},
 	})
 }
 
 func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 2,
 	}.ExponentialBucketHistogram(4, 20, false, false)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -911,6 +966,53 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				// These will exceed the cardinality limit.
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 2, carol},
+				{ctx, 16, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 2,
+				agg: metricdata.ExponentialHistogram[N]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      9,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        44,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 6, 2},
+							},
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      6,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        31,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 4, 1},
+							},
+						},
+					},
+				},
+			},
+		},
 	})
 }
diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go
index 62ec51e1f..5d886360b 100644
--- a/sdk/metric/internal/aggregate/histogram.go
+++ b/sdk/metric/internal/aggregate/histogram.go
@@ -54,11 +54,12 @@ type histValues[N int64 | float64] struct {
 	noSum  bool
 	bounds []float64
 
+	limit    limiter[*buckets[N]]
 	values   map[attribute.Set]*buckets[N]
 	valuesMu sync.Mutex
 }
 
-func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
+func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *histValues[N] {
 	// The responsibility of keeping all buckets correctly associated with the
 	// passed boundaries is ultimately this type's responsibility. Make a copy
 	// here so we can always guarantee this. Or, in the case of failure, have
@@ -69,6 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[
 	return &histValues[N]{
 		noSum:  noSum,
 		bounds: b,
+		limit:  newLimiter[*buckets[N]](limit),
 		values: make(map[attribute.Set]*buckets[N]),
 	}
 }
@@ -86,6 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
 	s.valuesMu.Lock()
 	defer s.valuesMu.Unlock()
 
+	attr = s.limit.Attributes(attr, s.values)
 	b, ok := s.values[attr]
 	if !ok {
 		// N+1 buckets. For example:
@@ -108,9 +111,9 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
 
 // newHistogram returns an Aggregator that summarizes a set of measurements as
 // an histogram.
-func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] {
+func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int) *histogram[N] {
 	return &histogram[N]{
-		histValues: newHistValues[N](boundaries, noSum),
+		histValues: newHistValues[N](boundaries, noSum, limit),
 		noMinMax:   noMinMax,
 		start:      now(),
 	}
diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go
index ab44607e5..e51e9fc8f 100644
--- a/sdk/metric/internal/aggregate/histogram_test.go
+++ b/sdk/metric/internal/aggregate/histogram_test.go
@@ -53,8 +53,9 @@ type conf[N int64 | float64] struct {
 
 func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.ExplicitBucketHistogram(bounds, noMinMax, c.noSum)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -114,13 +115,34 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Histogram[N]{
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[N]{
+						c.hPt(fltrAlice, 1, 1),
+						c.hPt(fltrBob, 1, 1),
+						c.hPt(overflowSet, 1, 2),
+					},
+				},
+			},
+		},
 	})
 }
 
 func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.ExplicitBucketHistogram(bounds, noMinMax, c.noSum)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -182,6 +204,24 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Histogram[N]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[N]{
+						c.hPt(fltrAlice, 2, 4),
+						c.hPt(fltrBob, 10, 3),
+						c.hPt(overflowSet, 1, 2),
+					},
+				},
+			},
+		},
 	})
 }
@@ -273,7 +313,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
 	cpB := make([]float64, len(b))
 	copy(cpB, b)
 
-	h := newHistogram[int64](b, false, false)
+	h := newHistogram[int64](b, false, false, 0)
 	require.Equal(t, cpB, h.bounds)
 
 	b[0] = 10
@@ -289,7 +329,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
 }
 
 func TestCumulativeHistogramImutableCounts(t *testing.T) {
-	h := newHistogram[int64](bounds, noMinMax, false)
+	h := newHistogram[int64](bounds, noMinMax, false, 0)
 	h.measure(context.Background(), 5, alice)
 
 	var data metricdata.Aggregation = metricdata.Histogram[int64]{}
@@ -307,7 +347,7 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) {
 func TestDeltaHistogramReset(t *testing.T) {
 	t.Cleanup(mockTime(now))
 
-	h := newHistogram[int64](bounds, noMinMax, false)
+	h := newHistogram[int64](bounds, noMinMax, false, 0)
 	var data metricdata.Aggregation = metricdata.Histogram[int64]{}
 	require.Equal(t, 0, h.delta(&data))
diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go
index 6af2d6061..b79e80a0c 100644
--- a/sdk/metric/internal/aggregate/lastvalue.go
+++ b/sdk/metric/internal/aggregate/lastvalue.go
@@ -29,20 +29,25 @@ type datapoint[N int64 | float64] struct {
 	value N
 }
 
-func newLastValue[N int64 | float64]() *lastValue[N] {
-	return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
+func newLastValue[N int64 | float64](limit int) *lastValue[N] {
+	return &lastValue[N]{
+		limit:  newLimiter[datapoint[N]](limit),
+		values: make(map[attribute.Set]datapoint[N]),
+	}
 }
 
 // lastValue summarizes a set of measurements as the last one made.
 type lastValue[N int64 | float64] struct {
 	sync.Mutex
 
+	limit  limiter[datapoint[N]]
 	values map[attribute.Set]datapoint[N]
 }
 
 func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
 	d := datapoint[N]{timestamp: now(), value: value}
 	s.Lock()
+	attr = s.limit.Attributes(attr, s.values)
 	s.values[attr] = d
 	s.Unlock()
 }
diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go
index c758eb370..479232ad4 100644
--- a/sdk/metric/internal/aggregate/lastvalue_test.go
+++ b/sdk/metric/internal/aggregate/lastvalue_test.go
@@ -29,7 +29,10 @@ func TestLastValue(t *testing.T) {
 }
 
 func testLastValue[N int64 | float64]() func(*testing.T) {
-	in, out := Builder[N]{Filter: attrFltr}.LastValue()
+	in, out := Builder[N]{
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.LastValue()
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
 		{
@@ -87,6 +90,36 @@ func testLastValue[N int64 | float64]() func(*testing.T) {
 					},
 				},
 			},
+		}, {
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Gauge[N]{
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							Time:       staticTime,
+							Value:      1,
+						},
+					},
+				},
+			},
 		},
 	})
 }
diff --git a/sdk/metric/internal/aggregate/limit.go b/sdk/metric/internal/aggregate/limit.go
new file mode 100644
index 000000000..d3de84272
--- /dev/null
+++ b/sdk/metric/internal/aggregate/limit.go
@@ -0,0 +1,53 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import "go.opentelemetry.io/otel/attribute"
+
+// overflowSet is the attribute set used to record a measurement when adding
+// another distinct attribute set to the aggregate would exceed the aggregate
+// limit.
+var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))
+
+// limiter limits aggregate values.
+type limiter[V any] struct {
+	// aggLimit is the maximum number of metric streams that can be aggregated.
+	//
+	// Any metric stream with attributes distinct from any set already
+	// aggregated once the aggLimit is met will instead be aggregated into
+	// an "overflow" metric stream. That stream will only contain the
+	// "otel.metric.overflow"=true attribute.
+	aggLimit int
+}
+
+// newLimiter returns a new limiter with the provided aggregation limit.
+func newLimiter[V any](aggregation int) limiter[V] {
+	return limiter[V]{aggLimit: aggregation}
+}
+
+// Attributes checks if adding a measurement for attrs will exceed the
+// aggregation cardinality limit for the existing measurements. If it will,
+// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
+// limit is not set (limit <= 0), attrs is returned.
+func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set {
+	if l.aggLimit > 0 {
+		_, exists := measurements[attrs]
+		if !exists && len(measurements) >= l.aggLimit-1 {
+			return overflowSet
+		}
+	}
+
+	return attrs
+}
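Note the `len(measurements) >= l.aggLimit-1` comparison: the limit counts the overflow stream itself, so a limiter with limit `n` admits at most `n-1` distinct attribute sets before diverting new ones. The tests that follow pin this down; a short walk-through of the boundary case (`alice` and `bob` are the shared test fixtures):

```go
// With a limit of 2, one slot is usable and one is reserved for overflow.
l := newLimiter[struct{}](2)
m := map[attribute.Set]struct{}{alice: {}} // One stream already aggregated.

_ = l.Attributes(alice, m) // alice exists in m: returned unchanged.
_ = l.Attributes(bob, m)   // bob is new and len(m) >= 2-1: overflowSet.
```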
diff --git a/sdk/metric/internal/aggregate/limit_test.go b/sdk/metric/internal/aggregate/limit_test.go
new file mode 100644
index 000000000..cd524d339
--- /dev/null
+++ b/sdk/metric/internal/aggregate/limit_test.go
@@ -0,0 +1,67 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"go.opentelemetry.io/otel/attribute"
+)
+
+func TestLimiterAttributes(t *testing.T) {
+	m := map[attribute.Set]struct{}{alice: {}}
+	t.Run("NoLimit", func(t *testing.T) {
+		l := newLimiter[struct{}](0)
+		assert.Equal(t, alice, l.Attributes(alice, m))
+		assert.Equal(t, bob, l.Attributes(bob, m))
+	})
+
+	t.Run("NotAtLimit/Exists", func(t *testing.T) {
+		l := newLimiter[struct{}](3)
+		assert.Equal(t, alice, l.Attributes(alice, m))
+	})
+
+	t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) {
+		l := newLimiter[struct{}](3)
+		assert.Equal(t, bob, l.Attributes(bob, m))
+	})
+
+	t.Run("AtLimit/Exists", func(t *testing.T) {
+		l := newLimiter[struct{}](2)
+		assert.Equal(t, alice, l.Attributes(alice, m))
+	})
+
+	t.Run("AtLimit/DoesNotExist", func(t *testing.T) {
+		l := newLimiter[struct{}](2)
+		assert.Equal(t, overflowSet, l.Attributes(bob, m))
+	})
+}
+
+var limitedAttr attribute.Set
+
+func BenchmarkLimiterAttributes(b *testing.B) {
+	m := map[attribute.Set]struct{}{alice: {}}
+	l := newLimiter[struct{}](2)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for n := 0; n < b.N; n++ {
+		limitedAttr = l.Attributes(alice, m)
+		limitedAttr = l.Attributes(bob, m)
+	}
+}
diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go
index 1e52ff0d1..a0d26e1dd 100644
--- a/sdk/metric/internal/aggregate/sum.go
+++ b/sdk/metric/internal/aggregate/sum.go
@@ -26,15 +26,20 @@ import (
 
 // valueMap is the storage for sums.
 type valueMap[N int64 | float64] struct {
 	sync.Mutex
+	limit  limiter[N]
 	values map[attribute.Set]N
 }
 
-func newValueMap[N int64 | float64]() *valueMap[N] {
-	return &valueMap[N]{values: make(map[attribute.Set]N)}
+func newValueMap[N int64 | float64](limit int) *valueMap[N] {
+	return &valueMap[N]{
+		limit:  newLimiter[N](limit),
+		values: make(map[attribute.Set]N),
+	}
 }
 
 func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
 	s.Lock()
+	attr = s.limit.Attributes(attr, s.values)
 	s.values[attr] += value
 	s.Unlock()
 }
@@ -42,9 +47,9 @@ func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
 // newSum returns an aggregator that summarizes a set of measurements as their
 // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
 // the measurements were made in.
-func newSum[N int64 | float64](monotonic bool) *sum[N] {
+func newSum[N int64 | float64](monotonic bool, limit int) *sum[N] {
 	return &sum[N]{
-		valueMap:  newValueMap[N](),
+		valueMap:  newValueMap[N](limit),
 		monotonic: monotonic,
 		start:     now(),
 	}
@@ -129,9 +134,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
 // newPrecomputedSum returns an aggregator that summarizes a set of
 // observatrions as their arithmetic sum. Each sum is scoped by attributes and
 // the aggregation cycle the measurements were made in.
-func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] {
+func newPrecomputedSum[N int64 | float64](monotonic bool, limit int) *precomputedSum[N] {
 	return &precomputedSum[N]{
-		valueMap:  newValueMap[N](),
+		valueMap:  newValueMap[N](limit),
 		monotonic: monotonic,
 		start:     now(),
 	}
diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go
index 3ac675a09..b169fcad4 100644
--- a/sdk/metric/internal/aggregate/sum_test.go
+++ b/sdk/metric/internal/aggregate/sum_test.go
@@ -40,8 +40,9 @@ func TestSum(t *testing.T) {
 func testDeltaSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.Sum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -125,14 +126,51 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }
 
 func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.Sum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -204,14 +242,49 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      14,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      -8,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }
 
 func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.PrecomputedSum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -296,14 +369,51 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }
 
 func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.PrecomputedSum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
@@ -388,6 +498,42 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }
diff --git a/sdk/metric/internal/x/x.go b/sdk/metric/internal/x/x.go
index 289139572..541160f94 100644
--- a/sdk/metric/internal/x/x.go
+++ b/sdk/metric/internal/x/x.go
@@ -43,6 +43,9 @@ var (
 	//
 	// To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment
 	// variable to the integer limit value you want to use.
+	//
+	// Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0
+	// will disable the cardinality limits.
 	CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) {
 		n, err := strconv.Atoi(v)
 		if err != nil {
diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go
index 75e3af49b..47d9fe07a 100644
--- a/sdk/metric/pipeline.go
+++ b/sdk/metric/pipeline.go
@@ -29,6 +29,7 @@ import (
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	"go.opentelemetry.io/otel/sdk/metric/internal"
 	"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+	"go.opentelemetry.io/otel/sdk/metric/internal/x"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/resource"
 )
@@ -361,6 +362,12 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
 		Temporality: i.pipeline.reader.temporality(kind),
 	}
 	b.Filter = stream.AttributeFilter
+	// A value less than or equal to zero will disable the aggregation
+	// limits for the builder (and all the created aggregates).
+	// CardinalityLimit.Lookup returns 0 by default if unset (or
+	// unrecognized input). Use that value directly.
+	b.AggregationLimit, _ = x.CardinalityLimit.Lookup()
+
 	in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
 	if err != nil {
 		return aggVal[N]{0, nil, err}
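Taken together, the pipeline resolves the limit once per aggregate builder, so every aggregate created for a reader sees the same value. A rough sketch of the resulting behavior, assuming the truncated parse function above returns `(n, true)` for any valid integer (the success branch is not shown in the hunk):

```go
// Resolution performed in cachedAggregator for each new aggregate.
limit, _ := x.CardinalityLimit.Lookup()
b.AggregationLimit = limit
// OTEL_GO_X_CARDINALITY_LIMIT=2000 -> at most 2000 streams per instrument
// OTEL_GO_X_CARDINALITY_LIMIT=-1   -> <= 0, limiting disabled
// OTEL_GO_X_CARDINALITY_LIMIT=abc  -> parse fails, 0 returned, no limit
// unset                            -> 0 returned, no limit
```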