You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Metric SDK: Do not export non-observed attribute sets for async instruments (#4290)
* drop non-observed attribute sets * fix test comment * add documentation for async callbacks dropping unobserved attributes
This commit is contained in:
@@ -255,10 +255,12 @@ type precomputedDeltaSum[N int64 | float64] struct {
|
||||
// collection cycle, and the unfiltered-sum is kept for the next collection
|
||||
// cycle.
|
||||
func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
|
||||
newReported := make(map[attribute.Set]N)
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
if len(s.values) == 0 {
|
||||
s.reported = newReported
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -277,16 +279,12 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
|
||||
Time: t,
|
||||
Value: delta,
|
||||
})
|
||||
if delta != 0 {
|
||||
s.reported[attr] = v
|
||||
}
|
||||
value.filtered = N(0)
|
||||
s.values[attr] = value
|
||||
// TODO (#3006): This will use an unbounded amount of memory if there
|
||||
// are unbounded number of attribute sets being aggregated. Attribute
|
||||
// sets that become "stale" need to be forgotten so this will not
|
||||
// overload the system.
|
||||
newReported[attr] = v
|
||||
// Unused attribute sets do not report.
|
||||
delete(s.values, attr)
|
||||
}
|
||||
// Unused attribute sets are forgotten.
|
||||
s.reported = newReported
|
||||
// The delta collection cycle resets.
|
||||
s.start = t
|
||||
return out
|
||||
@@ -349,12 +347,8 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation {
|
||||
Time: t,
|
||||
Value: value.measured + value.filtered,
|
||||
})
|
||||
value.filtered = N(0)
|
||||
s.values[attr] = value
|
||||
// TODO (#3006): This will use an unbounded amount of memory if there
|
||||
// are unbounded number of attribute sets being aggregated. Attribute
|
||||
// sets that become "stale" need to be forgotten so this will not
|
||||
// overload the system.
|
||||
// Unused attribute sets do not report.
|
||||
delete(s.values, attr)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -180,10 +180,9 @@ func TestPreComputedDeltaSum(t *testing.T) {
|
||||
opt := metricdatatest.IgnoreTimestamp()
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
|
||||
// Delta values should zero.
|
||||
// No observation means no metric data
|
||||
got = agg.Aggregation()
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
|
||||
|
||||
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
|
||||
got = agg.Aggregation()
|
||||
@@ -193,13 +192,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
|
||||
|
||||
// Filtered values should not persist.
|
||||
got = agg.Aggregation()
|
||||
// measured(+): 1, previous(-): 2, filtered(+): 0
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -1)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
got = agg.Aggregation()
|
||||
// measured(+): 1, previous(-): 1, filtered(+): 0
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 0)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
// No observation means no metric data
|
||||
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
|
||||
|
||||
// Override set value.
|
||||
agg.Aggregate(2, attrs)
|
||||
@@ -208,8 +202,8 @@ func TestPreComputedDeltaSum(t *testing.T) {
|
||||
agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs)
|
||||
agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs)
|
||||
got = agg.Aggregation()
|
||||
// measured(+): 5, previous(-): 1, filtered(+): 13
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 17)}
|
||||
// measured(+): 5, previous(-): 0, filtered(+): 13
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
|
||||
// Filtered values should not persist.
|
||||
@@ -251,19 +245,18 @@ func TestPreComputedCumulativeSum(t *testing.T) {
|
||||
opt := metricdatatest.IgnoreTimestamp()
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
|
||||
// Cumulative values should persist.
|
||||
// Cumulative values should not persist.
|
||||
got = agg.Aggregation()
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
|
||||
|
||||
agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs)
|
||||
got = agg.Aggregation()
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 2)}
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
|
||||
// Filtered values should not persist.
|
||||
got = agg.Aggregation()
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
|
||||
|
||||
// Override set value.
|
||||
agg.Aggregate(5, attrs)
|
||||
@@ -276,8 +269,7 @@ func TestPreComputedCumulativeSum(t *testing.T) {
|
||||
|
||||
// Filtered values should not persist.
|
||||
got = agg.Aggregation()
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 5)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
metricdatatest.AssertAggregationsEqual(t, nil, got, opt)
|
||||
|
||||
// Order should not affect measure.
|
||||
// Filtered should add.
|
||||
@@ -287,9 +279,6 @@ func TestPreComputedCumulativeSum(t *testing.T) {
|
||||
got = agg.Aggregation()
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
got = agg.Aggregation()
|
||||
want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 7)}
|
||||
metricdatatest.AssertAggregationsEqual(t, want, got, opt)
|
||||
}
|
||||
|
||||
func TestEmptySumNilAggregation(t *testing.T) {
|
||||
|
||||
+12
-2
@@ -107,6 +107,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
|
||||
// Int64ObservableCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// increasing int64 measurements once per a measurement collection cycle.
|
||||
// Only the measurements recorded during the collection cycle are exported.
|
||||
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
|
||||
cfg := metric.NewInt64ObservableCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableCounter
|
||||
@@ -121,7 +122,8 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
|
||||
|
||||
// Int64ObservableUpDownCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// int64 measurements once per a measurement collection cycle.
|
||||
// int64 measurements once per a measurement collection cycle. Only the
|
||||
// measurements recorded during the collection cycle are exported.
|
||||
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
|
||||
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableUpDownCounter
|
||||
@@ -137,6 +139,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
|
||||
// Int64ObservableGauge returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// instantaneous int64 measurements once per a measurement collection cycle.
|
||||
// Only the measurements recorded during the collection cycle are exported.
|
||||
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
|
||||
cfg := metric.NewInt64ObservableGaugeConfig(options...)
|
||||
const kind = InstrumentKindObservableGauge
|
||||
@@ -194,6 +197,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
|
||||
// Float64ObservableCounter returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// increasing float64 measurements once per a measurement collection cycle.
|
||||
// Only the measurements recorded during the collection cycle are exported.
|
||||
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
|
||||
cfg := metric.NewFloat64ObservableCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableCounter
|
||||
@@ -208,7 +212,8 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
|
||||
|
||||
// Float64ObservableUpDownCounter returns a new instrument identified by name
|
||||
// and configured with options. The instrument is used to asynchronously record
|
||||
// float64 measurements once per a measurement collection cycle.
|
||||
// float64 measurements once per a measurement collection cycle. Only the
|
||||
// measurements recorded during the collection cycle are exported.
|
||||
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
|
||||
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableUpDownCounter
|
||||
@@ -224,6 +229,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
|
||||
// Float64ObservableGauge returns a new instrument identified by name and
|
||||
// configured with options. The instrument is used to asynchronously record
|
||||
// instantaneous float64 measurements once per a measurement collection cycle.
|
||||
// Only the measurements recorded during the collection cycle are exported.
|
||||
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
|
||||
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
|
||||
const kind = InstrumentKindObservableGauge
|
||||
@@ -272,6 +278,10 @@ func isAlphanumeric(c rune) bool {
|
||||
// Only instruments from this meter can be registered with f, an error is
|
||||
// returned if other instrument are provided.
|
||||
//
|
||||
// Only observations made in the callback will be exported. Unlike synchronous
|
||||
// instruments, asynchronous callbacks can "forget" attribute sets that are no
|
||||
// longer relevant by omitting the observation during the callback.
|
||||
//
|
||||
// The returned Registration can be used to unregister f.
|
||||
func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
|
||||
if len(insts) == 0 {
|
||||
|
||||
@@ -1687,8 +1687,7 @@ func TestObservableExample(t *testing.T) {
|
||||
Temporality: temporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
// Thread 1 remains at last measured value.
|
||||
{Attributes: thread1, Value: 60},
|
||||
// Thread 1 is no longer exported.
|
||||
{Attributes: thread2, Value: 53},
|
||||
{Attributes: thread3, Value: 5},
|
||||
},
|
||||
@@ -1762,8 +1761,7 @@ func TestObservableExample(t *testing.T) {
|
||||
Temporality: temporality,
|
||||
IsMonotonic: true,
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
// Thread 1 remains at last measured value.
|
||||
{Attributes: thread1, Value: 0},
|
||||
// Thread 1 is no longer exported.
|
||||
{Attributes: thread2, Value: 6},
|
||||
{Attributes: thread3, Value: 5},
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user