diff --git a/CHANGELOG.md b/CHANGELOG.md index 96cb4d4bf..4c921778d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `RPCGRPCResponseMetadata` - Add `ErrorType` attribute helper function to the `go.opentelmetry.io/otel/semconv/v1.34.0` package. (#6962) - Add `WithAllowKeyDuplication` in `go.opentelemetry.io/otel/sdk/log` which can be used to disable deduplication for log records. (#6968) -- Add `WithCardinalityLimit` option to configure the cardinality limit in `go.opentelemetry.io/otel/sdk/metric`. (#6996, #7065) +- Add `WithCardinalityLimit` option to configure the cardinality limit in `go.opentelemetry.io/otel/sdk/metric`. (#6996, #7065, #7081) - Add `Clone` method to `Record` in `go.opentelemetry.io/otel/log` that returns a copy of the record with no shared state. (#7001) - The `go.opentelemetry.io/otel/semconv/v1.36.0` package. The package contains semantic conventions from the `v1.36.0` version of the OpenTelemetry Semantic Conventions. diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index d5042da65..3a256d92a 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" - "go.opentelemetry.io/otel/sdk/metric/internal/x" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" ) @@ -37,17 +36,24 @@ type instrumentSync struct { compAgg aggregate.ComputeAggregation } -func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline { +func newPipeline( + res *resource.Resource, + reader Reader, + views []View, + exemplarFilter exemplar.Filter, + cardinalityLimit int, +) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ - resource: res, - reader: reader, - views: views, - int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{}, - float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{}, - exemplarFilter: exemplarFilter, + resource: res, + reader: reader, + views: views, + int64Measures: map[observableID[int64]][]aggregate.Measure[int64]{}, + float64Measures: map[observableID[float64]][]aggregate.Measure[float64]{}, + exemplarFilter: exemplarFilter, + cardinalityLimit: cardinalityLimit, // aggregations is lazy allocated when needed. } } @@ -65,12 +71,13 @@ type pipeline struct { views []View sync.Mutex - int64Measures map[observableID[int64]][]aggregate.Measure[int64] - float64Measures map[observableID[float64]][]aggregate.Measure[float64] - aggregations map[instrumentation.Scope][]instrumentSync - callbacks []func(context.Context) error - multiCallbacks list.List - exemplarFilter exemplar.Filter + int64Measures map[observableID[int64]][]aggregate.Measure[int64] + float64Measures map[observableID[float64]][]aggregate.Measure[float64] + aggregations map[instrumentation.Scope][]instrumentSync + callbacks []func(context.Context) error + multiCallbacks list.List + exemplarFilter exemplar.Filter + cardinalityLimit int } // addInt64Measure adds a new int64 measure to the pipeline for each observer. @@ -388,10 +395,9 @@ func (i *inserter[N]) cachedAggregator( b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation // limits for the builder (an all the created aggregates). - // CardinalityLimit.Lookup returns 0 by default if unset (or + // cardinalityLimit will be 0 by default if unset (or // unrecognized input). Use that value directly. - b.AggregationLimit, _ = x.CardinalityLimit.Lookup() - + b.AggregationLimit = i.pipeline.cardinalityLimit in, out, err := i.aggregateFunc(b, stream.Aggregation, kind) if err != nil { return aggVal[N]{0, nil, err} @@ -590,10 +596,16 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error { // measurement. type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines { +func newPipelines( + res *resource.Resource, + readers []Reader, + views []View, + exemplarFilter exemplar.Filter, + cardinalityLimit int, +) pipelines { pipes := make([]*pipeline, 0, len(readers)) for _, r := range readers { - p := newPipeline(res, r, views, exemplarFilter) + p := newPipeline(res, r, views, exemplarFilter, cardinalityLimit) r.register(p) pipes = append(pipes, p) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 3d10654c9..175cd0a80 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -392,7 +392,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { var c cache[string, instID] - p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter) + p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter, 0) i := newInserter[N](p, &c) readerAggregation := i.readerDefaultAggregation(tt.inst.Kind) input, err := i.Instrument(tt.inst, readerAggregation) @@ -414,7 +414,7 @@ func TestCreateAggregators(t *testing.T) { func testInvalidInstrumentShouldPanic[N int64 | float64]() { var c cache[string, instID] - i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter), &c) + i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter, 0), &c) inst := Instrument{ Name: "foo", Kind: InstrumentKind(255), @@ -430,7 +430,7 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) { func TestPipelinesAggregatorForEachReader(t *testing.T) { r0, r1 := NewManualReader(), NewManualReader() - pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter) + pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter, 0) require.Len(t, pipes, 2, "created pipelines") inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} @@ -504,7 +504,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter) + p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter, 0) testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount) testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount) testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount) @@ -558,7 +558,7 @@ func TestPipelineRegistryResource(t *testing.T) { readers := []Reader{NewManualReader()} views := []View{defaultView, v} res := resource.NewSchemaless(attribute.String("key", "val")) - pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter) + pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter, 0) for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -571,7 +571,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { readers := []Reader{testRdrHistogram} views := []View{defaultView} - p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter) + p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter, 0) inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge} var vc cache[string, instID] @@ -631,7 +631,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter} barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter} - p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter) + p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter, 0) var vc cache[string, instID] ri := newResolver[int64](p, &vc) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index b601e2242..492626160 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -42,7 +42,7 @@ func testSumAggregateOutput(dest *metricdata.Aggregation) int { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter) + pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0) output := metricdata.ResourceMetrics{} err := pipe.produce(context.Background(), &output) @@ -68,7 +68,7 @@ func TestNewPipeline(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter) + pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter, 0) output := metricdata.ResourceMetrics{} err := pipe.produce(context.Background(), &output) @@ -77,7 +77,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrentSafe(t *testing.T) { - pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter) + pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0) ctx := context.Background() var output metricdata.ResourceMetrics @@ -142,13 +142,13 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { }{ { name: "NoView", - pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter), + pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter, 0), }, { name: "NoMatchingView", pipe: newPipeline(nil, reader, []View{ NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}), - }, exemplar.AlwaysOffFilter), + }, exemplar.AlwaysOffFilter, 0), }, } @@ -233,7 +233,7 @@ func TestLogConflictName(t *testing.T) { return instID{Name: tc.existing} }) - i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc) + i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0), &vc) i.logConflict(instID{Name: tc.name}) if tc.conflict { @@ -275,7 +275,7 @@ func TestLogConflictSuggestView(t *testing.T) { var vc cache[string, instID] name := strings.ToLower(orig.Name) _ = vc.Lookup(name, func() instID { return orig }) - i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc) + i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter, 0), &vc) viewSuggestion := func(inst instID, stream string) string { return `"NewView(Instrument{` + @@ -380,7 +380,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) { } var vc cache[string, instID] - pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter) + pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter, 0) i := newInserter[int64](pipe, &vc) readerAggregation := i.readerDefaultAggregation(kind) @@ -621,7 +621,7 @@ func TestPipelineWithMultipleReaders(t *testing.T) { func TestPipelineProduceErrors(t *testing.T) { // Create a test pipeline with aggregations pipeReader := NewManualReader() - pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter) + pipe := newPipeline(nil, pipeReader, nil, exemplar.AlwaysOffFilter, 0) // Set up an observable with callbacks var testObsID observableID[int64] diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index 2fca89e5a..b0a6ec580 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() mp := &MeterProvider{ - pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter), + pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter, conf.cardinalityLimit), forceFlush: flush, shutdown: sdown, }