1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-03-31 21:55:32 +02:00

Combine precomputed values of filtered attribute sets (#3549)

* Combine spatially aggregated precomputed vals

Fix #3439

When an attribute filter drops a distinguishing attribute during the
aggregation of a precomputed sum add that value to existing, instead of
just setting the value as an override (current behavior).

* Ignore false positive lint error and test method

* Add fix to changelog

* Handle edge case of exact set after filter

* Fix filter and measure algo for precomp

* Add tests for precomp sums

* Unify precomputedMap

* Adds example from supplemental guide

* Fixes for lint

* Update sdk/metric/meter_example_test.go

* Fix async example test

* Reduce duplicate code in TestAsynchronousExample

* Clarify naming and documentation

* Fix spelling errors

* Add a noop filter to default view

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com>
This commit is contained in:
Tyler Yahn 2023-01-20 09:54:42 -08:00 committed by GitHub
parent 88f6000318
commit a1ce7e5f0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 752 additions and 92 deletions

View File

@ -107,6 +107,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed
- Asynchronous instruments that use sum aggregators and attribute filters correctly add values from equivalent attribute sets that have been filtered. (#3439, #3549)
- The `RegisterCallback` method of the `Meter` from `go.opentelemetry.io/otel/sdk/metric` only registers a callback for instruments created by that meter.
Trying to register a callback with instruments from a different meter will result in an error being returned. (#3584)

View File

@ -22,13 +22,13 @@ import (
)
// now is used to return the current local time while allowing tests to
// override the the default time.Now function.
// override the default time.Now function.
var now = time.Now
// Aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK when
// it creates them for multiple views.
// Aggregators need to be comparable so they can be de-duplicated by the SDK
// when it creates them for multiple views.
type Aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
@ -38,3 +38,22 @@ type Aggregator[N int64 | float64] interface {
// measurements made and ends an aggregation cycle.
Aggregation() metricdata.Aggregation
}
// precomputeAggregator is an Aggregator that receives values to aggregate that
// have been pre-computed by the caller.
type precomputeAggregator[N int64 | float64] interface {
// The Aggregate method of the embedded Aggregator is used to record
// pre-computed measurements, scoped by attributes that have not been
// filtered by an attribute filter.
Aggregator[N]
// aggregateFiltered records measurements scoped by attributes that have
// been filtered by an attribute filter.
//
// Pre-computed measurements of filtered attributes need to be recorded
// separately from those that haven't been filtered so they can be added to
// the non-filtered pre-computed measurements in a collection cycle and
// then reset after the cycle (the non-filtered pre-computed measurements
// are not reset).
aggregateFiltered(N, attribute.Set)
}

View File

@ -21,8 +21,26 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// filter is an aggregator that applies attribute filter when Aggregating. filters
// do not have any backing memory, and must be constructed with a backing Aggregator.
// NewFilter returns an Aggregator that wraps an agg with an attribute
// filtering function. Both pre-computed and non-pre-computed Aggregators can
// be passed for agg. An appropriate Aggregator will be returned for the
// detected type.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
// No filter to apply; use the Aggregator unwrapped.
return agg
}
if fa, ok := agg.(precomputeAggregator[N]); ok {
return newPrecomputedFilter(fa, fn)
}
return newFilter(agg, fn)
}
// filter wraps an aggregator with an attribute filter. All recorded
// measurements will have their attributes filtered before they are passed to
// the underlying aggregator's Aggregate method.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator Aggregator[N]
@ -31,15 +49,16 @@ type filter[N int64 | float64] struct {
seen map[attribute.Set]attribute.Set
}
// NewFilter wraps an Aggregator with an attribute filtering function.
func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
if fn == nil {
return agg
}
// newFilter returns a filter Aggregator that wraps agg with the attribute
// filter fn.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
return &filter[N]{
filter: fn,
aggregator: agg,
seen: map[attribute.Set]attribute.Set{},
seen: make(map[attribute.Set]attribute.Set),
}
}
@ -62,3 +81,54 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
// precomputedFilter is an aggregator that applies attribute filter when
// Aggregating for pre-computed Aggregations. The pre-computed Aggregations
// need to operate normally when no attribute filtering is done (for sums this
// means setting the value), but when attribute filtering is done it needs to
// be added to any set value.
type precomputedFilter[N int64 | float64] struct {
// filter drops unwanted attributes from measurement attribute sets.
filter attribute.Filter
// aggregator is the wrapped pre-computed Aggregator measurements are
// forwarded to.
aggregator precomputeAggregator[N]
// Mutex guards seen.
sync.Mutex
// seen caches the filtered form of each attribute set already observed.
seen map[attribute.Set]attribute.Set
}
// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
// with the attribute filter fn.
//
// This should not be used to wrap a non-pre-computed Aggregator. Use a
// filter instead.
func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
seen: make(map[attribute.Set]attribute.Set),
}
}
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
//
// The filtered form of attr is computed once and cached in seen so repeated
// measurements with the same attribute set do not re-run the filter.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
// TODO (#3006): drop stale attributes from seen.
f.Lock()
defer f.Unlock()
fAttr, ok := f.seen[attr]
if !ok {
fAttr, _ = attr.Filter(f.filter)
f.seen[attr] = fAttr
}
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
} else {
// Attributes were dropped by the filter. Record via the filtered
// path so the value is added to, instead of overwriting, any
// unfiltered pre-computed value for the same resulting set.
f.aggregator.aggregateFiltered(measurement, fAttr)
}
}
// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
// Delegate directly to the wrapped pre-computed Aggregator.
return f.aggregator.Aggregation()
}

