1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-03 22:52:30 +02:00

sdk/metric: Fix observable not registered error when the asynchronous instrument has a drop aggregation (#4772)

* Fix observable instrument not registered on drop aggregation

* Add TestObservableDropAggregation

* Add testcase for dropping unregistered observable

* Update CHANGELOG

* Add observable name const + suggestions

* Add suggestions

* Only error if the instrument is not dropped

* Decrease indentation

* Revert "Decrease indentation"

This reverts commit 9e7e7729bf.

---------

Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
This commit is contained in:
CZ 2024-01-12 12:27:40 +13:00 committed by GitHub
parent 01472db75f
commit 7fa7d1b252
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 231 additions and 14 deletions

View File

@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix `Parse` in `go.opentelemetry.io/otel/baggage` to validate member value before percent-decoding. (#4755)
- Fix whitespace encoding of `Member.String` in `go.opentelemetry.io/otel/baggage`. (#4756)
- Fix observable not registered error when the asynchronous instrument has a drop aggregation in `go.opentelemetry.io/otel/sdk/metric`. (#4772)
- Fix baggage item key so that it is not canonicalized in `go.opentelemetry.io/otel/bridge/opentracing`. (#4776)
- Fix `go.opentelemetry.io/otel/bridge/opentracing` to properly handle baggage values that requires escaping during propagation. (#4804)

View File

@ -295,8 +295,9 @@ type observable[N int64 | float64] struct {
metric.Observable
observablID[N]
meter *meter
measures measures[N]
meter *meter
measures measures[N]
dropAggregation bool
}
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string) *observable[N] {

View File

@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)
@ -117,6 +118,7 @@ func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int6
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
@ -233,6 +235,7 @@ func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Fl
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
@ -437,12 +440,14 @@ func (r observer) ObserveFloat64(o metric.Float64Observable, v float64, opts ...
}
if _, registered := r.float64[oImpl.observablID]; !registered {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", float64(0)),
)
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", float64(0)),
)
}
return
}
c := metric.NewObserveConfig(opts)
@ -470,12 +475,14 @@ func (r observer) ObserveInt64(o metric.Int64Observable, v int64, opts ...metric
}
if _, registered := r.int64[oImpl.observablID]; !registered {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", int64(0)),
)
if !oImpl.dropAggregation {
global.Error(errUnregObserver, "failed to record",
"name", oImpl.name,
"description", oImpl.description,
"unit", oImpl.unit,
"number", fmt.Sprintf("%T", int64(0)),
)
}
return
}
c := metric.NewObserveConfig(opts)

View File

@ -16,6 +16,7 @@ package metric
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
@ -23,6 +24,7 @@ import (
"testing"
"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -2064,3 +2066,209 @@ func TestHistogramBucketPrecedenceOrdering(t *testing.T) {
})
}
}
func TestObservableDropAggregation(t *testing.T) {
const (
intPrefix = "observable.int64."
intCntName = "observable.int64.counter"
intUDCntName = "observable.int64.up.down.counter"
intGaugeName = "observable.int64.gauge"
floatPrefix = "observable.float64."
floatCntName = "observable.float64.counter"
floatUDCntName = "observable.float64.up.down.counter"
floatGaugeName = "observable.float64.gauge"
unregPrefix = "unregistered.observable."
unregIntCntName = "unregistered.observable.int64.counter"
unregFloatCntName = "unregistered.observable.float64.counter"
)
type log struct {
name string
number string
}
testcases := []struct {
name string
views []View
wantObservables []string
wantUnregLogs []log
}{
{
name: "default",
views: nil,
wantObservables: []string{
intCntName, intUDCntName, intGaugeName,
floatCntName, floatUDCntName, floatGaugeName,
},
wantUnregLogs: []log{
{
name: unregIntCntName,
number: "int64",
},
{
name: unregFloatCntName,
number: "float64",
},
},
},
{
name: "drop all metrics",
views: []View{
func(i Instrument) (Stream, bool) {
return Stream{Aggregation: AggregationDrop{}}, true
},
},
wantObservables: nil,
wantUnregLogs: nil,
},
{
name: "drop float64 observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, floatPrefix) {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
wantObservables: []string{
intCntName, intUDCntName, intGaugeName,
},
wantUnregLogs: []log{
{
name: unregIntCntName,
number: "int64",
},
{
name: unregFloatCntName,
number: "float64",
},
},
},
{
name: "drop int64 observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, intPrefix) {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
wantObservables: []string{
floatCntName, floatUDCntName, floatGaugeName,
},
wantUnregLogs: []log{
{
name: unregIntCntName,
number: "int64",
},
{
name: unregFloatCntName,
number: "float64",
},
},
},
{
name: "drop unregistered observable",
views: []View{
func(i Instrument) (Stream, bool) {
if strings.HasPrefix(i.Name, unregPrefix) {
return Stream{Aggregation: AggregationDrop{}}, true
}
return Stream{}, false
},
},
wantObservables: []string{
intCntName, intUDCntName, intGaugeName,
floatCntName, floatUDCntName, floatGaugeName,
},
wantUnregLogs: nil,
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var unregLogs []log
otel.SetLogger(
funcr.NewJSON(
func(obj string) {
var entry map[string]interface{}
_ = json.Unmarshal([]byte(obj), &entry)
// All unregistered observables should log `errUnregObserver` error.
// A observable with drop aggregation is also unregistered,
// however this is expected and should not log an error.
assert.Equal(t, errUnregObserver.Error(), entry["error"])
unregLogs = append(unregLogs, log{
name: fmt.Sprintf("%v", entry["name"]),
number: fmt.Sprintf("%v", entry["number"]),
})
},
funcr.Options{Verbosity: 0},
),
)
defer otel.SetLogger(logr.Discard())
reader := NewManualReader()
meter := NewMeterProvider(WithView(tt.views...), WithReader(reader)).Meter("TestObservableDropAggregation")
intCnt, err := meter.Int64ObservableCounter(intCntName)
require.NoError(t, err)
intUDCnt, err := meter.Int64ObservableUpDownCounter(intUDCntName)
require.NoError(t, err)
intGaugeCnt, err := meter.Int64ObservableGauge(intGaugeName)
require.NoError(t, err)
floatCnt, err := meter.Float64ObservableCounter(floatCntName)
require.NoError(t, err)
floatUDCnt, err := meter.Float64ObservableUpDownCounter(floatUDCntName)
require.NoError(t, err)
floatGaugeCnt, err := meter.Float64ObservableGauge(floatGaugeName)
require.NoError(t, err)
unregIntCnt, err := meter.Int64ObservableCounter(unregIntCntName)
require.NoError(t, err)
unregFloatCnt, err := meter.Float64ObservableCounter(unregFloatCntName)
require.NoError(t, err)
_, err = meter.RegisterCallback(
func(ctx context.Context, obs metric.Observer) error {
obs.ObserveInt64(intCnt, 1)
obs.ObserveInt64(intUDCnt, 1)
obs.ObserveInt64(intGaugeCnt, 1)
obs.ObserveFloat64(floatCnt, 1)
obs.ObserveFloat64(floatUDCnt, 1)
obs.ObserveFloat64(floatGaugeCnt, 1)
// We deliberately call observe to unregistered observables
obs.ObserveInt64(unregIntCnt, 1)
obs.ObserveFloat64(unregFloatCnt, 1)
return nil
},
intCnt, intUDCnt, intGaugeCnt,
floatCnt, floatUDCnt, floatGaugeCnt,
// We deliberately do not register `unregIntCnt` and `unregFloatCnt`
// to test that `errUnregObserver` is logged when observed by callback.
)
require.NoError(t, err)
var rm metricdata.ResourceMetrics
err = reader.Collect(context.Background(), &rm)
require.NoError(t, err)
if len(tt.wantObservables) == 0 {
require.Len(t, rm.ScopeMetrics, 0)
return
}
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, len(tt.wantObservables))
for i, m := range rm.ScopeMetrics[0].Metrics {
assert.Equal(t, tt.wantObservables[i], m.Name)
}
assert.Equal(t, tt.wantUnregLogs, unregLogs)
})
}
}