mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-18 03:22:12 +02:00
Only registers callbacks if non-drop aggregation is used (#3408)
* Do not return an error for Drop aggs The async instruments currently return an error if and only if there are no aggregators returned from a resolve. Returning no aggregators means the instrument aggregation is drop. Do not include this in the error reporting decision. * Only registers callbacks if non-drop agg is used The instruments passed to RegisterCallback need to have some aggregation defined otherwise it is implied they have a Drop aggregation. Check that at least one instrument passed has an aggregation other than Drop before registering the callback with the pipelines. Also, return an error if the user passed another API implementation of an asynchronous instrument. * Remove unneeded TODO from pipeline * Add changes to changelog * Test callback not called for all drop instruments * Test RegisterCallback returns err for non-SDK inst * Fail gracefully for non-SDK instruments Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
This commit is contained in:
parent
d091ba88e4
commit
308d0362e6
@ -44,6 +44,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
|
||||
- `Aggregation`s from `go.opentelemetry.io/otel/sdk/metric` with no data are not exported. (#3394, #3436)
|
||||
- Reenabled Attribute Filters in the Metric SDK. (#3396)
|
||||
- Asynchronous callbacks are only called if they are registered with at least one instrument that does not use drop aggragation. (#3408)
|
||||
- Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432)
|
||||
- Handle partial success responses in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric` exporters. (#3162, #3440)
|
||||
|
||||
|
@ -15,8 +15,6 @@
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/instrument"
|
||||
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
|
||||
@ -70,9 +68,6 @@ func (p *instProvider[N]) lookup(kind view.InstrumentKind, name string, opts []i
|
||||
}
|
||||
|
||||
aggs, err := p.resolve.Aggregators(key)
|
||||
if len(aggs) == 0 && err != nil {
|
||||
err = fmt.Errorf("instrument does not match any view: %w", err)
|
||||
}
|
||||
return &instrumentImpl[N]{aggregators: aggs}, err
|
||||
}
|
||||
|
||||
|
@ -73,6 +73,31 @@ func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
|
||||
// RegisterCallback registers the function f to be called when any of the
|
||||
// insts Collect method is called.
|
||||
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, f func(context.Context)) error {
|
||||
for _, inst := range insts {
|
||||
// Only register if at least one instrument has a non-drop aggregation.
|
||||
// Otherwise, calling f during collection will be wasted computation.
|
||||
switch t := inst.(type) {
|
||||
case *instrumentImpl[int64]:
|
||||
if len(t.aggregators) > 0 {
|
||||
return m.registerCallback(f)
|
||||
}
|
||||
case *instrumentImpl[float64]:
|
||||
if len(t.aggregators) > 0 {
|
||||
return m.registerCallback(f)
|
||||
}
|
||||
default:
|
||||
// Instrument external to the SDK. For example, an instrument from
|
||||
// the "go.opentelemetry.io/otel/metric/internal/global" package.
|
||||
//
|
||||
// Fail gracefully here, assume a valid instrument.
|
||||
return m.registerCallback(f)
|
||||
}
|
||||
}
|
||||
// All insts use drop aggregation.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *meter) registerCallback(f func(context.Context)) error {
|
||||
m.pipes.registerCallback(f)
|
||||
return nil
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncint64"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
"go.opentelemetry.io/otel/sdk/metric/view"
|
||||
@ -480,6 +481,49 @@ func TestMetersProvideScope(t *testing.T) {
|
||||
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
|
||||
}
|
||||
|
||||
func TestRegisterCallbackDropAggregations(t *testing.T) {
|
||||
aggFn := func(view.InstrumentKind) aggregation.Aggregation {
|
||||
return aggregation.Drop{}
|
||||
}
|
||||
r := NewManualReader(WithAggregationSelector(aggFn))
|
||||
mp := NewMeterProvider(WithReader(r))
|
||||
m := mp.Meter("testRegisterCallbackDropAggregations")
|
||||
|
||||
int64Counter, err := m.AsyncInt64().Counter("int64.counter")
|
||||
require.NoError(t, err)
|
||||
|
||||
int64UpDownCounter, err := m.AsyncInt64().UpDownCounter("int64.up_down_counter")
|
||||
require.NoError(t, err)
|
||||
|
||||
int64Gauge, err := m.AsyncInt64().Gauge("int64.gauge")
|
||||
require.NoError(t, err)
|
||||
|
||||
floag64Counter, err := m.AsyncFloat64().Counter("floag64.counter")
|
||||
require.NoError(t, err)
|
||||
|
||||
floag64UpDownCounter, err := m.AsyncFloat64().UpDownCounter("floag64.up_down_counter")
|
||||
require.NoError(t, err)
|
||||
|
||||
floag64Gauge, err := m.AsyncFloat64().Gauge("floag64.gauge")
|
||||
require.NoError(t, err)
|
||||
|
||||
var called bool
|
||||
require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{
|
||||
int64Counter,
|
||||
int64UpDownCounter,
|
||||
int64Gauge,
|
||||
floag64Counter,
|
||||
floag64UpDownCounter,
|
||||
floag64Gauge,
|
||||
}, func(context.Context) { called = true }))
|
||||
|
||||
data, err := r.Collect(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.False(t, called, "callback called for all drop instruments")
|
||||
assert.Len(t, data.ScopeMetrics, 0, "metrics exported for drop instruments")
|
||||
}
|
||||
|
||||
func TestAttributeFilter(t *testing.T) {
|
||||
one := 1.0
|
||||
two := 2.0
|
||||
|
@ -438,7 +438,6 @@ func newPipelines(res *resource.Resource, readers []Reader, views []view.View) p
|
||||
return pipes
|
||||
}
|
||||
|
||||
// TODO (#3053) Only register callbacks if any instrument matches in a view.
|
||||
func (p pipelines) registerCallback(fn func(context.Context)) {
|
||||
for _, pipe := range p {
|
||||
pipe.addCallback(fn)
|
||||
|
Loading…
x
Reference in New Issue
Block a user