View File

@ -15,6 +15,8 @@
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
import (
"fmt"
"strings"
"sync"
"testing"
@ -194,3 +196,90 @@ func TestFilterConcurrent(t *testing.T) {
testFilterConcurrent[float64](t)
})
}
// TestPrecomputedFilter verifies NewFilter wraps a pre-computed Aggregator in
// a precomputedFilter for both supported numeric types.
func TestPrecomputedFilter(t *testing.T) {
t.Run("Int64", testPrecomputedFilter[int64]())
t.Run("Float64", testPrecomputedFilter[float64]())
}
// testPrecomputedFilter returns a sub-test asserting that measurements whose
// attribute sets are unchanged by the filter go to Aggregate, while those
// reduced by the filter go to aggregateFiltered and accumulate.
func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
agg := newTestFilterAgg[N]()
f := NewFilter[N](agg, testAttributeFilter)
require.IsType(t, &precomputedFilter[N]{}, f)
var (
powerLevel = attribute.Int("power-level", 9000)
user = attribute.String("user", "Alice")
admin = attribute.Bool("admin", true)
)
// Unfiltered set: recorded as a measured (set) value.
a := attribute.NewSet(powerLevel)
key := a
f.Aggregate(1, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(0), agg.values[key].filtered, str(a))
// Filter drops "user"; value accumulates under the reduced set.
a = attribute.NewSet(powerLevel, user)
f.Aggregate(2, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(2), agg.values[key].filtered, str(a))
// Filter drops "user" and "admin"; value adds to the filtered sum.
a = attribute.NewSet(powerLevel, user, admin)
f.Aggregate(3, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
// Exact set after filtering: measured value is overwritten, filtered
// sum untouched.
a = attribute.NewSet(powerLevel)
f.Aggregate(2, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
// Set reduced to empty by the filter is tracked separately.
a = attribute.NewSet(user)
f.Aggregate(3, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a))
_ = f.Aggregation()
assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation")
}
}
// str returns a human-readable, comma-separated rendering of the attribute
// set a, with each attribute formatted as key:value.
func str(a attribute.Set) string {
var b strings.Builder
it := a.Iter()
for first := true; it.Next(); first = false {
if !first {
b.WriteString(",")
}
kv := it.Attribute()
fmt.Fprintf(&b, "%s:%#v", kv.Key, kv.Value.AsInterface())
}
return b.String()
}
// testFilterAgg is a precomputeAggregator test double that records, per
// attribute set, the last measured (set) value and the running sum of
// filtered values.
type testFilterAgg[N int64 | float64] struct {
values map[attribute.Set]precomputedValue[N]
// aggregationN counts calls to Aggregation.
aggregationN int
}
// newTestFilterAgg returns an empty, ready-to-use testFilterAgg.
func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] {
return &testFilterAgg[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}
// Aggregate records val as the last measured value for attr.
func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
v := a.values[attr]
v.measured = val
a.values[attr] = v
}
// aggregateFiltered adds val to the running filtered sum for attr.
// nolint: unused // Used to agg filtered.
func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) {
v := a.values[attr]
v.filtered += val
a.values[attr] = v
}
// Aggregation counts the call and returns nil; tests only assert the call
// was propagated, not the returned Aggregation.
func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation {
a.aggregationN++
return nil
}

