You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Restrict Meters to only register and collect instruments it created (#4333)
* Add acceptance test * Update Meter Register and collect only inst from itself * Add change to changelog * Fix spelling error * Update changelog entry wording * Simplify the partial success code path
This commit is contained in:
@@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
|
||||
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)
|
||||
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
|
||||
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)
|
||||
|
||||
### Fixed
|
||||
|
||||
|
||||
@@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{}
|
||||
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
|
||||
var _ metric.Float64ObservableGauge = float64Observable{}
|
||||
|
||||
func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
|
||||
func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
|
||||
return float64Observable{
|
||||
observable: newObservable(scope, kind, name, desc, u, meas),
|
||||
observable: newObservable(m, kind, name, desc, u, meas),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,9 +296,9 @@ var _ metric.Int64ObservableCounter = int64Observable{}
|
||||
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
|
||||
var _ metric.Int64ObservableGauge = int64Observable{}
|
||||
|
||||
func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
|
||||
func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
|
||||
return int64Observable{
|
||||
observable: newObservable(scope, kind, name, desc, u, meas),
|
||||
observable: newObservable(m, kind, name, desc, u, meas),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -306,18 +306,20 @@ type observable[N int64 | float64] struct {
|
||||
metric.Observable
|
||||
observablID[N]
|
||||
|
||||
meter *meter
|
||||
measures []aggregate.Measure[N]
|
||||
}
|
||||
|
||||
func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
|
||||
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
|
||||
return &observable[N]{
|
||||
observablID: observablID[N]{
|
||||
name: name,
|
||||
description: desc,
|
||||
kind: kind,
|
||||
unit: u,
|
||||
scope: scope,
|
||||
scope: m.scope,
|
||||
},
|
||||
meter: m,
|
||||
measures: meas,
|
||||
}
|
||||
}
|
||||
@@ -335,16 +337,16 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument")
|
||||
// and nil if it should. An errEmptyAgg error is returned if o is effectively a
|
||||
// no-op because it does not have any aggregators. Also, an error is returned
|
||||
// if scope defines a Meter other than the one o was created by.
|
||||
func (o *observable[N]) registerable(scope instrumentation.Scope) error {
|
||||
func (o *observable[N]) registerable(m *meter) error {
|
||||
if len(o.measures) == 0 {
|
||||
return errEmptyAgg
|
||||
}
|
||||
if scope != o.scope {
|
||||
if m != o.meter {
|
||||
return fmt.Errorf(
|
||||
"invalid registration: observable %q from Meter %q, registered with Meter %q",
|
||||
o.name,
|
||||
o.scope.Name,
|
||||
scope.Name,
|
||||
m.scope.Name,
|
||||
)
|
||||
}
|
||||
return nil
|
||||
|
||||
+46
-60
@@ -42,8 +42,8 @@ type meter struct {
|
||||
scope instrumentation.Scope
|
||||
pipes pipelines
|
||||
|
||||
int64IP *int64InstProvider
|
||||
float64IP *float64InstProvider
|
||||
int64Resolver resolver[int64]
|
||||
float64Resolver resolver[float64]
|
||||
}
|
||||
|
||||
func newMeter(s instrumentation.Scope, p pipelines) *meter {
|
||||
@@ -52,10 +52,10 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
|
||||
var viewCache cache[string, streamID]
|
||||
|
||||
return &meter{
|
||||
scope: s,
|
||||
pipes: p,
|
||||
int64IP: newInt64InstProvider(s, p, &viewCache),
|
||||
float64IP: newFloat64InstProvider(s, p, &viewCache),
|
||||
scope: s,
|
||||
pipes: p,
|
||||
int64Resolver: newResolver[int64](p, &viewCache),
|
||||
float64Resolver: newResolver[float64](p, &viewCache),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,7 +68,8 @@ var _ metric.Meter = (*meter)(nil)
|
||||
func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
|
||||
cfg := metric.NewInt64CounterConfig(options...)
|
||||
const kind = InstrumentKindCounter
|
||||
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
p := int64InstProvider{m}
|
||||
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
@@ -82,7 +83,8 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
|
||||
func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
|
||||
cfg := metric.NewInt64UpDownCounterConfig(options...)
|
||||
const kind = InstrumentKindUpDownCounter
|
||||
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
p := int64InstProvider{m}
|
||||
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
@@ -96,7 +98,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
|
||||
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
|
||||
cfg := metric.NewInt64HistogramConfig(options...)
|
||||
const kind = InstrumentKindHistogram
|
||||
i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
p := int64InstProvider{m}
|
||||
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
@@ -111,7 +114,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
|
||||
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
|
||||
cfg := metric.NewInt64ObservableCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableCounter
|
||||
p := int64ObservProvider{m.int64IP}
|
||||
p := int64ObservProvider{m}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -127,7 +130,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
|
||||
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
|
||||
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableUpDownCounter
|
||||
p := int64ObservProvider{m.int64IP}
|
||||
p := int64ObservProvider{m}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -143,7 +146,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
|
||||
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
|
||||
cfg := metric.NewInt64ObservableGaugeConfig(options...)
|
||||
const kind = InstrumentKindObservableGauge
|
||||
p := int64ObservProvider{m.int64IP}
|
||||
p := int64ObservProvider{m}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -158,7 +161,8 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
|
||||
func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
|
||||
cfg := metric.NewFloat64CounterConfig(options...)
|
||||
const kind = InstrumentKindCounter
|
||||
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
p := float64InstProvider{m}
|
||||
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
@@ -172,7 +176,8 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
|
||||
func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
|
||||
cfg := metric.NewFloat64UpDownCounterConfig(options...)
|
||||
const kind = InstrumentKindUpDownCounter
|
||||
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
p := float64InstProvider{m}
|
||||
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
@@ -186,7 +191,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
|
||||
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
|
||||
cfg := metric.NewFloat64HistogramConfig(options...)
|
||||
const kind = InstrumentKindHistogram
|
||||
i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
p := float64InstProvider{m}
|
||||
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return i, err
|
||||
}
|
||||
@@ -201,7 +207,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
|
||||
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
|
||||
cfg := metric.NewFloat64ObservableCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableCounter
|
||||
p := float64ObservProvider{m.float64IP}
|
||||
p := float64ObservProvider{m}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -217,7 +223,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
|
||||
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
|
||||
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
|
||||
const kind = InstrumentKindObservableUpDownCounter
|
||||
p := float64ObservProvider{m.float64IP}
|
||||
p := float64ObservProvider{m}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -233,7 +239,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
|
||||
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
|
||||
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
|
||||
const kind = InstrumentKindObservableGauge
|
||||
p := float64ObservProvider{m.float64IP}
|
||||
p := float64ObservProvider{m}
|
||||
inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -301,7 +307,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
|
||||
|
||||
switch o := inst.(type) {
|
||||
case int64Observable:
|
||||
if err := o.registerable(m.scope); err != nil {
|
||||
if err := o.registerable(m); err != nil {
|
||||
if !errors.Is(err, errEmptyAgg) {
|
||||
errs.append(err)
|
||||
}
|
||||
@@ -309,7 +315,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
|
||||
}
|
||||
reg.registerInt64(o.observablID)
|
||||
case float64Observable:
|
||||
if err := o.registerable(m.scope); err != nil {
|
||||
if err := o.registerable(m); err != nil {
|
||||
if !errors.Is(err, errEmptyAgg) {
|
||||
errs.append(err)
|
||||
}
|
||||
@@ -322,19 +328,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
|
||||
}
|
||||
}
|
||||
|
||||
if err := errs.errorOrNil(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err := errs.errorOrNil()
|
||||
if reg.len() == 0 {
|
||||
// All insts use drop aggregation.
|
||||
return noopRegister{}, nil
|
||||
// All insts use drop aggregation or are invalid.
|
||||
return noopRegister{}, err
|
||||
}
|
||||
|
||||
cback := func(ctx context.Context) error {
|
||||
return f(ctx, reg)
|
||||
}
|
||||
return m.pipes.registerMultiCallback(cback), nil
|
||||
// Some or all instruments were valid.
|
||||
cback := func(ctx context.Context) error { return f(ctx, reg) }
|
||||
return m.pipes.registerMultiCallback(cback), err
|
||||
}
|
||||
|
||||
type observer struct {
|
||||
@@ -441,17 +443,9 @@ func (noopRegister) Unregister() error {
|
||||
}
|
||||
|
||||
// int64InstProvider provides int64 OpenTelemetry instruments.
|
||||
type int64InstProvider struct {
|
||||
scope instrumentation.Scope
|
||||
pipes pipelines
|
||||
resolve resolver[int64]
|
||||
}
|
||||
type int64InstProvider struct{ *meter }
|
||||
|
||||
func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider {
|
||||
return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
|
||||
}
|
||||
|
||||
func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
|
||||
func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
|
||||
inst := Instrument{
|
||||
Name: name,
|
||||
Description: desc,
|
||||
@@ -459,27 +453,19 @@ func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]a
|
||||
Kind: kind,
|
||||
Scope: p.scope,
|
||||
}
|
||||
return p.resolve.Aggregators(inst)
|
||||
return p.int64Resolver.Aggregators(inst)
|
||||
}
|
||||
|
||||
// lookup returns the resolved instrumentImpl.
|
||||
func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
|
||||
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
|
||||
aggs, err := p.aggs(kind, name, desc, u)
|
||||
return &int64Inst{measures: aggs}, err
|
||||
}
|
||||
|
||||
// float64InstProvider provides float64 OpenTelemetry instruments.
|
||||
type float64InstProvider struct {
|
||||
scope instrumentation.Scope
|
||||
pipes pipelines
|
||||
resolve resolver[float64]
|
||||
}
|
||||
type float64InstProvider struct{ *meter }
|
||||
|
||||
func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider {
|
||||
return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
|
||||
}
|
||||
|
||||
func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
|
||||
func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
|
||||
inst := Instrument{
|
||||
Name: name,
|
||||
Description: desc,
|
||||
@@ -487,20 +473,20 @@ func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([
|
||||
Kind: kind,
|
||||
Scope: p.scope,
|
||||
}
|
||||
return p.resolve.Aggregators(inst)
|
||||
return p.float64Resolver.Aggregators(inst)
|
||||
}
|
||||
|
||||
// lookup returns the resolved instrumentImpl.
|
||||
func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
|
||||
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
|
||||
aggs, err := p.aggs(kind, name, desc, u)
|
||||
return &float64Inst{measures: aggs}, err
|
||||
}
|
||||
|
||||
type int64ObservProvider struct{ *int64InstProvider }
|
||||
type int64ObservProvider struct{ *meter }
|
||||
|
||||
func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {
|
||||
aggs, err := p.aggs(kind, name, desc, u)
|
||||
return newInt64Observable(p.scope, kind, name, desc, u, aggs), err
|
||||
aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u)
|
||||
return newInt64Observable(p.meter, kind, name, desc, u, aggs), err
|
||||
}
|
||||
|
||||
func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
|
||||
@@ -529,11 +515,11 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) {
|
||||
o.observe(val, c.Attributes())
|
||||
}
|
||||
|
||||
type float64ObservProvider struct{ *float64InstProvider }
|
||||
type float64ObservProvider struct{ *meter }
|
||||
|
||||
func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) {
|
||||
aggs, err := p.aggs(kind, name, desc, u)
|
||||
return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err
|
||||
aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u)
|
||||
return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err
|
||||
}
|
||||
|
||||
func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
|
||||
|
||||
@@ -21,11 +21,14 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/go-logr/logr/funcr"
|
||||
"github.com/go-logr/logr/testr"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
api "go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
func TestMeterConcurrentSafe(t *testing.T) {
|
||||
@@ -120,3 +123,51 @@ func TestMeterProviderReturnsNoopMeterAfterShutdown(t *testing.T) {
|
||||
_, ok = m.(noop.Meter)
|
||||
assert.Truef(t, ok, "Meter from shutdown MeterProvider is not NoOp: %T", m)
|
||||
}
|
||||
|
||||
func TestMeterProviderMixingOnRegisterErrors(t *testing.T) {
|
||||
otel.SetLogger(testr.New(t))
|
||||
|
||||
rdr0 := NewManualReader()
|
||||
mp0 := NewMeterProvider(WithReader(rdr0))
|
||||
|
||||
rdr1 := NewManualReader()
|
||||
mp1 := NewMeterProvider(WithReader(rdr1))
|
||||
|
||||
// Meters with the same scope but different MeterProviders.
|
||||
m0 := mp0.Meter("TestMeterProviderMixingOnRegisterErrors")
|
||||
m1 := mp1.Meter("TestMeterProviderMixingOnRegisterErrors")
|
||||
|
||||
m0Gauge, err := m0.Float64ObservableGauge("float64Gauge")
|
||||
require.NoError(t, err)
|
||||
|
||||
m1Gauge, err := m1.Int64ObservableGauge("int64Gauge")
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = m0.RegisterCallback(
|
||||
func(_ context.Context, o api.Observer) error {
|
||||
o.ObserveFloat64(m0Gauge, 2)
|
||||
// Observe an instrument from a different MeterProvider.
|
||||
o.ObserveInt64(m1Gauge, 1)
|
||||
|
||||
return nil
|
||||
},
|
||||
m0Gauge, m1Gauge,
|
||||
)
|
||||
assert.Error(
|
||||
t,
|
||||
err,
|
||||
"Instrument registered with Meter from different MeterProvider",
|
||||
)
|
||||
|
||||
var data metricdata.ResourceMetrics
|
||||
_ = rdr0.Collect(context.Background(), &data)
|
||||
// Only the metrics from mp0 should be produced.
|
||||
assert.Len(t, data.ScopeMetrics, 1)
|
||||
|
||||
err = rdr1.Collect(context.Background(), &data)
|
||||
assert.NoError(t, err, "Errored when collect should be a noop")
|
||||
assert.Len(
|
||||
t, data.ScopeMetrics, 0,
|
||||
"Metrics produced for instrument collected by different MeterProvider",
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user