
Metric SDK: Sum duplicate async observations regardless of filtering (#4289)

* Metric SDK: Remove the distinction between filtered and unfiltered attributes.
David Ashpole 2023-07-19 11:52:11 -04:00 committed by GitHub
parent f2a9f2f2be
commit c197fe9305
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 45 additions and 308 deletions

View File

@@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
The `AttributeKeys` field allows users to specify an allow-list of attributes that may be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)
- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning; see the sketch below this list. (#4289)
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)
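A minimal sketch of the new #4289 behavior (meter, reader, and instrument names are illustrative, not from this commit): observing the same attribute set twice in one callback now exports the sum.

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	rdr := sdkmetric.NewManualReader()
	meter := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr)).Meter("example")

	ctr, _ := meter.Int64ObservableCounter("requests")
	attrs := attribute.NewSet(attribute.String("host", "a"))
	_, _ = meter.RegisterCallback(func(_ context.Context, o metric.Observer) error {
		// The same attribute set is Observed twice in one callback.
		o.ObserveInt64(ctr, 1, metric.WithAttributeSet(attrs))
		o.ObserveInt64(ctr, 2, metric.WithAttributeSet(attrs))
		return nil
	}, ctr)

	var rm metricdata.ResourceMetrics
	_ = rdr.Collect(context.Background(), &rm)
	// With this change the exported value is 3 (1+2); previously the last
	// observation (2) would have won.
	fmt.Println(rm.ScopeMetrics[0].Metrics[0].Data)
}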

View File

@@ -38,22 +38,3 @@ type aggregator[N int64 | float64] interface {
// measurements made and ends an aggregation cycle.
Aggregation() metricdata.Aggregation
}
// precomputeAggregator is an Aggregator that receives values to aggregate that
// have been pre-computed by the caller.
type precomputeAggregator[N int64 | float64] interface {
// The Aggregate method of the embedded Aggregator is used to record
// pre-computed measurements, scoped by attributes that have not been
// filtered by an attribute filter.
aggregator[N]
// aggregateFiltered records measurements scoped by attributes that have
// been filtered by an attribute filter.
//
// Pre-computed measurements of filtered attributes need to be recorded
// separately from those that haven't been filtered so they can be added to
// the non-filtered pre-computed measurements in a collection cycle and
// then reset after the cycle (the non-filtered pre-computed measurements
// are not reset).
aggregateFiltered(N, attribute.Set)
}

View File

@@ -27,9 +27,6 @@ func newFilter[N int64 | float64](agg aggregator[N], fn attribute.Filter) aggreg
if fn == nil {
return agg
}
if fa, ok := agg.(precomputeAggregator[N]); ok {
return newPrecomputedFilter(fa, fn)
}
return &filter[N]{
filter: fn,
aggregator: agg,
@@ -59,43 +56,3 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
// precomputedFilter is an aggregator that applies an attribute filter when
// aggregating pre-computed Aggregations. A pre-computed Aggregation needs
// to operate normally when no attribute filtering is done (for sums this
// means setting the value), but when attribute filtering is done the
// measurement needs to be added to any set value.
type precomputedFilter[N int64 | float64] struct {
filter attribute.Filter
aggregator precomputeAggregator[N]
}
// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg
// with the attribute filter fn.
//
// This should not be used to wrap a non-pre-computed Aggregator. Use a
// filter instead.
func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] {
return &precomputedFilter[N]{
filter: fn,
aggregator: agg,
}
}
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) {
fAttr, _ := attr.Filter(f.filter)
if fAttr.Equals(&attr) {
// No filtering done.
f.aggregator.Aggregate(measurement, fAttr)
} else {
f.aggregator.aggregateFiltered(measurement, fAttr)
}
}
// Aggregation returns an Aggregation of all the aggregated measurements
// made and ends the aggregation cycle.
func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}
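With duplicates summed, the surviving filter[N] needs only one code path. A rough sketch of the remaining behavior (simplified; it ignores any attribute-set caching the real implementation may do):

// Filtering is applied unconditionally; measurements whose attribute sets
// collapse to the same filtered set are summed by the wrapped aggregator
// rather than tracked in a separate filtered bucket.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
	fAttr, _ := attr.Filter(f.filter)
	f.aggregator.Aggregate(measurement, fAttr)
}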

View File

@@ -15,8 +15,6 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"fmt"
"strings"
"sync"
"testing"
@@ -196,90 +194,3 @@ func TestFilterConcurrent(t *testing.T) {
testFilterConcurrent[float64](t)
})
}
func TestPrecomputedFilter(t *testing.T) {
t.Run("Int64", testPrecomputedFilter[int64]())
t.Run("Float64", testPrecomputedFilter[float64]())
}
func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
agg := newTestFilterAgg[N]()
f := newFilter[N](agg, testAttributeFilter)
require.IsType(t, &precomputedFilter[N]{}, f)
var (
powerLevel = attribute.Int("power-level", 9000)
user = attribute.String("user", "Alice")
admin = attribute.Bool("admin", true)
)
a := attribute.NewSet(powerLevel)
key := a
f.Aggregate(1, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(0), agg.values[key].filtered, str(a))
a = attribute.NewSet(powerLevel, user)
f.Aggregate(2, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(2), agg.values[key].filtered, str(a))
a = attribute.NewSet(powerLevel, user, admin)
f.Aggregate(3, a)
assert.Equal(t, N(1), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
a = attribute.NewSet(powerLevel)
f.Aggregate(2, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
a = attribute.NewSet(user)
f.Aggregate(3, a)
assert.Equal(t, N(2), agg.values[key].measured, str(a))
assert.Equal(t, N(5), agg.values[key].filtered, str(a))
assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a))
_ = f.Aggregation()
assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation")
}
}
func str(a attribute.Set) string {
iter := a.Iter()
out := make([]string, 0, iter.Len())
for iter.Next() {
kv := iter.Attribute()
out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface()))
}
return strings.Join(out, ",")
}
type testFilterAgg[N int64 | float64] struct {
values map[attribute.Set]precomputedValue[N]
aggregationN int
}
func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] {
return &testFilterAgg[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}
func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) {
v := a.values[attr]
v.measured = val
a.values[attr] = v
}
// nolint: unused // Used to agg filtered.
func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) {
v := a.values[attr]
v.filtered += val
a.values[attr] = v
}
func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation {
a.aggregationN++
return nil
}
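For contrast, the scenario from the removed test plays out far more simply under the new semantics. A self-contained sketch (the filter and attributes mirror the removed test; a plain map stands in for a summing aggregator):

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	// Keep only the power-level attribute, like the removed test's filter.
	filter := func(kv attribute.KeyValue) bool { return kv.Key == "power-level" }

	powerLevel := attribute.Int("power-level", 9000)
	user := attribute.String("user", "Alice")

	// Collisions between filtered attribute sets simply add.
	sums := map[attribute.Set]int64{}
	aggregate := func(v int64, attr attribute.Set) {
		fAttr, _ := attr.Filter(filter)
		sums[fAttr] += v
	}

	aggregate(1, attribute.NewSet(powerLevel))       // {power-level}: 1
	aggregate(2, attribute.NewSet(powerLevel, user)) // {power-level}: 1+2
	fmt.Println(sums[attribute.NewSet(powerLevel)])  // 3
}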

View File

@@ -150,63 +150,6 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
return out
}
// precomputedValue is the recorded measurement value for a set of attributes.
type precomputedValue[N int64 | float64] struct {
// measured is the last value measured for a set of attributes that were
// not filtered.
measured N
// filtered is the sum of values from measurements that had their
// attributes filtered.
filtered N
}
// precomputedMap is the storage for precomputed sums.
type precomputedMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]precomputedValue[N]
}
func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] {
return &precomputedMap[N]{
values: make(map[attribute.Set]precomputedValue[N]),
}
}
// Aggregate records value with the unfiltered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value
// overwrites that value.
// - If that measurement's attributes were filtered, this value will be
// recorded alongside that value.
func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) {
s.Lock()
v := s.values[attr]
v.measured = value
s.values[attr] = v
s.Unlock()
}
// aggregateFiltered records value with the filtered attributes attr.
//
// If a previous measurement was made for the same attribute set:
//
// - If that measurement's attributes were not filtered, this value will be
// recorded alongside that value.
// - If that measurement's attributes were filtered, this value will be
// added to it.
//
// This method should not be used if attr has not been reduced by an
// attribute filter.
func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered.
s.Lock()
v := s.values[attr]
v.filtered += value
s.values[attr] = v
s.Unlock()
}
// newPrecomputedDeltaSum returns an Aggregator that summarizes a set of
// pre-computed sums. Each sum is scoped by attributes and the aggregation
// cycle the measurements were made in.
@@ -218,17 +161,17 @@ func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { //
// The output Aggregation will report recorded values as delta temporality.
func newPrecomputedDeltaSum[N int64 | float64](monotonic bool) aggregator[N] {
return &precomputedDeltaSum[N]{
precomputedMap: newPrecomputedMap[N](),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
valueMap: newValueMap[N](),
reported: make(map[attribute.Set]N),
monotonic: monotonic,
start: now(),
}
}
// precomputedDeltaSum summarizes a set of pre-computed sums recorded over all
// aggregation cycles as the delta of these sums.
type precomputedDeltaSum[N int64 | float64] struct {
*precomputedMap[N]
*valueMap[N]
reported map[attribute.Set]N
@@ -263,15 +206,14 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
for attr, value := range s.values {
v := value.measured + value.filtered
delta := v - s.reported[attr]
delta := value - s.reported[attr]
out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{
Attributes: attr,
StartTime: s.start,
Time: t,
Value: delta,
})
newReported[attr] = v
newReported[attr] = value
// Unused attribute sets do not report.
delete(s.values, attr)
}
@@ -294,15 +236,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
// temporality.
func newPrecomputedCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] {
return &precomputedCumulativeSum[N]{
precomputedMap: newPrecomputedMap[N](),
monotonic: monotonic,
start: now(),
valueMap: newValueMap[N](),
monotonic: monotonic,
start: now(),
}
}
// precomputedCumulativeSum directly records and reports a set of pre-computed sums.
type precomputedCumulativeSum[N int64 | float64] struct {
*precomputedMap[N]
*valueMap[N]
monotonic bool
start time.Time
@@ -337,7 +279,7 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
Attributes: attr,
StartTime: s.start,
Time: t,
Value: value.measured + value.filtered,
Value: value,
})
// Unused attribute sets do not report.
delete(s.values, attr)
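With pre-computed values stored as plain sums, the delta path above reduces to simple bookkeeping. A standalone sketch of the reported-value logic (not the SDK's actual types):

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// deltaOf mirrors the bookkeeping in precomputedDeltaSum.Aggregation: each
// cycle exports value - reported[attr], then remembers value for the next
// cycle.
func deltaOf[N int64 | float64](reported map[attribute.Set]N, attr attribute.Set, value N) N {
	delta := value - reported[attr]
	reported[attr] = value
	return delta
}

func main() {
	reported := map[attribute.Set]int64{}
	attrs := attribute.NewSet(attribute.String("key", "val"))
	fmt.Println(deltaOf(reported, attrs, 4))  // 4 (first cycle)
	fmt.Println(deltaOf(reported, attrs, 10)) // 6 (10 - 4)
}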

View File

@@ -37,6 +37,7 @@ func testSum[N int64 | float64](t *testing.T) {
MeasurementN: defaultMeasurements,
CycleN: defaultCycles,
}
totalMeasurements := defaultGoroutines * defaultMeasurements
t.Run("Delta", func(t *testing.T) {
incr, mono := monoIncr[N](), true
@@ -60,21 +61,21 @@
t.Run("PreComputedDelta", func(t *testing.T) {
incr, mono := monoIncr[N](), true
eFunc := preDeltaExpecter[N](incr, mono)
eFunc := preDeltaExpecter[N](incr, mono, N(totalMeasurements))
t.Run("Monotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc))
incr, mono = nonMonoIncr[N](), false
eFunc = preDeltaExpecter[N](incr, mono)
eFunc = preDeltaExpecter[N](incr, mono, N(totalMeasurements))
t.Run("NonMonotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc))
})
t.Run("PreComputedCumulative", func(t *testing.T) {
incr, mono := monoIncr[N](), true
eFunc := preCumuExpecter[N](incr, mono)
eFunc := preCumuExpecter[N](incr, mono, N(totalMeasurements))
t.Run("Monotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc))
incr, mono = nonMonoIncr[N](), false
eFunc = preCumuExpecter[N](incr, mono)
eFunc = preCumuExpecter[N](incr, mono, N(totalMeasurements))
t.Run("NonMonotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc))
})
}
@@ -103,26 +104,26 @@ func cumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
}
}
func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
last := make(map[attribute.Set]N)
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
l := last[a]
sum.DataPoints = append(sum.DataPoints, point(a, N(v)-l))
sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*(N(v)-l)))
last[a] = N(v)
}
return sum
}
}
func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, N(v)))
sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*N(v)))
}
return sum
}
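The totalMeasurements factor in both expecters follows directly from the summing semantics: every goroutine's every observation of the pre-computed value v now adds, so a cycle's total is goroutines * measurements * v. A tiny arithmetic sketch (the constants are illustrative stand-ins for defaultGoroutines and defaultMeasurements):

package main

import "fmt"

func main() {
	const goroutines, measurements = 4, 10 // illustrative counts
	const v = 3                            // pre-computed value observed each time
	// Every duplicate observation adds, so the expected per-cycle sum
	// scales by the total observation count.
	fmt.Println(goroutines * measurements * v) // 120 == totalMeasurements * v
}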
@@ -167,118 +168,65 @@ func TestDeltaSumReset(t *testing.T) {
func TestPreComputedDeltaSum(t *testing.T) {
var mono bool
agg := newPrecomputedDeltaSum[int64](mono)
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)
require.Implements(t, (*aggregator[int64])(nil), agg)
attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
got := agg.Aggregation()
want := metricdata.Sum[int64]{
IsMonotonic: mono,
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)},
}
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt)
// No observation means no metric data
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
// No observation results in an empty aggregation, and causes previous
// observations to be forgotten.
metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt)
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
// measured(+): 1, previous(-): 1, filtered(+): 1
agg.Aggregate(1, attrs)
// measured(+): 1, previous(-): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt)
// Filtered values should not persist.
got = agg.Aggregation()
// No observation means no metric data
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
// Override set value.
// Duplicate observations add
agg.Aggregate(2, attrs)
agg.Aggregate(5, attrs)
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 0, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Filtered values should not persist.
agg.Aggregate(5, attrs)
got = agg.Aggregation()
// measured(+): 5, previous(-): 18, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Order should not affect measure.
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.Aggregate(7, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
// measured(+): 7, previous(-): 5, filtered(+): 13
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
agg.Aggregate(7, attrs)
got = agg.Aggregation()
// measured(+): 7, previous(-): 20, filtered(+): 0
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
agg.Aggregate(3, attrs)
agg.Aggregate(10, attrs)
// measured(+): 20, previous(-): 1
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 19)}
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt)
}
func TestPreComputedCumulativeSum(t *testing.T) {
var mono bool
agg := newPrecomputedCumulativeSum[int64](mono)
require.Implements(t, (*precomputeAggregator[int64])(nil), agg)
require.Implements(t, (*aggregator[int64])(nil), agg)
attrs := attribute.NewSet(attribute.String("key", "val"))
agg.Aggregate(1, attrs)
got := agg.Aggregation()
want := metricdata.Sum[int64]{
IsMonotonic: mono,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)},
}
opt := metricdatatest.IgnoreTimestamp()
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt)
// Cumulative values should not persist.
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt)
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
got = agg.Aggregation()
agg.Aggregate(1, attrs)
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt)
// Filtered values should not persist.
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
// Override set value.
// Duplicate measurements add
agg.Aggregate(5, attrs)
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
agg.Aggregate(3, attrs)
agg.Aggregate(10, attrs)
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
// Filtered values should not persist.
got = agg.Aggregation()
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
// Order should not affect measure.
// Filtered should add.
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
agg.Aggregate(7, attrs)
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
got = agg.Aggregation()
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt)
}
func TestEmptySumNilAggregation(t *testing.T) {

View File

@@ -960,9 +960,6 @@ func TestGlobalInstRegisterCallback(t *testing.T) {
_, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr)
assert.NoError(t, err)
_, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr)
assert.NoError(t, err)
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)