You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-06-27 00:21:15 +02:00
Fix Asynchronous Counters Recording (#3350)
* Update Asynchronous API docs Clarify the Counter and UpDownCounter Observe values are the exact counter value, not increments to the previous measurements. * Add the pre-computed sum Aggregator * Test the PreComputedSum * Use the PrecomputedSum for async counters * Add changes to changelog * Ignore false-positive lint error * Split NewPrecomputedSum into delta/cumulative vers
This commit is contained in:
@ -28,6 +28,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
- Slice attributes of `attribute` package are now comparable based on their value, not instance. (#3108 #3252)
|
- Slice attributes of `attribute` package are now comparable based on their value, not instance. (#3108 #3252)
|
||||||
- Prometheus exporter will now cumulatively sum histogram buckets. (#3281)
|
- Prometheus exporter will now cumulatively sum histogram buckets. (#3281)
|
||||||
- Export the sum of each histogram datapoint uniquely with the `go.opentelemetry.io/otel/exporters/otlpmetric` exporters. (#3284, #3293)
|
- Export the sum of each histogram datapoint uniquely with the `go.opentelemetry.io/otel/exporters/otlpmetric` exporters. (#3284, #3293)
|
||||||
|
- Recorded values for asynchronous counters (`Counter` and `UpDownCounter`) are interpreted as exact, not incremental, sum values by the metric SDK. (#3350, #3278)
|
||||||
- UpDownCounters are now correctly output as prometheus gauges in the `go.opentelemetry.io/otel/exporters/prometheus` exporter. (#3358)
|
- UpDownCounters are now correctly output as prometheus gauges in the `go.opentelemetry.io/otel/exporters/prometheus` exporter. (#3358)
|
||||||
|
|
||||||
## [1.11.0/0.32.3] 2022-10-12
|
## [1.11.0/0.32.3] 2022-10-12
|
||||||
|
@ -35,7 +35,8 @@ type InstrumentProvider interface {
|
|||||||
|
|
||||||
// Counter is an instrument that records increasing values.
|
// Counter is an instrument that records increasing values.
|
||||||
type Counter interface {
|
type Counter interface {
|
||||||
// Observe records the state of the instrument.
|
// Observe records the state of the instrument to be x. The value of x is
|
||||||
|
// assumed to be the exact Counter value to record.
|
||||||
//
|
//
|
||||||
// It is only valid to call this within a callback. If called outside of the
|
// It is only valid to call this within a callback. If called outside of the
|
||||||
// registered callback it should have no effect on the instrument, and an
|
// registered callback it should have no effect on the instrument, and an
|
||||||
@ -47,7 +48,8 @@ type Counter interface {
|
|||||||
|
|
||||||
// UpDownCounter is an instrument that records increasing or decreasing values.
|
// UpDownCounter is an instrument that records increasing or decreasing values.
|
||||||
type UpDownCounter interface {
|
type UpDownCounter interface {
|
||||||
// Observe records the state of the instrument.
|
// Observe records the state of the instrument to be x. The value of x is
|
||||||
|
// assumed to be the exact UpDownCounter value to record.
|
||||||
//
|
//
|
||||||
// It is only valid to call this within a callback. If called outside of the
|
// It is only valid to call this within a callback. If called outside of the
|
||||||
// registered callback it should have no effect on the instrument, and an
|
// registered callback it should have no effect on the instrument, and an
|
||||||
@ -59,7 +61,7 @@ type UpDownCounter interface {
|
|||||||
|
|
||||||
// Gauge is an instrument that records independent readings.
|
// Gauge is an instrument that records independent readings.
|
||||||
type Gauge interface {
|
type Gauge interface {
|
||||||
// Observe records the state of the instrument.
|
// Observe records the state of the instrument to be x.
|
||||||
//
|
//
|
||||||
// It is only valid to call this within a callback. If called outside of the
|
// It is only valid to call this within a callback. If called outside of the
|
||||||
// registered callback it should have no effect on the instrument, and an
|
// registered callback it should have no effect on the instrument, and an
|
||||||
|
@ -35,7 +35,8 @@ type InstrumentProvider interface {
|
|||||||
|
|
||||||
// Counter is an instrument that records increasing values.
|
// Counter is an instrument that records increasing values.
|
||||||
type Counter interface {
|
type Counter interface {
|
||||||
// Observe records the state of the instrument.
|
// Observe records the state of the instrument to be x. The value of x is
|
||||||
|
// assumed to be the exact Counter value to record.
|
||||||
//
|
//
|
||||||
// It is only valid to call this within a callback. If called outside of the
|
// It is only valid to call this within a callback. If called outside of the
|
||||||
// registered callback it should have no effect on the instrument, and an
|
// registered callback it should have no effect on the instrument, and an
|
||||||
@ -47,7 +48,8 @@ type Counter interface {
|
|||||||
|
|
||||||
// UpDownCounter is an instrument that records increasing or decreasing values.
|
// UpDownCounter is an instrument that records increasing or decreasing values.
|
||||||
type UpDownCounter interface {
|
type UpDownCounter interface {
|
||||||
// Observe records the state of the instrument.
|
// Observe records the state of the instrument to be x. The value of x is
|
||||||
|
// assumed to be the exact UpDownCounter value to record.
|
||||||
//
|
//
|
||||||
// It is only valid to call this within a callback. If called outside of the
|
// It is only valid to call this within a callback. If called outside of the
|
||||||
// registered callback it should have no effect on the instrument, and an
|
// registered callback it should have no effect on the instrument, and an
|
||||||
@ -59,7 +61,7 @@ type UpDownCounter interface {
|
|||||||
|
|
||||||
// Gauge is an instrument that records independent readings.
|
// Gauge is an instrument that records independent readings.
|
||||||
type Gauge interface {
|
type Gauge interface {
|
||||||
// Observe records the state of the instrument.
|
// Observe records the state of the instrument to be x.
|
||||||
//
|
//
|
||||||
// It is only valid to call this within a callback. If called outside of the
|
// It is only valid to call this within a callback. If called outside of the
|
||||||
// registered callback it should have no effect on the instrument, and an
|
// registered callback it should have no effect on the instrument, and an
|
||||||
|
@ -32,6 +32,12 @@ func newValueMap[N int64 | float64]() *valueMap[N] {
|
|||||||
return &valueMap[N]{values: make(map[attribute.Set]N)}
|
return &valueMap[N]{values: make(map[attribute.Set]N)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *valueMap[N]) set(value N, attr attribute.Set) { // nolint: unused // This is indeed used.
|
||||||
|
s.Lock()
|
||||||
|
s.values[attr] = value
|
||||||
|
s.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
|
func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
s.values[attr] += value
|
s.values[attr] += value
|
||||||
@ -49,6 +55,10 @@ func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
|
|||||||
// Each aggregation cycle is treated independently. When the returned
|
// Each aggregation cycle is treated independently. When the returned
|
||||||
// Aggregator's Aggregation method is called it will reset all sums to zero.
|
// Aggregator's Aggregation method is called it will reset all sums to zero.
|
||||||
func NewDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
|
func NewDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
|
||||||
|
return newDeltaSum[N](monotonic)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDeltaSum[N int64 | float64](monotonic bool) *deltaSum[N] {
|
||||||
return &deltaSum[N]{
|
return &deltaSum[N]{
|
||||||
valueMap: newValueMap[N](),
|
valueMap: newValueMap[N](),
|
||||||
monotonic: monotonic,
|
monotonic: monotonic,
|
||||||
@ -106,6 +116,10 @@ func (s *deltaSum[N]) Aggregation() metricdata.Aggregation {
|
|||||||
// Each aggregation cycle is treated independently. When the returned
|
// Each aggregation cycle is treated independently. When the returned
|
||||||
// Aggregator's Aggregation method is called it will reset all sums to zero.
|
// Aggregator's Aggregation method is called it will reset all sums to zero.
|
||||||
func NewCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
|
func NewCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
|
||||||
|
return newCumulativeSum[N](monotonic)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCumulativeSum[N int64 | float64](monotonic bool) *cumulativeSum[N] {
|
||||||
return &cumulativeSum[N]{
|
return &cumulativeSum[N]{
|
||||||
valueMap: newValueMap[N](),
|
valueMap: newValueMap[N](),
|
||||||
monotonic: monotonic,
|
monotonic: monotonic,
|
||||||
@ -151,3 +165,47 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation {
|
|||||||
}
|
}
|
||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
|
||||||
|
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
|
||||||
|
// attributes and the aggregation cycle the measurements were made in.
|
||||||
|
//
|
||||||
|
// The monotonic value is used to communicate the produced Aggregation is
|
||||||
|
// monotonic or not. The returned Aggregator does not make any guarantees this
|
||||||
|
// value is accurate. It is up to the caller to ensure it.
|
||||||
|
//
|
||||||
|
// The output Aggregation will report recorded values as delta temporality. It
|
||||||
|
// is up to the caller to ensure this is accurate.
|
||||||
|
func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
|
||||||
|
return &precomputedSum[N]{settableSum: newDeltaSum[N](monotonic)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
|
||||||
|
// measurements as their pre-computed arithmetic sum. Each sum is scoped by
|
||||||
|
// attributes and the aggregation cycle the measurements were made in.
|
||||||
|
//
|
||||||
|
// The monotonic value is used to communicate the produced Aggregation is
|
||||||
|
// monotonic or not. The returned Aggregator does not make any guarantees this
|
||||||
|
// value is accurate. It is up to the caller to ensure it.
|
||||||
|
//
|
||||||
|
// The output Aggregation will report recorded values as cumulative
|
||||||
|
// temporality. It is up to the caller to ensure this is accurate.
|
||||||
|
func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
|
||||||
|
return &precomputedSum[N]{settableSum: newCumulativeSum[N](monotonic)}
|
||||||
|
}
|
||||||
|
|
||||||
|
type settableSum[N int64 | float64] interface {
|
||||||
|
set(value N, attr attribute.Set)
|
||||||
|
Aggregation() metricdata.Aggregation
|
||||||
|
}
|
||||||
|
|
||||||
|
// precomputedSum summarizes a set of measurements recorded over all
|
||||||
|
// aggregation cycles directly as an arithmetic sum.
|
||||||
|
type precomputedSum[N int64 | float64] struct {
|
||||||
|
settableSum[N]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Aggregate records value directly as a sum for attr.
|
||||||
|
func (s *precomputedSum[N]) Aggregate(value N, attr attribute.Set) {
|
||||||
|
s.set(value, attr)
|
||||||
|
}
|
||||||
|
@ -54,6 +54,26 @@ func testSum[N int64 | float64](t *testing.T) {
|
|||||||
eFunc = cumuExpecter[N](incr, mono)
|
eFunc = cumuExpecter[N](incr, mono)
|
||||||
t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
|
t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("PreComputedDelta", func(t *testing.T) {
|
||||||
|
incr, mono, temp := monoIncr, true, metricdata.DeltaTemporality
|
||||||
|
eFunc := preExpecter[N](incr, mono, temp)
|
||||||
|
t.Run("Monotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))
|
||||||
|
|
||||||
|
incr, mono = nonMonoIncr, false
|
||||||
|
eFunc = preExpecter[N](incr, mono, temp)
|
||||||
|
t.Run("NonMonotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("PreComputedCumulative", func(t *testing.T) {
|
||||||
|
incr, mono, temp := monoIncr, true, metricdata.CumulativeTemporality
|
||||||
|
eFunc := preExpecter[N](incr, mono, temp)
|
||||||
|
t.Run("Monotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))
|
||||||
|
|
||||||
|
incr, mono = nonMonoIncr, false
|
||||||
|
eFunc = preExpecter[N](incr, mono, temp)
|
||||||
|
t.Run("NonMonotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
|
func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
|
||||||
@ -61,7 +81,7 @@ func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
|
|||||||
return func(m int) metricdata.Aggregation {
|
return func(m int) metricdata.Aggregation {
|
||||||
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
|
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
|
||||||
for a, v := range incr {
|
for a, v := range incr {
|
||||||
sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*m)))
|
sum.DataPoints = append(sum.DataPoints, point(a, N(v*m)))
|
||||||
}
|
}
|
||||||
return sum
|
return sum
|
||||||
}
|
}
|
||||||
@ -74,7 +94,18 @@ func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
|
|||||||
cycle++
|
cycle++
|
||||||
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
|
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
|
||||||
for a, v := range incr {
|
for a, v := range incr {
|
||||||
sum.DataPoints = append(sum.DataPoints, point[N](a, N(v*cycle*m)))
|
sum.DataPoints = append(sum.DataPoints, point(a, N(v*cycle*m)))
|
||||||
|
}
|
||||||
|
return sum
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func preExpecter[N int64 | float64](incr setMap, mono bool, temp metricdata.Temporality) expectFunc {
|
||||||
|
sum := metricdata.Sum[N]{Temporality: temp, IsMonotonic: mono}
|
||||||
|
return func(int) metricdata.Aggregation {
|
||||||
|
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
|
||||||
|
for a, v := range incr {
|
||||||
|
sum.DataPoints = append(sum.DataPoints, point(a, N(v)))
|
||||||
}
|
}
|
||||||
return sum
|
return sum
|
||||||
}
|
}
|
||||||
|
@ -266,7 +266,7 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter
|
|||||||
// still be applied and a warning should be logged.
|
// still be applied and a warning should be logged.
|
||||||
i.logConflict(id)
|
i.logConflict(id)
|
||||||
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
|
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
|
||||||
agg, err := i.aggregator(inst.Aggregation, id.Temporality, id.Monotonic)
|
agg, err := i.aggregator(inst.Aggregation, inst.Kind, id.Temporality, id.Monotonic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -322,16 +322,31 @@ func (i *inserter[N]) instrumentID(vi view.Instrument, u unit.Unit) instrumentID
|
|||||||
return id
|
return id
|
||||||
}
|
}
|
||||||
|
|
||||||
// aggregator returns a new Aggregator matching agg, temporality, and
|
// aggregator returns a new Aggregator matching agg, kind, temporality, and
|
||||||
// monotonic. If the agg is unknown or temporality is invalid, an error is
|
// monotonic. If the agg is unknown or temporality is invalid, an error is
|
||||||
// returned.
|
// returned.
|
||||||
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
|
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind view.InstrumentKind, temporality metricdata.Temporality, monotonic bool) (internal.Aggregator[N], error) {
|
||||||
switch a := agg.(type) {
|
switch a := agg.(type) {
|
||||||
case aggregation.Drop:
|
case aggregation.Drop:
|
||||||
return nil, nil
|
return nil, nil
|
||||||
case aggregation.LastValue:
|
case aggregation.LastValue:
|
||||||
return internal.NewLastValue[N](), nil
|
return internal.NewLastValue[N](), nil
|
||||||
case aggregation.Sum:
|
case aggregation.Sum:
|
||||||
|
switch kind {
|
||||||
|
case view.AsyncCounter, view.AsyncUpDownCounter:
|
||||||
|
// Asynchronous counters and up-down-counters are defined to record
|
||||||
|
// the absolute value of the count:
|
||||||
|
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#asynchronous-counter-creation
|
||||||
|
switch temporality {
|
||||||
|
case metricdata.CumulativeTemporality:
|
||||||
|
return internal.NewPrecomputedCumulativeSum[N](monotonic), nil
|
||||||
|
case metricdata.DeltaTemporality:
|
||||||
|
return internal.NewPrecomputedDeltaSum[N](monotonic), nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
switch temporality {
|
switch temporality {
|
||||||
case metricdata.CumulativeTemporality:
|
case metricdata.CumulativeTemporality:
|
||||||
return internal.NewCumulativeSum[N](monotonic), nil
|
return internal.NewCumulativeSum[N](monotonic), nil
|
||||||
|
@ -107,7 +107,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
|
|||||||
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
|
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
|
||||||
views: []view.View{defaultAggView},
|
views: []view.View{defaultAggView},
|
||||||
inst: instruments[view.AsyncCounter],
|
inst: instruments[view.AsyncCounter],
|
||||||
wantKind: internal.NewDeltaSum[N](true),
|
wantKind: internal.NewPrecomputedDeltaSum[N](true),
|
||||||
wantLen: 1,
|
wantLen: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -115,7 +115,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
|
|||||||
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
|
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
|
||||||
views: []view.View{defaultAggView},
|
views: []view.View{defaultAggView},
|
||||||
inst: instruments[view.AsyncUpDownCounter],
|
inst: instruments[view.AsyncUpDownCounter],
|
||||||
wantKind: internal.NewDeltaSum[N](false),
|
wantKind: internal.NewPrecomputedDeltaSum[N](false),
|
||||||
wantLen: 1,
|
wantLen: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -155,7 +155,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
|
|||||||
reader: NewManualReader(),
|
reader: NewManualReader(),
|
||||||
views: []view.View{{}},
|
views: []view.View{{}},
|
||||||
inst: instruments[view.AsyncCounter],
|
inst: instruments[view.AsyncCounter],
|
||||||
wantKind: internal.NewCumulativeSum[N](true),
|
wantKind: internal.NewPrecomputedCumulativeSum[N](true),
|
||||||
wantLen: 1,
|
wantLen: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -163,7 +163,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
|
|||||||
reader: NewManualReader(),
|
reader: NewManualReader(),
|
||||||
views: []view.View{{}},
|
views: []view.View{{}},
|
||||||
inst: instruments[view.AsyncUpDownCounter],
|
inst: instruments[view.AsyncUpDownCounter],
|
||||||
wantKind: internal.NewCumulativeSum[N](false),
|
wantKind: internal.NewPrecomputedCumulativeSum[N](false),
|
||||||
wantLen: 1,
|
wantLen: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user