1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

sdk/metric: Support specifying cardinality limits per instrument kinds (#7855)

Previously, we only had `WithCardinalityLimit()`, which adds a global
cardinality limit.

This PR adds a new API on the reader `WithCardinalityLimitSelector` that
can be used to specify limits per instrument kinds.


[spec](https://github.com/open-telemetry/opentelemetry-specification/blob/49845849d2d8df07059f82033f39e96c561927cf/specification/metrics/sdk.md?plain=1#L1282)


[schema](https://github.com/open-telemetry/opentelemetry-configuration/blob/3dbebe292912f0c0c96ce5dcfefc45dfe5e20f39/snippets/CardinalityLimits_kitchen_sink.yaml#L11-L18)

closes #7786

---------

Co-authored-by: David Ashpole <dashpole@google.com>
Co-authored-by: Damien Mathieu <42@dmathieu.com>
This commit is contained in:
Peter Nguyen
2026-03-20 03:24:19 -07:00
committed by GitHub
parent e604da5eec
commit 015ed1a7a4
8 changed files with 357 additions and 31 deletions
+1
View File
@@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Support attributes with empty value (`attribute.EMPTY`) in `go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest`. (#8038)
- Add support for per-series start time tracking for cumulative metrics in `go.opentelemetry.io/otel/sdk/metric`.
Set `OTEL_GO_X_PER_SERIES_START_TIMESTAMPS=true` to enable. (#8060)
- Add `WithCardinalityLimitSelector` for metric reader for configuring cardinality limits specific to the instrument kind. (#7855)
### Changed
+3 -1
View File
@@ -160,12 +160,14 @@ func WithExemplarFilter(filter exemplar.Filter) Option {
})
}
// WithCardinalityLimit sets the cardinality limit for the MeterProvider.
// WithCardinalityLimit sets the global cardinality limit for the MeterProvider.
//
// The cardinality limit is the hard limit on the number of metric datapoints
// that can be collected for a single instrument in a single collect cycle.
//
// Setting this to a zero or negative value means no limit is applied.
// This value applies to all instrument kinds, but can be overridden per kind by
// the reader's cardinality limit selector (see [WithCardinalityLimitSelector]).
func WithCardinalityLimit(limit int) Option {
// For backward compatibility, the environment variable `OTEL_GO_X_CARDINALITY_LIMIT`
// can also be used to set this value.
+15 -7
View File
@@ -20,13 +20,14 @@ import (
)
type reader struct {
producer sdkProducer
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context, *metricdata.ResourceMetrics) error
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
producer sdkProducer
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
cardinalityLimitSelector CardinalityLimitSelector
collectFunc func(context.Context, *metricdata.ResourceMetrics) error
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
const envVarResourceAttributes = "OTEL_RESOURCE_ATTRIBUTES"
@@ -45,6 +46,13 @@ func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
func (r *reader) cardinalityLimit(kind InstrumentKind) (int, bool) {
if r.cardinalityLimitSelector != nil {
return r.cardinalityLimitSelector(kind)
}
return 0, true
}
func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
return r.collectFunc(ctx, rm)
}
+18 -9
View File
@@ -32,8 +32,9 @@ type ManualReader struct {
isShutdown bool
externalProducers atomic.Value
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
cardinalityLimitSelector CardinalityLimitSelector
inst *observ.Instrumentation
}
@@ -45,8 +46,9 @@ var _ = map[Reader]struct{}{&ManualReader{}: {}}
func NewManualReader(opts ...ManualReaderOption) *ManualReader {
cfg := newManualReaderConfig(opts)
r := &ManualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
cardinalityLimitSelector: cfg.cardinalityLimitSelector,
}
r.externalProducers.Store(cfg.producers)
@@ -89,6 +91,11 @@ func (mr *ManualReader) aggregation(
return mr.aggregationSelector(kind)
}
// cardinalityLimit returns the cardinality limit for kind.
func (mr *ManualReader) cardinalityLimit(kind InstrumentKind) (int, bool) {
return mr.cardinalityLimitSelector(kind)
}
// Shutdown closes any connections and frees any resources used by the reader.
//
// This method is safe to call concurrently.
@@ -179,16 +186,18 @@ func (r *ManualReader) MarshalLog() any {
// manualReaderConfig contains configuration options for a ManualReader.
type manualReaderConfig struct {
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
producers []Producer
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
cardinalityLimitSelector CardinalityLimitSelector
producers []Producer
}
// newManualReaderConfig returns a manualReaderConfig configured with options.
func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig {
cfg := manualReaderConfig{
temporalitySelector: DefaultTemporalitySelector,
aggregationSelector: DefaultAggregationSelector,
temporalitySelector: DefaultTemporalitySelector,
aggregationSelector: DefaultAggregationSelector,
cardinalityLimitSelector: defaultCardinalityLimitSelector,
}
for _, opt := range opts {
cfg = opt.applyManual(cfg)
+21 -11
View File
@@ -26,17 +26,19 @@ const (
// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
producers []Producer
interval time.Duration
timeout time.Duration
producers []Producer
cardinalityLimitSelector CardinalityLimitSelector
}
// newPeriodicReaderConfig returns a periodicReaderConfig configured with
// options.
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
c := periodicReaderConfig{
interval: envDuration(envInterval, defaultInterval),
timeout: envDuration(envTimeout, defaultTimeout),
interval: envDuration(envInterval, defaultInterval),
timeout: envDuration(envTimeout, defaultTimeout),
cardinalityLimitSelector: defaultCardinalityLimitSelector,
}
for _, o := range options {
c = o.applyPeriodic(c)
@@ -111,12 +113,13 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
context.Background(),
)
r := &PeriodicReader{
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
cardinalityLimitSelector: conf.cardinalityLimitSelector,
rmPool: sync.Pool{
New: func() any {
return &metricdata.ResourceMetrics{}
@@ -170,6 +173,8 @@ type PeriodicReader struct {
rmPool sync.Pool
cardinalityLimitSelector CardinalityLimitSelector
inst *observ.Instrumentation
}
@@ -222,6 +227,11 @@ func (r *PeriodicReader) aggregation(
return r.exporter.Aggregation(kind)
}
// cardinalityLimit returns the cardinality limit for kind.
func (r *PeriodicReader) cardinalityLimit(kind InstrumentKind) (int, bool) {
return r.cardinalityLimitSelector(kind)
}
// collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter.
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
+13 -3
View File
@@ -395,9 +395,7 @@ func (i *inserter[N]) cachedAggregator(
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation
// limits for the builder (an all the created aggregates).
// cardinalityLimit will be 0 by default if unset (or
// unrecognized input). Use that value directly.
b.AggregationLimit = i.pipeline.cardinalityLimit
b.AggregationLimit = i.getCardinalityLimit(kind)
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
if err != nil {
return aggVal[N]{0, nil, err}
@@ -419,6 +417,18 @@ func (i *inserter[N]) cachedAggregator(
return cv.Measure, cv.ID, cv.Err
}
// getCardinalityLimit returns the cardinality limit for the given instrument kind.
// When the reader's selector returns fallback = true, the pipeline's global
// limit is used, then the default if global is unset. When fallback is false,
// the selector's limit is used (0 or less means unlimited).
func (i *inserter[N]) getCardinalityLimit(kind InstrumentKind) int {
limit, fallback := i.pipeline.reader.cardinalityLimit(kind)
if fallback {
return i.pipeline.cardinalityLimit
}
return limit
}
// logConflict validates if an instrument with the same case-insensitive name
// as id has already been created. If that instrument conflicts with id, a
// warning is logged.
+228
View File
@@ -244,3 +244,231 @@ func TestMeterProviderCardinalityLimit(t *testing.T) {
})
}
}
func TestMeterProviderPerInstrumentCardinalityLimits(t *testing.T) {
const uniqueAttributesCount = 10
type metricCase struct {
name string
selector CardinalityLimitSelector
globalLimit int
build func(t *testing.T, meter api.Meter)
wantPoints int
}
testCases := []metricCase{
{
name: "counter uses counter-specific limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindCounter {
return 3, false
}
return 0, true
},
globalLimit: 8,
build: func(t *testing.T, meter api.Meter) {
counter, err := meter.Int64Counter("counter-metric")
require.NoError(t, err)
for i := range uniqueAttributesCount {
counter.Add(t.Context(), 1, api.WithAttributes(attribute.Int("key", i)))
}
},
wantPoints: 3,
},
{
name: "histogram uses histogram-specific limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindHistogram {
return 4, false
}
return 0, true
},
globalLimit: 8,
build: func(t *testing.T, meter api.Meter) {
histogram, err := meter.Int64Histogram("histogram-metric")
require.NoError(t, err)
for i := range uniqueAttributesCount {
histogram.Record(t.Context(), int64(i), api.WithAttributes(attribute.Int("key", i)))
}
},
wantPoints: 4,
},
{
name: "gauge uses gauge-specific limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindGauge {
return 5, false
}
return 0, true
},
globalLimit: 8,
build: func(t *testing.T, meter api.Meter) {
gauge, err := meter.Int64Gauge("gauge-metric")
require.NoError(t, err)
for i := range uniqueAttributesCount {
gauge.Record(t.Context(), int64(i), api.WithAttributes(attribute.Int("key", i)))
}
},
wantPoints: 5,
},
{
name: "up down counter uses updowncounter-specific limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindUpDownCounter {
return 2, false
}
return 0, true
},
globalLimit: 8,
build: func(t *testing.T, meter api.Meter) {
upDownCounter, err := meter.Int64UpDownCounter("updowncounter-metric")
require.NoError(t, err)
for i := range uniqueAttributesCount {
upDownCounter.Add(t.Context(), 1, api.WithAttributes(attribute.Int("key", i)))
}
},
wantPoints: 2,
},
{
name: "observable counter uses observable-counter-specific limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindObservableCounter {
return 4, false
}
return 0, true
},
globalLimit: 8,
build: func(t *testing.T, meter api.Meter) {
obs, err := meter.Int64ObservableCounter(
"observable-counter-metric",
api.WithInt64Callback(func(_ context.Context, o api.Int64Observer) error {
for i := range uniqueAttributesCount {
o.Observe(int64(i), api.WithAttributes(attribute.Int("key", i)))
}
return nil
}),
)
require.NoError(t, err)
require.NotNil(t, obs)
},
wantPoints: 4,
},
{
name: "observable gauge uses observable-gauge-specific limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindObservableGauge {
return 5, false
}
return 0, true
},
globalLimit: 8,
build: func(t *testing.T, meter api.Meter) {
obs, err := meter.Int64ObservableGauge(
"observable-gauge-metric",
api.WithInt64Callback(func(_ context.Context, o api.Int64Observer) error {
for i := range uniqueAttributesCount {
o.Observe(int64(i), api.WithAttributes(attribute.Int("key", i)))
}
return nil
}),
)
require.NoError(t, err)
require.NotNil(t, obs)
},
wantPoints: 5,
},
{
name: "observable up down counter uses limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindObservableUpDownCounter {
return 3, false
}
return 0, true
},
globalLimit: 8,
build: func(t *testing.T, meter api.Meter) {
obs, err := meter.Int64ObservableUpDownCounter(
"observable-updowncounter-metric",
api.WithInt64Callback(func(_ context.Context, o api.Int64Observer) error {
for i := range uniqueAttributesCount {
o.Observe(int64(i), api.WithAttributes(attribute.Int("key", i)))
}
return nil
}),
)
require.NoError(t, err)
require.NotNil(t, obs)
},
wantPoints: 3,
},
{
name: "instrument without specific limit falls back to global limit",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindCounter {
return 3, false
}
return 0, true // fall back to global limit for other kinds
},
globalLimit: 6,
build: func(t *testing.T, meter api.Meter) {
histogram, err := meter.Int64Histogram("histogram-metric")
require.NoError(t, err)
for i := range uniqueAttributesCount {
histogram.Record(t.Context(), int64(i), api.WithAttributes(attribute.Int("key", i)))
}
},
wantPoints: 6,
},
{
name: "selector can set specific kind to unlimited while global limit is nonzero (limited)",
selector: func(kind InstrumentKind) (int, bool) {
if kind == InstrumentKindCounter {
return 0, false // unlimited for counter only
}
return 0, true // fallback to global limit
},
globalLimit: 3,
build: func(t *testing.T, meter api.Meter) {
counter, err := meter.Int64Counter("counter-metric")
require.NoError(t, err)
for i := range uniqueAttributesCount {
counter.Add(t.Context(), 1, api.WithAttributes(attribute.Int("key", i)))
}
},
wantPoints: uniqueAttributesCount,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
reader := NewManualReader(
WithCardinalityLimitSelector(tc.selector),
)
mp := NewMeterProvider(
WithReader(reader),
WithCardinalityLimit(tc.globalLimit),
)
meter := mp.Meter("test-meter")
tc.build(t, meter)
var rm metricdata.ResourceMetrics
err := reader.Collect(t.Context(), &rm)
require.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
switch data := rm.ScopeMetrics[0].Metrics[0].Data.(type) {
case metricdata.Sum[int64]:
assert.Len(t, data.DataPoints, tc.wantPoints, tc.name)
case metricdata.Histogram[int64]:
assert.Len(t, data.DataPoints, tc.wantPoints, tc.name)
case metricdata.Gauge[int64]:
assert.Len(t, data.DataPoints, tc.wantPoints, tc.name)
default:
t.Fatalf("unexpected data type %T", data)
}
})
}
}
+58
View File
@@ -59,6 +59,15 @@ type Reader interface {
// Reader methods.
aggregation(InstrumentKind) Aggregation // nolint:revive // import-shadow for method scoped by type.
// cardinalityLimit returns the cardinality limit for an instrument kind.
// When fallback is true, the pipeline falls back to the provider's global limit.
// When fallback is false, limit is used: 0 or less means no limit (unlimited),
// and a positive value is the limit for that kind.
//
// This method needs to be concurrent safe with itself and all the other
// Reader methods.
cardinalityLimit(InstrumentKind) (limit int, fallback bool)
// Collect gathers and returns all metric data related to the Reader from
// the SDK and stores it in rm. An error is returned if this is called
// after Shutdown or if rm is nil.
@@ -192,6 +201,25 @@ func DefaultAggregationSelector(ik InstrumentKind) Aggregation {
panic("unknown instrument kind")
}
// CardinalityLimitSelector selects the cardinality limit to use based on the
// InstrumentKind. The cardinality limit is the maximum number of distinct
// attribute sets that can be recorded for a single instrument.
//
// The selector returns (limit, fallback). When fallback is true, the pipeline
// falls back to the provider's global cardinality limit.
// When fallback is false, the limit is applied: a value of 0 or less means
// no limit, and a positive value is the limit for that kind.
// To avoid overriding the provider's global limit, return (0, true).
type CardinalityLimitSelector func(InstrumentKind) (limit int, fallback bool)
// defaultCardinalityLimitSelector is the default CardinalityLimitSelector used
// if WithCardinalityLimitSelector is not provided. It returns (0, true) for all
// instrument kinds, allowing the pipeline to fall back to the provider's global
// limit.
func defaultCardinalityLimitSelector(_ InstrumentKind) (int, bool) {
return 0, true
}
// ReaderOption is an option which can be applied to manual or Periodic
// readers.
type ReaderOption interface {
@@ -220,3 +248,33 @@ func (o producerOption) applyPeriodic(c periodicReaderConfig) periodicReaderConf
c.producers = append(c.producers, o.p)
return c
}
// WithCardinalityLimitSelector sets the CardinalityLimitSelector a reader will
// use to determine the cardinality limit for an instrument based on its kind.
// If this option is not used, the reader will use the
// defaultCardinalityLimitSelector.
//
// The selector should return (limit, false) to set a positive limit,
// (0, false) to explicitly specify unlimited, or
// (0, true) to fall back to the provider's global limit.
//
// See [CardinalityLimitSelector] for more details.
func WithCardinalityLimitSelector(selector CardinalityLimitSelector) ReaderOption {
return cardinalityLimitSelectorOption{selector: selector}
}
type cardinalityLimitSelectorOption struct {
selector CardinalityLimitSelector
}
// applyManual returns a manualReaderConfig with option applied.
func (o cardinalityLimitSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig {
c.cardinalityLimitSelector = o.selector
return c
}
// applyPeriodic returns a periodicReaderConfig with option applied.
func (o cardinalityLimitSelectorOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig {
c.cardinalityLimitSelector = o.selector
return c
}