View File

@ -22,7 +22,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// valueMap is the storage for all sums.
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]N
@ -32,12 +32,6 @@ func newValueMap[N int64 | float64]() *valueMap[N] {
return &valueMap[N]{values: make(map[attribute.Set]N)}
}
func (s *valueMap[N]) set(value N, attr attribute.Set) { // nolint: unused // This is indeed used.
s.Lock()
s.values[attr] = value
s.Unlock()
}
func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
s.values[attr] += value
@ -164,48 +158,107 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
return out
}
// precomputedValue is the recorded measurement value for a set of attributes.
//
// The value reported for the attribute set in a collection cycle is the sum
// of measured and filtered.
type precomputedValue[N int64 | float64] struct {
// measured is the last value measured for a set of attributes that were
// not filtered.
measured N
// filtered is the sum of values from measurements that had their
// attributes filtered.
filtered N
}
// precomputedMap is the storage for precomputed sums.
//
// It provides the Aggregate/aggregateFiltered pair shared by the delta and
// cumulative pre-computed sum Aggregators.
type precomputedMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]precomputedValue[N]
}
// newPrecomputedMap returns an empty, ready-to-use precomputedMap.
func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
return &precomputedMap[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}
// Aggregate records value with the unfiltered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value overwrites
// that value.
// - If that measurement's attributes were filtered, this value will be
// recorded alongside that value.
func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
v := s.values[attr]
v.measured = value
s.values[attr] = v
s.Unlock()
}
// aggregateFiltered records value with the filtered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value will be
// recorded alongside that value.
// - If that measurement's attributes were filtered, this value will be
// added to it.
//
// This method should not be used if attr has not been reduced by an attribute
// filter.
func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
s.Lock()
v := s.values[attr]
v.filtered += value
s.values[attr] = v
s.Unlock()
}
// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
// attributes and the aggregation cycle the measurements were made in.
// pre-computed sums. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// The output Aggregation will report recorded values as delta temporality. It
// is up to the caller to ensure this is accurate.
// The output Aggregation will report recorded values as delta temporality.
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedDeltaSum[N]{
recorded: make(map[attribute.Set]N),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
precomputedMap: newPrecomputedMap[N](),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
}
}
// precomputedDeltaSum summarizes a set of measurements recorded over all
// aggregation cycles as the delta arithmetic sum.
// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all
// aggregation cycles as the delta of these sums.
type precomputedDeltaSum[N int64 | float64] struct {
sync.Mutex
recorded map[attribute.Set]N
*precomputedMap[N]
reported map[attribute.Set]N
monotonic bool
start time.Time
}
// Aggregate records value as a cumulative sum for attr.
func (s *precomputedDeltaSum[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
s.recorded[attr] = value
s.Unlock()
}
// Aggregation returns the recorded pre-computed sums as an Aggregation. The
// sum values are expressed as the delta between what was measured this
// collection cycle and the previous.
//
// All pre-computed sums that were recorded for attributes sets reduced by an
// attribute filter (filtered-sums) are summed together and added to any
// pre-computed sum value recorded directly for the resulting attribute set
// (unfiltered-sum). The filtered-sums are reset to zero for the next
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()
if len(s.recorded) == 0 {
if len(s.values) == 0 {
return nil
}
@ -213,19 +266,22 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
out := metricdata.Sum[N]{
Temporality: metricdata.DeltaTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.recorded)),
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, recorded := range s.recorded {
value := recorded - s.reported[attr]
for attr, value := range s.values {
v := value.measured + value.filtered
delta := v - s.reported[attr]
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value,
Value: delta,
})
if value != 0 {
s.reported[attr] = recorded
if delta != 0 {
s.reported[attr] = v
}
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
@ -237,26 +293,68 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
}
// NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
// attributes and the aggregation cycle the measurements were made in.
// pre-computed sums. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
//
// The monotonic value is used to communicate the produced Aggregation is
// monotonic or not. The returned Aggregator does not make any guarantees this
// value is accurate. It is up to the caller to ensure it.
//
// The output Aggregation will report recorded values as cumulative
// temporality. It is up to the caller to ensure this is accurate.
// temporality.
func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
return &precomputedSum[N]{newCumulativeSum[N](monotonic)}
return &precomputedCumulativeSum[N]{
precomputedMap: newPrecomputedMap[N](),
monotonic: monotonic,
start: now(),
}
}
// precomputedSum summarizes a set of measurements recorded over all
// aggregation cycles directly as the cumulative arithmetic sum.
type precomputedSum[N int64 | float64] struct {
*cumulativeSum[N]
// precomputedCumulativeSum directly records and reports a set of pre-computed sums.
type precomputedCumulativeSum[N int64 | float64] struct {
*precomputedMap[N]
monotonic bool
start time.Time
}
// Aggregate records value as a cumulative sum for attr.
func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
s.set(value, attr)
// Aggregation returns the recorded pre-computed sums as an Aggregation. The
// sum values are expressed directly as they are assumed to be recorded as the
// cumulative sum of some measured phenomenon.
//
// All pre-computed sums that were recorded for attributes sets reduced by an
// attribute filter (filtered-sums) are summed together and added to any
// pre-computed sum value recorded directly for the resulting attribute set
// (unfiltered-sum). The filtered-sums are reset to zero for the next
// collection cycle, and the unfiltered-sum is kept for the next collection
// cycle.
func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
s.Lock()
defer s.Unlock()
if len(s.values) == 0 {
return nil
}
t := now()
out := metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: s.monotonic,
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values {
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value.measured + value.filtered,
})
// Reset only the filtered sum for the next cycle; the measured
// (unfiltered) cumulative value persists.
value.filtered = N(0)
s.values[attr] = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
return out
}

