Add cardinality limiting to the metric SDK as an experimental feature (#4457)

* Add agg limiting func
* Add unit test for limitAttr
* Add limiting to aggregate types
* Add internal x pkg for experimental feature-flagging
* Connect cardinality limit to metric SDK
* Replace limitAttr fn with limiter type

  The Attribute method is still inlinable.

* Use x.CardinalityLimit directly
* Simplify limiter test
* Add limiter benchmark
* Document the AggregationLimit field
* Test sum limits
* Test limit for last value
* Test histogram limit
* Refactor expo hist test to use existing fixtures

  The tests for the exponential histogram create their own testing fixtures. There is nothing these new fixtures do that cannot already be done with the existing testing fixtures used by all the other aggregate functions. Unify the exponential histogram testing to use the existing fixtures.

* Test the ExponentialHistogram limit
* Fix lint
* Add docs
* Rename aggregation field to aggLimit

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
parent cb8cb2d269
commit e3bf787c21
CHANGELOG.md

@@ -19,6 +19,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 - The `go.opentelemetry.io/otel/semconv/v1.24.0` package.
   The package contains semantic conventions from the `v1.24.0` version of the OpenTelemetry Semantic Conventions. (#4770)
 - Add `WithResourceAsConstantLabels` option to apply resource attributes for every metric emitted by the Prometheus exporter. (#4733)
+- Experimental cardinality limiting is added to the metric SDK.
+  See [metric documentation](./sdk/metric/EXPERIMENTAL.md#cardinality-limit) for more information about this feature and how to enable it. (#4457)

 ### Changed
sdk/metric/EXPERIMENTAL.md (new file, 50 lines)
@@ -0,0 +1,50 @@
# Experimental Features

The metric SDK contains features that have not yet stabilized in the OpenTelemetry specification.
These features are added to the OpenTelemetry Go metric SDK prior to stabilization in the specification so that users can start experimenting with them and provide feedback.

These features may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.

## Features

- [Cardinality Limit](#cardinality-limit)

### Cardinality Limit

The cardinality limit is the hard limit on the number of metric streams that can be collected for a single instrument.

This experimental feature can be enabled by setting the `OTEL_GO_X_CARDINALITY_LIMIT` environment variable.
The value must be an integer.
All other values are ignored.

If the value set is less than or equal to `0`, no limit will be applied.

#### Examples

Set the cardinality limit to 2000.

```console
export OTEL_GO_X_CARDINALITY_LIMIT=2000
```

Set an infinite cardinality limit (functionally equivalent to disabling the feature).

```console
export OTEL_GO_X_CARDINALITY_LIMIT=-1
```

Disable the cardinality limit.

```console
unset OTEL_GO_X_CARDINALITY_LIMIT
```

## Compatibility and Stability

Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.

When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version.
If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support.
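For a concrete end-to-end picture, here is a minimal sketch (an illustration, not part of this commit; the instrument and attribute names are invented) that enables the flag and records more distinct attribute sets than the limit allows:

```go
package main

import (
	"context"
	"fmt"
	"os"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// The limit is read when instruments are created, so set it first.
	os.Setenv("OTEL_GO_X_CARDINALITY_LIMIT", "3")

	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	defer func() { _ = provider.Shutdown(context.Background()) }()

	counter, _ := provider.Meter("example").Int64Counter("requests")
	ctx := context.Background()
	for _, user := range []string{"Alice", "Bob", "Carol", "Dave"} {
		counter.Add(ctx, 1, metric.WithAttributes(attribute.String("user", user)))
	}

	var rm metricdata.ResourceMetrics
	_ = reader.Collect(ctx, &rm)
	sum := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64])
	for _, dp := range sum.DataPoints {
		fmt.Println(dp.Attributes.Encoded(attribute.DefaultEncoder()), dp.Value)
	}
	// With a limit of 3, two user streams survive; the remaining
	// measurements are folded into one stream whose only attribute is
	// otel.metric.overflow=true.
}
```

Note that the limit counts the overflow stream itself: a limit of `n` admits at most `n-1` distinct attribute sets per instrument.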
sdk/metric/internal/aggregate/aggregate.go

@@ -44,6 +44,14 @@ type Builder[N int64 | float64] struct {
 	// Filter is the attribute filter the aggregate function will use on the
 	// input of measurements.
 	Filter attribute.Filter
+	// AggregationLimit is the cardinality limit of measurement attributes.
+	// Once the limit has been reached, any measurement made with a new
+	// attribute set will be aggregated into a single aggregate for the
+	// "otel.metric.overflow" attribute.
+	//
+	// If AggregationLimit is less than or equal to zero, no aggregation
+	// limit is imposed (i.e. unlimited attribute sets).
+	AggregationLimit int
 }

 func (b Builder[N]) filter(f Measure[N]) Measure[N] {
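To make the new field concrete, a minimal sketch of how the limit flows through a builder follows. It is illustrative only: the function name is invented, and it would only compile inside this internal `aggregate` package (with its usual `context`, `attribute`, and `metricdata` imports), since `Builder` is not exported from the SDK.

```go
// limitedSumDemo is a hypothetical illustration: with AggregationLimit = 3,
// at most two distinct attribute sets are tracked per cycle; any further
// set is folded into the otel.metric.overflow stream.
func limitedSumDemo() int {
	in, out := Builder[int64]{
		Temporality:      metricdata.DeltaTemporality,
		AggregationLimit: 3,
	}.Sum(false)

	ctx := context.Background()
	in(ctx, 1, attribute.NewSet(attribute.String("user", "Alice")))
	in(ctx, 1, attribute.NewSet(attribute.String("user", "Bob")))
	in(ctx, 1, attribute.NewSet(attribute.String("user", "Carol"))) // overflows

	var data metricdata.Aggregation
	return out(&data) // 3 data points: Alice, Bob, and the overflow set
}
```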
@@ -63,7 +71,7 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
 func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
 	// Delta temporality is the only temporality that makes semantic sense for
 	// a last-value aggregate.
-	lv := newLastValue[N]()
+	lv := newLastValue[N](b.AggregationLimit)

 	return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
 		// Ignore if dest is not a metricdata.Gauge. The chance for memory

@@ -79,7 +87,7 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
 // PrecomputedSum returns a sum aggregate function input and output. The
 // arguments passed to the input are expected to be the precomputed sum values.
 func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
-	s := newPrecomputedSum[N](monotonic)
+	s := newPrecomputedSum[N](monotonic, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(s.measure), s.delta

@@ -90,7 +98,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati

 // Sum returns a sum aggregate function input and output.
 func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
-	s := newSum[N](monotonic)
+	s := newSum[N](monotonic, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(s.measure), s.delta

@@ -102,7 +110,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
 // ExplicitBucketHistogram returns a histogram aggregate function input and
 // output.
 func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
-	h := newHistogram[N](boundaries, noMinMax, noSum)
+	h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(h.measure), h.delta

@@ -114,7 +122,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
 // ExponentialBucketHistogram returns a histogram aggregate function input and
 // output.
 func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
-	h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
+	h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit)
 	switch b.Temporality {
 	case metricdata.DeltaTemporality:
 		return b.filter(h.measure), h.delta
sdk/metric/internal/aggregate/aggregate_test.go

@@ -31,11 +31,15 @@ var (
 	keyUser   = "user"
 	userAlice = attribute.String(keyUser, "Alice")
 	userBob   = attribute.String(keyUser, "Bob")
+	userCarol = attribute.String(keyUser, "Carol")
+	userDave  = attribute.String(keyUser, "Dave")
 	adminTrue  = attribute.Bool("admin", true)
 	adminFalse = attribute.Bool("admin", false)

 	alice = attribute.NewSet(userAlice, adminTrue)
 	bob   = attribute.NewSet(userBob, adminFalse)
+	carol = attribute.NewSet(userCarol, adminFalse)
+	dave  = attribute.NewSet(userDave, adminFalse)

 	// Filtered.
 	attrFltr = func(kv attribute.KeyValue) bool {
sdk/metric/internal/aggregate/exponential_histogram.go

@@ -288,13 +288,14 @@ func (b *expoBuckets) downscale(delta int) {
 // newExponentialHistogram returns an Aggregator that summarizes a set of
 // measurements as an exponential histogram. Each histogram is scoped by attributes
 // and the aggregation cycle the measurements were made in.
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int) *expoHistogram[N] {
 	return &expoHistogram[N]{
 		noSum:    noSum,
 		noMinMax: noMinMax,
 		maxSize:  int(maxSize),
 		maxScale: int(maxScale),

+		limit:  newLimiter[*expoHistogramDataPoint[N]](limit),
 		values: make(map[attribute.Set]*expoHistogramDataPoint[N]),

 		start: now(),

@@ -309,6 +310,7 @@ type expoHistogram[N int64 | float64] struct {
 	maxSize  int
 	maxScale int

+	limit    limiter[*expoHistogramDataPoint[N]]
 	values   map[attribute.Set]*expoHistogramDataPoint[N]
 	valuesMu sync.Mutex

@@ -324,6 +326,7 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se
 	e.valuesMu.Lock()
 	defer e.valuesMu.Unlock()

+	attr = e.limit.Attributes(attr, e.values)
 	v, ok := e.values[attr]
 	if !ok {
 		v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
sdk/metric/internal/aggregate/exponential_histogram_test.go

@@ -183,7 +183,7 @@ func testExpoHistogramMinMaxSumInt64(t *testing.T) {
 	restore := withHandler(t)
 	defer restore()

-	h := newExponentialHistogram[int64](4, 20, false, false)
+	h := newExponentialHistogram[int64](4, 20, false, false, 0)
 	for _, v := range tt.values {
 		h.measure(context.Background(), v, alice)
 	}

@@ -225,7 +225,7 @@ func testExpoHistogramMinMaxSumFloat64(t *testing.T) {
 	restore := withHandler(t)
 	defer restore()

-	h := newExponentialHistogram[float64](4, 20, false, false)
+	h := newExponentialHistogram[float64](4, 20, false, false, 0)
 	for _, v := range tt.values {
 		h.measure(context.Background(), v, alice)
 	}

@@ -747,8 +747,9 @@ func TestExponentialHistogramAggregation(t *testing.T) {

 func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 2,
 	}.ExponentialBucketHistogram(4, 20, false, false)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -805,13 +806,67 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 4, alice},
+				{ctx, 4, alice},
+				{ctx, 4, alice},
+				{ctx, 2, alice},
+				{ctx, 16, alice},
+				{ctx, 1, alice},
+				// These will exceed the cardinality limit.
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 2, carol},
+				{ctx, 16, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 2,
+				agg: metricdata.ExponentialHistogram[N]{
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      6,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        31,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 4, 1},
+							},
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      6,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        31,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 4, 1},
+							},
+						},
+					},
+				},
+			},
+		},
 	})
 }

 func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 2,
 	}.ExponentialBucketHistogram(4, 20, false, false)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -911,6 +966,53 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				// These will exceed the cardinality limit.
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 4, bob},
+				{ctx, 2, carol},
+				{ctx, 16, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 2,
+				agg: metricdata.ExponentialHistogram[N]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      9,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        44,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 6, 2},
+							},
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Count:      6,
+							Min:        metricdata.NewExtrema[N](1),
+							Max:        metricdata.NewExtrema[N](16),
+							Sum:        31,
+							Scale:      -1,
+							PositiveBucket: metricdata.ExponentialBucket{
+								Offset: -1,
+								Counts: []uint64{1, 4, 1},
+							},
+						},
+					},
+				},
+			},
+		},
 	})
 }
sdk/metric/internal/aggregate/histogram.go

@@ -54,11 +54,12 @@ type histValues[N int64 | float64] struct {
 	noSum  bool
 	bounds []float64

+	limit    limiter[*buckets[N]]
 	values   map[attribute.Set]*buckets[N]
 	valuesMu sync.Mutex
 }

-func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
+func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int) *histValues[N] {
 	// The responsibility of keeping all buckets correctly associated with the
 	// passed boundaries is ultimately this type's responsibility. Make a copy
 	// here so we can always guarantee this. Or, in the case of failure, have

@@ -69,6 +70,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[
 	return &histValues[N]{
 		noSum:  noSum,
 		bounds: b,
+		limit:  newLimiter[*buckets[N]](limit),
 		values: make(map[attribute.Set]*buckets[N]),
 	}
 }

@@ -86,6 +88,7 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
 	s.valuesMu.Lock()
 	defer s.valuesMu.Unlock()

+	attr = s.limit.Attributes(attr, s.values)
 	b, ok := s.values[attr]
 	if !ok {
 		// N+1 buckets. For example:

@@ -108,9 +111,9 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)

 // newHistogram returns an Aggregator that summarizes a set of measurements as
 // a histogram.
-func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] {
+func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int) *histogram[N] {
 	return &histogram[N]{
-		histValues: newHistValues[N](boundaries, noSum),
+		histValues: newHistValues[N](boundaries, noSum, limit),
 		noMinMax:   noMinMax,
 		start:      now(),
 	}
sdk/metric/internal/aggregate/histogram_test.go

@@ -53,8 +53,9 @@ type conf[N int64 | float64] struct {

 func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.ExplicitBucketHistogram(bounds, noMinMax, c.noSum)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -114,13 +115,34 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Histogram[N]{
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[N]{
+						c.hPt(fltrAlice, 1, 1),
+						c.hPt(fltrBob, 1, 1),
+						c.hPt(overflowSet, 1, 2),
+					},
+				},
+			},
+		},
 	})
 }

 func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.ExplicitBucketHistogram(bounds, noMinMax, c.noSum)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -182,6 +204,24 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Histogram[N]{
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.HistogramDataPoint[N]{
+						c.hPt(fltrAlice, 2, 4),
+						c.hPt(fltrBob, 10, 3),
+						c.hPt(overflowSet, 1, 2),
+					},
+				},
+			},
+		},
 	})
 }

@@ -273,7 +313,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
 	cpB := make([]float64, len(b))
 	copy(cpB, b)

-	h := newHistogram[int64](b, false, false)
+	h := newHistogram[int64](b, false, false, 0)
 	require.Equal(t, cpB, h.bounds)

 	b[0] = 10

@@ -289,7 +329,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
 }

 func TestCumulativeHistogramImutableCounts(t *testing.T) {
-	h := newHistogram[int64](bounds, noMinMax, false)
+	h := newHistogram[int64](bounds, noMinMax, false, 0)
 	h.measure(context.Background(), 5, alice)

 	var data metricdata.Aggregation = metricdata.Histogram[int64]{}

@@ -307,7 +347,7 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) {
 func TestDeltaHistogramReset(t *testing.T) {
 	t.Cleanup(mockTime(now))

-	h := newHistogram[int64](bounds, noMinMax, false)
+	h := newHistogram[int64](bounds, noMinMax, false, 0)

 	var data metricdata.Aggregation = metricdata.Histogram[int64]{}
 	require.Equal(t, 0, h.delta(&data))
sdk/metric/internal/aggregate/lastvalue.go

@@ -29,20 +29,25 @@ type datapoint[N int64 | float64] struct {
 	value N
 }

-func newLastValue[N int64 | float64]() *lastValue[N] {
-	return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
+func newLastValue[N int64 | float64](limit int) *lastValue[N] {
+	return &lastValue[N]{
+		limit:  newLimiter[datapoint[N]](limit),
+		values: make(map[attribute.Set]datapoint[N]),
+	}
 }

 // lastValue summarizes a set of measurements as the last one made.
 type lastValue[N int64 | float64] struct {
 	sync.Mutex

+	limit  limiter[datapoint[N]]
 	values map[attribute.Set]datapoint[N]
 }

 func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
 	d := datapoint[N]{timestamp: now(), value: value}
 	s.Lock()
+	attr = s.limit.Attributes(attr, s.values)
 	s.values[attr] = d
 	s.Unlock()
 }
sdk/metric/internal/aggregate/lastvalue_test.go

@@ -29,7 +29,10 @@ func TestLastValue(t *testing.T) {
 }

 func testLastValue[N int64 | float64]() func(*testing.T) {
-	in, out := Builder[N]{Filter: attrFltr}.LastValue()
+	in, out := Builder[N]{
+		Filter:           attrFltr,
+		AggregationLimit: 3,
+	}.LastValue()
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{
 		{

@@ -87,6 +90,36 @@ func testLastValue[N int64 | float64]() func(*testing.T) {
 				},
 			},
-		},
+		}, {
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Gauge[N]{
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							Time:       staticTime,
+							Value:      1,
+						},
+					},
+				},
+			},
+		},
 	})
 }
sdk/metric/internal/aggregate/limit.go (new file, 53 lines)
@@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import "go.opentelemetry.io/otel/attribute"

// overflowSet is the attribute set used to record a measurement when adding
// another distinct attribute set to the aggregate would exceed the aggregate
// limit.
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))

// limiter limits aggregate values.
type limiter[V any] struct {
	// aggLimit is the maximum number of metric streams that can be aggregated.
	//
	// Once the aggLimit has been met, any metric stream with attributes
	// distinct from the sets already aggregated will instead be aggregated
	// into an "overflow" metric stream. That stream will only contain the
	// "otel.metric.overflow"=true attribute.
	aggLimit int
}

// newLimiter returns a new limiter with the provided aggregation limit.
func newLimiter[V any](aggregation int) limiter[V] {
	return limiter[V]{aggLimit: aggregation}
}

// Attributes checks if adding a measurement for attrs will exceed the
// aggregation cardinality limit for the existing measurements. If it will,
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
// limit is not set (limit <= 0), attrs is returned.
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Set]V) attribute.Set {
	if l.aggLimit > 0 {
		_, exists := measurements[attrs]
		if !exists && len(measurements) >= l.aggLimit-1 {
			return overflowSet
		}
	}

	return attrs
}
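The arithmetic in `Attributes` is worth spelling out: the overflow stream itself counts toward the limit, so an `aggLimit` of `n` admits only `n-1` distinct attribute sets. A small in-package sketch (illustrative only; `limiterDemo` is not part of this commit and assumes the package's `attribute` import):

```go
func limiterDemo() {
	values := map[attribute.Set]int64{}
	lim := newLimiter[int64](2)

	a := attribute.NewSet(attribute.String("user", "Alice"))
	b := attribute.NewSet(attribute.String("user", "Bob"))

	values[lim.Attributes(a, values)]++ // stored under Alice's set
	values[lim.Attributes(b, values)]++ // diverted to overflowSet
	// len(values) == 2: {user=Alice} and {otel.metric.overflow=true}.
}
```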
sdk/metric/internal/aggregate/limit_test.go (new file, 67 lines)
@@ -0,0 +1,67 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"go.opentelemetry.io/otel/attribute"
)

func TestLimiterAttributes(t *testing.T) {
	m := map[attribute.Set]struct{}{alice: {}}
	t.Run("NoLimit", func(t *testing.T) {
		l := newLimiter[struct{}](0)
		assert.Equal(t, alice, l.Attributes(alice, m))
		assert.Equal(t, bob, l.Attributes(bob, m))
	})

	t.Run("NotAtLimit/Exists", func(t *testing.T) {
		l := newLimiter[struct{}](3)
		assert.Equal(t, alice, l.Attributes(alice, m))
	})

	t.Run("NotAtLimit/DoesNotExist", func(t *testing.T) {
		l := newLimiter[struct{}](3)
		assert.Equal(t, bob, l.Attributes(bob, m))
	})

	t.Run("AtLimit/Exists", func(t *testing.T) {
		l := newLimiter[struct{}](2)
		assert.Equal(t, alice, l.Attributes(alice, m))
	})

	t.Run("AtLimit/DoesNotExist", func(t *testing.T) {
		l := newLimiter[struct{}](2)
		assert.Equal(t, overflowSet, l.Attributes(bob, m))
	})
}

var limitedAttr attribute.Set

func BenchmarkLimiterAttributes(b *testing.B) {
	m := map[attribute.Set]struct{}{alice: {}}
	l := newLimiter[struct{}](2)

	b.ReportAllocs()
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		limitedAttr = l.Attributes(alice, m)
		limitedAttr = l.Attributes(bob, m)
	}
}
sdk/metric/internal/aggregate/sum.go

@@ -26,15 +26,20 @@ import (
 // valueMap is the storage for sums.
 type valueMap[N int64 | float64] struct {
 	sync.Mutex
+	limit  limiter[N]
 	values map[attribute.Set]N
 }

-func newValueMap[N int64 | float64]() *valueMap[N] {
-	return &valueMap[N]{values: make(map[attribute.Set]N)}
+func newValueMap[N int64 | float64](limit int) *valueMap[N] {
+	return &valueMap[N]{
+		limit:  newLimiter[N](limit),
+		values: make(map[attribute.Set]N),
+	}
 }

 func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
 	s.Lock()
+	attr = s.limit.Attributes(attr, s.values)
 	s.values[attr] += value
 	s.Unlock()
 }

@@ -42,9 +47,9 @@ func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
 // newSum returns an aggregator that summarizes a set of measurements as their
 // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
 // the measurements were made in.
-func newSum[N int64 | float64](monotonic bool) *sum[N] {
+func newSum[N int64 | float64](monotonic bool, limit int) *sum[N] {
 	return &sum[N]{
-		valueMap:  newValueMap[N](),
+		valueMap:  newValueMap[N](limit),
 		monotonic: monotonic,
 		start:     now(),
 	}

@@ -129,9 +134,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
 // newPrecomputedSum returns an aggregator that summarizes a set of
 // observations as their arithmetic sum. Each sum is scoped by attributes and
 // the aggregation cycle the measurements were made in.
-func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] {
+func newPrecomputedSum[N int64 | float64](monotonic bool, limit int) *precomputedSum[N] {
 	return &precomputedSum[N]{
-		valueMap:  newValueMap[N](),
+		valueMap:  newValueMap[N](limit),
 		monotonic: monotonic,
 		start:     now(),
 	}
sdk/metric/internal/aggregate/sum_test.go

@@ -40,8 +40,9 @@ func TestSum(t *testing.T) {
 func testDeltaSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.Sum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -125,14 +126,51 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }

 func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.Sum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -204,14 +242,49 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      14,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      -8,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }

 func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.DeltaTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.DeltaTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.PrecomputedSum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -296,14 +369,51 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.DeltaTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }

 func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
 	mono := false
 	in, out := Builder[N]{
-		Temporality: metricdata.CumulativeTemporality,
-		Filter:      attrFltr,
+		Temporality:      metricdata.CumulativeTemporality,
+		Filter:           attrFltr,
+		AggregationLimit: 3,
 	}.PrecomputedSum(mono)
 	ctx := context.Background()
 	return test[N](in, out, []teststep[N]{

@@ -388,6 +498,42 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
 				},
 			},
 		},
+		{
+			input: []arg[N]{
+				{ctx, 1, alice},
+				{ctx, 1, bob},
+				// These will exceed cardinality limit.
+				{ctx, 1, carol},
+				{ctx, 1, dave},
+			},
+			expect: output{
+				n: 3,
+				agg: metricdata.Sum[N]{
+					IsMonotonic: mono,
+					Temporality: metricdata.CumulativeTemporality,
+					DataPoints: []metricdata.DataPoint[N]{
+						{
+							Attributes: fltrAlice,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: fltrBob,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      1,
+						},
+						{
+							Attributes: overflowSet,
+							StartTime:  staticTime,
+							Time:       staticTime,
+							Value:      2,
+						},
+					},
+				},
+			},
+		},
 	})
 }
sdk/metric/internal/x/x.go

@@ -43,6 +43,9 @@ var (
 	//
 	// To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment
 	// variable to the integer limit value you want to use.
+	//
+	// Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0
+	// will disable the cardinality limits.
 	CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) {
 		n, err := strconv.Atoi(v)
 		if err != nil {
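The parse function is truncated by the diff above. As a rough standalone illustration of the lookup semantics (the `cardinalityLimit` helper below is invented for this sketch, not the SDK's API): the `OTEL_GO_X_CARDINALITY_LIMIT` value is accepted only when it parses as an integer, and unset or malformed values report no limit.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// cardinalityLimit approximates what x.CardinalityLimit.Lookup does: it
// returns the parsed limit and whether a valid integer value was set.
func cardinalityLimit() (int, bool) {
	v := os.Getenv("OTEL_GO_X_CARDINALITY_LIMIT")
	if v == "" {
		return 0, false
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, false // non-integer values are ignored
	}
	return n, true
}

func main() {
	os.Setenv("OTEL_GO_X_CARDINALITY_LIMIT", "2000")
	n, ok := cardinalityLimit()
	fmt.Println(n, ok) // 2000 true
}
```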
sdk/metric/pipeline.go

@@ -29,6 +29,7 @@ import (
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	"go.opentelemetry.io/otel/sdk/metric/internal"
 	"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+	"go.opentelemetry.io/otel/sdk/metric/internal/x"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/resource"
 )

@@ -361,6 +362,12 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
 		Temporality: i.pipeline.reader.temporality(kind),
 	}
 	b.Filter = stream.AttributeFilter
+	// A value less than or equal to zero will disable the aggregation
+	// limits for the builder (and all the created aggregates).
+	// CardinalityLimit.Lookup returns 0 by default if unset (or
+	// unrecognized input). Use that value directly.
+	b.AggregationLimit, _ = x.CardinalityLimit.Lookup()
+
 	in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
 	if err != nil {
 		return aggVal[N]{0, nil, err}