1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-08-10 22:31:50 +02:00

sdk/metric: Apply Cardinality Limits to Aggregations (#7081)

Fixes https://github.com/open-telemetry/opentelemetry-go/issues/6977
Towards https://github.com/open-telemetry/opentelemetry-go/issues/6887

## What

- Cardinality limits are enforced during aggregation.
- Exceeding the limit results in dropping excess data or other specified
behavior.
- Performance benchmarks confirm no significant degradation due to the
limit enforcement.

## Notes
Tests will be added in a separate PR, as mentioned in
https://github.com/open-telemetry/opentelemetry-go/issues/6978

---------

Co-authored-by: Damien Mathieu <42@dmathieu.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Yevhenii Solomchenko
2025-08-04 18:13:21 +02:00
committed by GitHub
parent a4d837f552
commit d5b5b05984
5 changed files with 49 additions and 37 deletions

View File

@@ -41,7 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `RPCGRPCResponseMetadata`
- Add `ErrorType` attribute helper function to the `go.opentelmetry.io/otel/semconv/v1.34.0` package. (#6962)
- Add `WithAllowKeyDuplication` in `go.opentelemetry.io/otel/sdk/log` which can be used to disable deduplication for log records. (#6968)
- Add `WithCardinalityLimit` option to configure the cardinality limit in `go.opentelemetry.io/otel/sdk/metric`. (#6996, #7065)
- Add `WithCardinalityLimit` option to configure the cardinality limit in `go.opentelemetry.io/otel/sdk/metric`. (#6996, #7065, #7081)
- Add `Clone` method to `Record` in `go.opentelemetry.io/otel/log` that returns a copy of the record with no shared state. (#7001)
- The `go.opentelemetry.io/otel/semconv/v1.36.0` package.
The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions.

View File

@@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
)
@@ -37,7 +36,13 @@ type instrumentSync struct {
compAgg aggregate.ComputeAggregation
}
func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
func newPipeline(
res *resource.Resource,
reader Reader,
views []View,
exemplarFilter exemplar.Filter,
cardinalityLimit int,
) *pipeline {
if res == nil {
res = resource.Empty()
}
@@ -48,6 +53,7 @@ func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFi
int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{},
float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{},
exemplarFilter: exemplarFilter,
cardinalityLimit: cardinalityLimit,
// aggregations is lazy allocated when needed.
}
}
@@ -71,6 +77,7 @@ type pipeline struct {
callbacks []func(context.Context) error
multiCallbacks list.List
exemplarFilter exemplar.Filter
cardinalityLimit int
}
// addInt64Measure adds a new int64 measure to the pipeline for each observer.
@@ -388,10 +395,9 @@ func (i *inserter[N]) cachedAggregator(
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
// limits for the builder (an all the created aggregates).
// CardinalityLimit.Lookup returns 0 by default if unset (or
// cardinalityLimit will be 0 by default if unset (or
// unrecognized input). Use that value directly.
b.AggregationLimit, _ = x.CardinalityLimit.Lookup()
b.AggregationLimit = i.pipeline.cardinalityLimit
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
if err != nil {
return aggVal[N]{0, nil, err}
@@ -590,10 +596,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
// measurement.
type pipelines []*pipeline
func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
func newPipelines(
res *resource.Resource,
readers []Reader,
views []View,
exemplarFilter exemplar.Filter,
cardinalityLimit int,
) pipelines {
pipes := make([]*pipeline, 0, len(readers))
for _, r := range readers {
p := newPipeline(res, r, views, exemplarFilter)
p := newPipeline(res, r, views, exemplarFilter, cardinalityLimit)
r.register(p)
pipes = append(pipes, p)
}

View File

@@ -392,7 +392,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter)
p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter, 0)
i := newInserter[N](p, &c)
readerAggregation := i.readerDefaultAggregation(tt.inst.Kind)
input, err := i.Instrument(tt.inst, readerAggregation)
@@ -414,7 +414,7 @@ func TestCreateAggregators(t *testing.T) {
func testInvalidInstrumentShouldPanic[N int64 | float64]() {
var c cache[string, instID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter), &c)
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter, 0), &c)
inst := Instrument{
Name: "foo",
Kind: InstrumentKind(255),
@@ -430,7 +430,7 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) {
func TestPipelinesAggregatorForEachReader(t *testing.T) {
r0, r1 := NewManualReader(), NewManualReader()
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter)
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter, 0)
require.Len(t, pipes, 2, "created pipelines")
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
@@ -504,7 +504,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter)
p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter, 0)
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount)
@@ -558,7 +558,7 @@ func TestPipelineRegistryResource(t *testing.T) {
readers := []Reader{NewManualReader()}
views := []View{defaultView, v}
res := resource.NewSchemaless(attribute.String("key", "val"))
pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter)
pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter, 0)
for _, p := range pipes {
assert.True(t, res.Equal(p.resource), "resource not set")
}
@@ -571,7 +571,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
readers := []Reader{testRdrHistogram}
views := []View{defaultView}
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter, 0)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}
var vc cache[string, instID]
@@ -631,7 +631,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter}
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter, 0)
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)

View File

@@ -42,7 +42,7 @@ func testSumAggregateOutput(dest *metricdata.Aggregation) int {
}
func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0)
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
@@ -68,7 +68,7 @@ func TestNewPipeline(t *testing.T) {
func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter, 0)
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
@@ -77,7 +77,7 @@ func TestPipelineUsesResource(t *testing.T) {
}
func TestPipelineConcurrentSafe(t *testing.T) {
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0)
ctx := context.Background()
var output metricdata.ResourceMetrics
@@ -142,13 +142,13 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
}{
{
name: "NoView",
pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter),
pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter, 0),
},
{
name: "NoMatchingView",
pipe: newPipeline(nil, reader, []View{
NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
}, exemplar.AlwaysOffFilter),
}, exemplar.AlwaysOffFilter, 0),
},
}
@@ -233,7 +233,7 @@ func TestLogConflictName(t *testing.T) {
return instID{Name: tc.existing}
})
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0), &vc)
i.logConflict(instID{Name: tc.name})
if tc.conflict {
@@ -275,7 +275,7 @@ func TestLogConflictSuggestView(t *testing.T) {
var vc cache[string, instID]
name := strings.ToLower(orig.Name)
_ = vc.Lookup(name, func() instID { return orig })
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0), &vc)
viewSuggestion := func(inst instID, stream string) string {
return `"NewView(Instrument{` +
@@ -380,7 +380,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
}
var vc cache[string, instID]
pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter, 0)
i := newInserter[int64](pipe, &vc)
readerAggregation := i.readerDefaultAggregation(kind)
@@ -621,7 +621,7 @@ func TestPipelineWithMultipleReaders(t *testing.T) {
func TestPipelineProduceErrors(t *testing.T) {
// Create a test pipeline with aggregations
pipeReader := NewManualReader()
pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter)
pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter, 0)
// Set up an observable with callbacks
var testObsID observableID[int64]

View File

@@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
flush, sdown := conf.readerSignals()
mp := &MeterProvider{
pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter),
pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter, conf.cardinalityLimit),
forceFlush: flush,
shutdown: sdown,
}