You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Remove the unneeded Observe method from the async instruments (#3586)
* Update RegisterCallback and Callback decls RegisterCallback accept variadic Asynchronous instruments instead of a slice. Callback accept an observation result recorder to ensure instruments that are observed by a callback. * Update global impl * Update noop impl * Update SDK impl * Fix prometheus example * Fix metric API example_test * Remove unused registerabler * Rename ObservationRecorder to MultiObserver * Update Callback documentation about MultiObserver * Remove the Observe method from async inst * Revert to iface for Observers * Fix async inst docs * Update global async delegate race test * Restore removed observe doc * Remove TODO * Remove stale comment * Update changelog
This commit is contained in:
+30
-20
@@ -20,7 +20,6 @@ import (
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric/instrument"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
@@ -211,6 +210,36 @@ type observablID[N int64 | float64] struct {
|
||||
scope instrumentation.Scope
|
||||
}
|
||||
|
||||
type float64Observable struct {
|
||||
instrument.Float64Observable
|
||||
*observable[float64]
|
||||
}
|
||||
|
||||
var _ instrument.Float64ObservableCounter = float64Observable{}
|
||||
var _ instrument.Float64ObservableUpDownCounter = float64Observable{}
|
||||
var _ instrument.Float64ObservableGauge = float64Observable{}
|
||||
|
||||
func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc string, u unit.Unit, agg []internal.Aggregator[float64]) float64Observable {
|
||||
return float64Observable{
|
||||
observable: newObservable[float64](scope, kind, name, desc, u, agg),
|
||||
}
|
||||
}
|
||||
|
||||
type int64Observable struct {
|
||||
instrument.Int64Observable
|
||||
*observable[int64]
|
||||
}
|
||||
|
||||
var _ instrument.Int64ObservableCounter = int64Observable{}
|
||||
var _ instrument.Int64ObservableUpDownCounter = int64Observable{}
|
||||
var _ instrument.Int64ObservableGauge = int64Observable{}
|
||||
|
||||
func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc string, u unit.Unit, agg []internal.Aggregator[int64]) int64Observable {
|
||||
return int64Observable{
|
||||
observable: newObservable[int64](scope, kind, name, desc, u, agg),
|
||||
}
|
||||
}
|
||||
|
||||
type observable[N int64 | float64] struct {
|
||||
instrument.Asynchronous
|
||||
observablID[N]
|
||||
@@ -231,25 +260,6 @@ func newObservable[N int64 | float64](scope instrumentation.Scope, kind Instrume
|
||||
}
|
||||
}
|
||||
|
||||
var _ instrument.Float64ObservableCounter = (*observable[float64])(nil)
|
||||
var _ instrument.Float64ObservableUpDownCounter = (*observable[float64])(nil)
|
||||
var _ instrument.Float64ObservableGauge = (*observable[float64])(nil)
|
||||
var _ instrument.Int64ObservableCounter = (*observable[int64])(nil)
|
||||
var _ instrument.Int64ObservableUpDownCounter = (*observable[int64])(nil)
|
||||
var _ instrument.Int64ObservableGauge = (*observable[int64])(nil)
|
||||
|
||||
// Observe logs an error.
|
||||
func (o *observable[N]) Observe(ctx context.Context, val N, attrs ...attribute.KeyValue) {
|
||||
var zero N
|
||||
err := errors.New("invalid observation")
|
||||
global.Error(err, "dropping observation made outside a callback",
|
||||
"name", o.name,
|
||||
"description", o.description,
|
||||
"unit", o.unit,
|
||||
"number", fmt.Sprintf("%T", zero),
|
||||
)
|
||||
}
|
||||
|
||||
// observe records the val for the set of attrs.
|
||||
func (o *observable[N]) observe(val N, attrs []attribute.KeyValue) {
|
||||
for _, agg := range o.aggregators {
|
||||
|
||||
+33
-27
@@ -232,7 +232,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...instrument.Asynchro
|
||||
}
|
||||
|
||||
switch o := inst.(type) {
|
||||
case *observable[int64]:
|
||||
case int64Observable:
|
||||
if err := o.registerable(m.scope); err != nil {
|
||||
if !errors.Is(err, errEmptyAgg) {
|
||||
errs.append(err)
|
||||
@@ -240,7 +240,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...instrument.Asynchro
|
||||
continue
|
||||
}
|
||||
reg.registerInt64(o.observablID)
|
||||
case *observable[float64]:
|
||||
case float64Observable:
|
||||
if err := o.registerable(m.scope); err != nil {
|
||||
if !errors.Is(err, errEmptyAgg) {
|
||||
errs.append(err)
|
||||
@@ -298,10 +298,10 @@ var (
|
||||
errUnregObserver = errors.New("observable instrument not registered for callback")
|
||||
)
|
||||
|
||||
func (r observer) ObserveFloat64(o instrument.Float64Observer, v float64, a ...attribute.KeyValue) {
|
||||
var oImpl *observable[float64]
|
||||
func (r observer) ObserveFloat64(o instrument.Float64Observable, v float64, a ...attribute.KeyValue) {
|
||||
var oImpl float64Observable
|
||||
switch conv := o.(type) {
|
||||
case *observable[float64]:
|
||||
case float64Observable:
|
||||
oImpl = conv
|
||||
case interface {
|
||||
Unwrap() instrument.Asynchronous
|
||||
@@ -309,7 +309,7 @@ func (r observer) ObserveFloat64(o instrument.Float64Observer, v float64, a ...a
|
||||
// Unwrap any global.
|
||||
async := conv.Unwrap()
|
||||
var ok bool
|
||||
if oImpl, ok = async.(*observable[float64]); !ok {
|
||||
if oImpl, ok = async.(float64Observable); !ok {
|
||||
global.Error(errUnknownObserver, "failed to record asynchronous")
|
||||
return
|
||||
}
|
||||
@@ -330,10 +330,10 @@ func (r observer) ObserveFloat64(o instrument.Float64Observer, v float64, a ...a
|
||||
oImpl.observe(v, a)
|
||||
}
|
||||
|
||||
func (r observer) ObserveInt64(o instrument.Int64Observer, v int64, a ...attribute.KeyValue) {
|
||||
var oImpl *observable[int64]
|
||||
func (r observer) ObserveInt64(o instrument.Int64Observable, v int64, a ...attribute.KeyValue) {
|
||||
var oImpl int64Observable
|
||||
switch conv := o.(type) {
|
||||
case *observable[int64]:
|
||||
case int64Observable:
|
||||
oImpl = conv
|
||||
case interface {
|
||||
Unwrap() instrument.Asynchronous
|
||||
@@ -341,7 +341,7 @@ func (r observer) ObserveInt64(o instrument.Int64Observer, v int64, a ...attribu
|
||||
// Unwrap any global.
|
||||
async := conv.Unwrap()
|
||||
var ok bool
|
||||
if oImpl, ok = async.(*observable[int64]); !ok {
|
||||
if oImpl, ok = async.(int64Observable); !ok {
|
||||
global.Error(errUnknownObserver, "failed to record asynchronous")
|
||||
return
|
||||
}
|
||||
@@ -398,13 +398,13 @@ func (p *instProvider[N]) lookup(kind InstrumentKind, name, desc string, u unit.
|
||||
|
||||
type int64ObservProvider struct{ *instProvider[int64] }
|
||||
|
||||
func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (*observable[int64], error) {
|
||||
func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (int64Observable, error) {
|
||||
aggs, err := p.aggs(kind, name, desc, u)
|
||||
return newObservable(p.scope, kind, name, desc, u, aggs), err
|
||||
return newInt64Observable(p.scope, kind, name, desc, u, aggs), err
|
||||
}
|
||||
|
||||
func (p int64ObservProvider) registerCallbacks(inst *observable[int64], cBacks []instrument.Int64Callback) {
|
||||
if inst == nil {
|
||||
func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []instrument.Int64Callback) {
|
||||
if inst.observable == nil || len(inst.aggregators) == 0 {
|
||||
// Drop aggregator.
|
||||
return
|
||||
}
|
||||
@@ -414,20 +414,28 @@ func (p int64ObservProvider) registerCallbacks(inst *observable[int64], cBacks [
|
||||
}
|
||||
}
|
||||
|
||||
func (p int64ObservProvider) callback(i *observable[int64], f instrument.Int64Callback) func(context.Context) error {
|
||||
inst := callbackObserver[int64]{i}
|
||||
func (p int64ObservProvider) callback(i int64Observable, f instrument.Int64Callback) func(context.Context) error {
|
||||
inst := int64Observer{i}
|
||||
return func(ctx context.Context) error { return f(ctx, inst) }
|
||||
}
|
||||
|
||||
type int64Observer struct {
|
||||
int64Observable
|
||||
}
|
||||
|
||||
func (o int64Observer) Observe(val int64, attrs ...attribute.KeyValue) {
|
||||
o.observe(val, attrs)
|
||||
}
|
||||
|
||||
type float64ObservProvider struct{ *instProvider[float64] }
|
||||
|
||||
func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (*observable[float64], error) {
|
||||
func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc string, u unit.Unit) (float64Observable, error) {
|
||||
aggs, err := p.aggs(kind, name, desc, u)
|
||||
return newObservable(p.scope, kind, name, desc, u, aggs), err
|
||||
return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err
|
||||
}
|
||||
|
||||
func (p float64ObservProvider) registerCallbacks(inst *observable[float64], cBacks []instrument.Float64Callback) {
|
||||
if inst == nil {
|
||||
func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []instrument.Float64Callback) {
|
||||
if inst.observable == nil || len(inst.aggregators) == 0 {
|
||||
// Drop aggregator.
|
||||
return
|
||||
}
|
||||
@@ -437,17 +445,15 @@ func (p float64ObservProvider) registerCallbacks(inst *observable[float64], cBac
|
||||
}
|
||||
}
|
||||
|
||||
func (p float64ObservProvider) callback(i *observable[float64], f instrument.Float64Callback) func(context.Context) error {
|
||||
inst := callbackObserver[float64]{i}
|
||||
func (p float64ObservProvider) callback(i float64Observable, f instrument.Float64Callback) func(context.Context) error {
|
||||
inst := float64Observer{i}
|
||||
return func(ctx context.Context) error { return f(ctx, inst) }
|
||||
}
|
||||
|
||||
// callbackObserver is an observer that records values for a wrapped
|
||||
// observable.
|
||||
type callbackObserver[N int64 | float64] struct {
|
||||
*observable[N]
|
||||
type float64Observer struct {
|
||||
float64Observable
|
||||
}
|
||||
|
||||
func (o callbackObserver[N]) Observe(_ context.Context, val N, attrs ...attribute.KeyValue) {
|
||||
func (o float64Observer) Observe(val float64, attrs ...attribute.KeyValue) {
|
||||
o.observe(val, attrs)
|
||||
}
|
||||
|
||||
+16
-35
@@ -179,8 +179,8 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableInt64Count",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
cback := func(ctx context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
cback := func(_ context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Int64ObservableCounter("aint", instrument.WithInt64Callback(cback))
|
||||
@@ -190,9 +190,6 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
return nil
|
||||
}, ctr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Observed outside of a callback, it should be ignored.
|
||||
ctr.Observe(context.Background(), 19)
|
||||
},
|
||||
want: metricdata.Metrics{
|
||||
Name: "aint",
|
||||
@@ -209,8 +206,8 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableInt64UpDownCount",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
cback := func(ctx context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
cback := func(_ context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Int64ObservableUpDownCounter("aint", instrument.WithInt64Callback(cback))
|
||||
@@ -220,9 +217,6 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
return nil
|
||||
}, ctr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Observed outside of a callback, it should be ignored.
|
||||
ctr.Observe(context.Background(), 19)
|
||||
},
|
||||
want: metricdata.Metrics{
|
||||
Name: "aint",
|
||||
@@ -239,8 +233,8 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableInt64Gauge",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
cback := func(ctx context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
cback := func(_ context.Context, o instrument.Int64Observer) error {
|
||||
o.Observe(4, attrs...)
|
||||
return nil
|
||||
}
|
||||
gauge, err := m.Int64ObservableGauge("agauge", instrument.WithInt64Callback(cback))
|
||||
@@ -250,9 +244,6 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
return nil
|
||||
}, gauge)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Observed outside of a callback, it should be ignored.
|
||||
gauge.Observe(context.Background(), 19)
|
||||
},
|
||||
want: metricdata.Metrics{
|
||||
Name: "agauge",
|
||||
@@ -267,8 +258,8 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableFloat64Count",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
cback := func(ctx context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
cback := func(_ context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Float64ObservableCounter("afloat", instrument.WithFloat64Callback(cback))
|
||||
@@ -278,9 +269,6 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
return nil
|
||||
}, ctr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Observed outside of a callback, it should be ignored.
|
||||
ctr.Observe(context.Background(), 19)
|
||||
},
|
||||
want: metricdata.Metrics{
|
||||
Name: "afloat",
|
||||
@@ -297,8 +285,8 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableFloat64UpDownCount",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
cback := func(ctx context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
cback := func(_ context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(4, attrs...)
|
||||
return nil
|
||||
}
|
||||
ctr, err := m.Float64ObservableUpDownCounter("afloat", instrument.WithFloat64Callback(cback))
|
||||
@@ -308,9 +296,6 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
return nil
|
||||
}, ctr)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Observed outside of a callback, it should be ignored.
|
||||
ctr.Observe(context.Background(), 19)
|
||||
},
|
||||
want: metricdata.Metrics{
|
||||
Name: "afloat",
|
||||
@@ -327,8 +312,8 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
{
|
||||
name: "ObservableFloat64Gauge",
|
||||
fn: func(t *testing.T, m metric.Meter) {
|
||||
cback := func(ctx context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(ctx, 4, attrs...)
|
||||
cback := func(_ context.Context, o instrument.Float64Observer) error {
|
||||
o.Observe(4, attrs...)
|
||||
return nil
|
||||
}
|
||||
gauge, err := m.Float64ObservableGauge("agauge", instrument.WithFloat64Callback(cback))
|
||||
@@ -338,9 +323,6 @@ func TestMeterCreatesInstruments(t *testing.T) {
|
||||
return nil
|
||||
}, gauge)
|
||||
assert.NoError(t, err)
|
||||
|
||||
// Observed outside of a callback, it should be ignored.
|
||||
gauge.Observe(context.Background(), 19)
|
||||
},
|
||||
want: metricdata.Metrics{
|
||||
Name: "agauge",
|
||||
@@ -564,10 +546,9 @@ func TestCallbackObserverNonRegistered(t *testing.T) {
|
||||
fCtr, err := m2.Float64ObservableCounter("float64 ctr")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Panics if Observe is called.
|
||||
type int64Obsrv struct{ instrument.Int64Observer }
|
||||
type int64Obsrv struct{ instrument.Int64Observable }
|
||||
int64Foreign := int64Obsrv{}
|
||||
type float64Obsrv struct{ instrument.Float64Observer }
|
||||
type float64Obsrv struct{ instrument.Float64Observable }
|
||||
float64Foreign := float64Obsrv{}
|
||||
|
||||
_, err = m1.RegisterCallback(
|
||||
@@ -1311,9 +1292,9 @@ func TestAsynchronousExample(t *testing.T) {
|
||||
|
||||
observations := make(map[attribute.Set]int64)
|
||||
_, err := meter.Int64ObservableCounter(instName, instrument.WithInt64Callback(
|
||||
func(ctx context.Context, o instrument.Int64Observer) error {
|
||||
func(_ context.Context, o instrument.Int64Observer) error {
|
||||
for attrSet, val := range observations {
|
||||
o.Observe(ctx, val, attrSet.ToSlice()...)
|
||||
o.Observe(val, attrSet.ToSlice()...)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user