View File

@ -18,6 +18,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
@ -163,6 +164,134 @@ func TestDeltaSumReset(t *testing.T) {
t.Run("Float64", testDeltaSumReset[float64])
}
// TestPreComputedDeltaSum exercises the delta pre-computed sum Aggregator:
// set semantics for unfiltered values, additive semantics for filtered
// values, and the reset of filtered values each collection cycle.
func TestPreComputedDeltaSum(t *testing.T) {
var mono bool
agg := NewPrecomputedDeltaSum[int64](mono)
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)
attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
got := agg.Aggregation()
want := metricdata.Sum[int64]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)},
}
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Delta values should zero.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
// measured(+): 1, previous(-): 1, filtered(+): 1
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Filtered values should not persist.
got = agg.Aggregation()
// measured(+): 1, previous(-): 2, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
// measured(+): 1, previous(-): 1, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Override set value.
agg.Aggregate(2, attrs)
agg.Aggregate(5, attrs)
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 1, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Filtered values should not persist.
agg.Aggregate(5, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 18, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Order should not affect measure.
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.Aggregate(7, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 7, previous(-): 5, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
agg.Aggregate(7, attrs)
got = agg.Aggregation()
// measured(+): 7, previous(-): 20, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
}
// TestPreComputedCumulativeSum exercises the cumulative pre-computed sum
// Aggregator: unfiltered values persist across collection cycles while
// filtered values are reset each cycle.
func TestPreComputedCumulativeSum(t *testing.T) {
var mono bool
agg := NewPrecomputedCumulativeSum[int64](mono)
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)
attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
got := agg.Aggregation()
want := metricdata.Sum[int64]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)},
}
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Cumulative values should persist.
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Override set value.
agg.Aggregate(5, attrs)
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Filtered values should not persist.
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Order should not affect measure.
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.Aggregate(7, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
}
func TestEmptySumNilAggregation(t *testing.T) {
assert.Nil(t, NewCumulativeSum[int64](true).Aggregation())
assert.Nil(t, NewCumulativeSum[int64](false).Aggregation())

View File

@ -896,6 +896,11 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
}
func TestAttributeFilter(t *testing.T) {
t.Run("Delta", testAttributeFilter(metricdata.DeltaTemporality))
t.Run("Cumulative", testAttributeFilter(metricdata.CumulativeTemporality))
}
func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
one := 1.0
two := 2.0
testcases := []struct {
@ -912,7 +917,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar"))
o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
}, ctr)
return err
@ -923,10 +929,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
Value: 4.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@ -940,7 +946,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
o.ObserveFloat64(ctr, 2.0, attribute.String("foo", "bar"))
o.ObserveFloat64(ctr, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
}, ctr)
return err
@ -951,10 +958,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
Value: 4.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@ -994,7 +1001,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
o.ObserveInt64(ctr, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
o.ObserveInt64(ctr, 20, attribute.String("foo", "bar"))
o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
}, ctr)
return err
@ -1005,10 +1013,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
Value: 40,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@ -1022,7 +1030,8 @@ func TestAttributeFilter(t *testing.T) {
}
_, err = mtr.RegisterCallback(func(_ context.Context, o metric.Observer) error {
o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
o.ObserveInt64(ctr, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
o.ObserveInt64(ctr, 20, attribute.String("foo", "bar"))
o.ObserveInt64(ctr, 10, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
}, ctr)
return err
@ -1033,10 +1042,10 @@ func TestAttributeFilter(t *testing.T) {
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
Value: 40,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@ -1088,7 +1097,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@ -1114,7 +1123,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@ -1145,7 +1154,7 @@ func TestAttributeFilter(t *testing.T) {
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
},
},
},
@ -1170,7 +1179,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: true,
},
},
@ -1196,7 +1205,7 @@ func TestAttributeFilter(t *testing.T) {
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
IsMonotonic: false,
},
},
@ -1227,37 +1236,282 @@ func TestAttributeFilter(t *testing.T) {
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
Temporality: temporality,
},
},
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader()
mtr := NewMeterProvider(
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))
return func(t *testing.T) {
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality {
return temporality
}))
mtr := NewMeterProvider(
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))
m, err := rdr.Collect(context.Background())
assert.NoError(t, err)
m, err := rdr.Collect(context.Background())
assert.NoError(t, err)
require.Len(t, m.ScopeMetrics, 1)
require.Len(t, m.ScopeMetrics[0].Metrics, 1)
require.Len(t, m.ScopeMetrics, 1)
require.Len(t, m.ScopeMetrics[0].Metrics, 1)
metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
})
metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
}
// TestAsynchronousExample reproduces the asynchronous counter example from
// the OpenTelemetry metrics supplementary guidelines. It verifies, for both
// cumulative and delta temporality, that per-thread page-fault observations
// are reported unfiltered and are also spatially re-aggregated (summed, not
// overwritten) into a single per-process data point when an attribute filter
// drops the distinguishing "tid" attribute.
func TestAsynchronousExample(t *testing.T) {
	// This example can be found:
	// https://github.com/open-telemetry/opentelemetry-specification/blob/8b91585e6175dd52b51e1d60bea105041225e35d/specification/metrics/supplementary-guidelines.md#asynchronous-example
	var (
		// Per-thread attribute that the filtered view drops.
		threadID1 = attribute.Int("tid", 1)
		threadID2 = attribute.Int("tid", 2)
		threadID3 = attribute.Int("tid", 3)

		processID1001 = attribute.String("pid", "1001")

		// Full attribute sets the callback observes with.
		thread1 = attribute.NewSet(processID1001, threadID1)
		thread2 = attribute.NewSet(processID1001, threadID2)
		thread3 = attribute.NewSet(processID1001, threadID3)

		// Attribute set remaining after "tid" is filtered out; every
		// per-thread observation aggregates into this single set.
		process1001 = attribute.NewSet(processID1001)
	)

	// setup builds a MeterProvider with two views over one observable
	// counter (one with a no-op filter, one dropping "tid") and returns:
	//   - observations: mutable map read by the instrument callback on each
	//     collection, keyed by attribute set,
	//   - collect: a collect-and-assert helper,
	//   - want: the expected ScopeMetrics, and
	//   - pointers into want's data-point values (filtered, thread1,
	//     thread2) so each phase below can update expectations in place.
	setup := func(t *testing.T, temp metricdata.Temporality) (map[attribute.Set]int64, func(*testing.T), *metricdata.ScopeMetrics, *int64, *int64, *int64) {
		t.Helper()

		const (
			instName       = "pageFaults"
			filteredStream = "filteredPageFaults"
			scopeName      = "AsynchronousExample"
		)

		selector := func(InstrumentKind) metricdata.Temporality { return temp }
		reader := NewManualReader(WithTemporalitySelector(selector))

		// The no-op filter keeps all attributes, but routes the unfiltered
		// stream through the same filtering code path as the filtered one.
		noopFilter := func(kv attribute.KeyValue) bool { return true }
		noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: noopFilter})

		// Dropping "tid" makes all thread observations share one attribute
		// set, exercising spatial re-aggregation of the precomputed sum.
		filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" }
		filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter})

		mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered))
		meter := mp.Meter(scopeName)

		observations := make(map[attribute.Set]int64)
		// The callback reports the current contents of observations, so the
		// test phases drive new "measurements" by mutating that map.
		_, err := meter.Int64ObservableCounter(instName, instrument.WithInt64Callback(
			func(ctx context.Context, o instrument.Int64Observer) error {
				for attrSet, val := range observations {
					o.Observe(ctx, val, attrSet.ToSlice()...)
				}
				return nil
			},
		))
		require.NoError(t, err)

		want := &metricdata.ScopeMetrics{
			Scope: instrumentation.Scope{Name: scopeName},
			Metrics: []metricdata.Metrics{
				{
					// Filtered stream: one data point for the whole process.
					Name: filteredStream,
					Data: metricdata.Sum[int64]{
						Temporality: temp,
						IsMonotonic: true,
						DataPoints: []metricdata.DataPoint[int64]{
							{Attributes: process1001},
						},
					},
				},
				{
					// Unfiltered stream: one data point per thread.
					Name: instName,
					Data: metricdata.Sum[int64]{
						Temporality: temp,
						IsMonotonic: true,
						DataPoints: []metricdata.DataPoint[int64]{
							{Attributes: thread1},
							{Attributes: thread2},
						},
					},
				},
			},
		}
		// Addresses of the data-point values inside want. Indexing the
		// DataPoints slice yields addressable elements even though the
		// type-asserted Sum[int64] value itself is not addressable.
		wantFiltered := &want.Metrics[0].Data.(metricdata.Sum[int64]).DataPoints[0].Value
		wantThread1 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[0].Value
		wantThread2 := &want.Metrics[1].Data.(metricdata.Sum[int64]).DataPoints[1].Value

		collect := func(t *testing.T) {
			t.Helper()

			got, err := reader.Collect(context.Background())
			require.NoError(t, err)
			require.Len(t, got.ScopeMetrics, 1)
			metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())
		}

		return observations, collect, want, wantFiltered, wantThread1, wantThread2
	}

	t.Run("Cumulative", func(t *testing.T) {
		temporality := metricdata.CumulativeTemporality
		observations, verify, want, wantFiltered, wantThread1, wantThread2 := setup(t, temporality)

		// During the time range (T0, T1]:
		//     pid = 1001, tid = 1, #PF = 50
		//     pid = 1001, tid = 2, #PF = 30
		observations[thread1] = 50
		observations[thread2] = 30

		// Filtered stream reports the sum across threads: 50 + 30.
		*wantFiltered = 80
		*wantThread1 = 50
		*wantThread2 = 30
		verify(t)

		// During the time range (T1, T2]:
		//     pid = 1001, tid = 1, #PF = 53
		//     pid = 1001, tid = 2, #PF = 38
		observations[thread1] = 53
		observations[thread2] = 38

		*wantFiltered = 91
		*wantThread1 = 53
		*wantThread2 = 38
		verify(t)

		// During the time range (T2, T3]
		//     pid = 1001, tid = 1, #PF = 56
		//     pid = 1001, tid = 2, #PF = 42
		observations[thread1] = 56
		observations[thread2] = 42

		*wantFiltered = 98
		*wantThread1 = 56
		*wantThread2 = 42
		verify(t)

		// During the time range (T3, T4]:
		//     pid = 1001, tid = 1, #PF = 60
		//     pid = 1001, tid = 2, #PF = 47
		observations[thread1] = 60
		observations[thread2] = 47

		*wantFiltered = 107
		*wantThread1 = 60
		*wantThread2 = 47
		verify(t)

		// During the time range (T4, T5]:
		//     thread 1 died, thread 3 started
		//     pid = 1001, tid = 2, #PF = 53
		//     pid = 1001, tid = 3, #PF = 5
		delete(observations, thread1)
		observations[thread2] = 53
		observations[thread3] = 5

		// Filtered cumulative value drops to 53 + 5 because thread 1 is no
		// longer observed.
		*wantFiltered = 58
		// Rebuild the unfiltered expectation: thread 3 appears and thread 1
		// is retained at its last cumulative value.
		want.Metrics[1].Data = metricdata.Sum[int64]{
			Temporality: temporality,
			IsMonotonic: true,
			DataPoints: []metricdata.DataPoint[int64]{
				// Thread 1 remains at last measured value.
				{Attributes: thread1, Value: 60},
				{Attributes: thread2, Value: 53},
				{Attributes: thread3, Value: 5},
			},
		}
		verify(t)
	})

	t.Run("Delta", func(t *testing.T) {
		temporality := metricdata.DeltaTemporality
		observations, verify, want, wantFiltered, wantThread1, wantThread2 := setup(t, temporality)

		// During the time range (T0, T1]:
		//     pid = 1001, tid = 1, #PF = 50
		//     pid = 1001, tid = 2, #PF = 30
		observations[thread1] = 50
		observations[thread2] = 30

		// First delta equals the cumulative values themselves.
		*wantFiltered = 80
		*wantThread1 = 50
		*wantThread2 = 30
		verify(t)

		// During the time range (T1, T2]:
		//     pid = 1001, tid = 1, #PF = 53
		//     pid = 1001, tid = 2, #PF = 38
		observations[thread1] = 53
		observations[thread2] = 38

		// Deltas from the previous collection: (53-50) + (38-30) = 11.
		*wantFiltered = 11
		*wantThread1 = 3
		*wantThread2 = 8
		verify(t)

		// During the time range (T2, T3]
		//     pid = 1001, tid = 1, #PF = 56
		//     pid = 1001, tid = 2, #PF = 42
		observations[thread1] = 56
		observations[thread2] = 42

		*wantFiltered = 7
		*wantThread1 = 3
		*wantThread2 = 4
		verify(t)

		// During the time range (T3, T4]:
		//     pid = 1001, tid = 1, #PF = 60
		//     pid = 1001, tid = 2, #PF = 47
		observations[thread1] = 60
		observations[thread2] = 47

		*wantFiltered = 9
		*wantThread1 = 4
		*wantThread2 = 5
		verify(t)

		// During the time range (T4, T5]:
		//     thread 1 died, thread 3 started
		//     pid = 1001, tid = 2, #PF = 53
		//     pid = 1001, tid = 3, #PF = 5
		delete(observations, thread1)
		observations[thread2] = 53
		observations[thread3] = 5

		// Filtered cumulative total fell from 107 to 58, so the reported
		// delta is negative even though the sum is monotonic: 58 - 107.
		*wantFiltered = -49
		want.Metrics[1].Data = metricdata.Sum[int64]{
			Temporality: temporality,
			IsMonotonic: true,
			DataPoints: []metricdata.DataPoint[int64]{
				// Thread 1 remains at last measured value, so its delta is 0.
				{Attributes: thread1, Value: 0},
				{Attributes: thread2, Value: 6},
				{Attributes: thread3, Value: 5},
			},
		}
		verify(t)
	})
}
var (
aiCounter instrument.Int64ObservableCounter
aiUpDownCounter instrument.Int64ObservableUpDownCounter