diff --git a/api/metric/api_test.go b/api/metric/api_test.go index 369baa661..e5c194977 100644 --- a/api/metric/api_test.go +++ b/api/metric/api_test.go @@ -224,6 +224,28 @@ func TestObserverInstruments(t *testing.T) { -142, ) }) + t.Run("float updownsumobserver", func(t *testing.T) { + labels := []kv.KeyValue{kv.String("O", "P")} + mockSDK, meter := mockTest.NewMeter() + o := Must(meter).RegisterFloat64UpDownSumObserver("test.updownsumobserver.float", func(_ context.Context, result metric.Float64ObserverResult) { + result.Observe(42.1, labels...) + }) + mockSDK.RunAsyncInstruments() + checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(), + 42.1, + ) + }) + t.Run("int updownsumobserver", func(t *testing.T) { + labels := []kv.KeyValue{} + mockSDK, meter := mockTest.NewMeter() + o := Must(meter).RegisterInt64UpDownSumObserver("test.observer.int", func(_ context.Context, result metric.Int64ObserverResult) { + result.Observe(-142, labels...) + }) + mockSDK.RunAsyncInstruments() + checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.UpDownSumObserverKind, o.AsyncImpl(), + -142, + ) + }) } func checkSyncBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, instrument metric.InstrumentImpl, expected ...float64) { diff --git a/api/metric/async.go b/api/metric/async.go index c82fdc409..d721dffef 100644 --- a/api/metric/async.go +++ b/api/metric/async.go @@ -203,3 +203,15 @@ func wrapFloat64SumObserverInstrument(asyncInst AsyncImpl, err error) (Float64Su common, err := checkNewAsync(asyncInst, err) return Float64SumObserver{asyncInstrument: common}, err } + +// wrapInt64UpDownSumObserverInstrument converts an AsyncImpl into Int64UpDownSumObserver. +func wrapInt64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Int64UpDownSumObserver, error) { + common, err := checkNewAsync(asyncInst, err) + return Int64UpDownSumObserver{asyncInstrument: common}, err +} + +// wrapFloat64UpDownSumObserverInstrument converts an AsyncImpl into Float64UpDownSumObserver. +func wrapFloat64UpDownSumObserverInstrument(asyncInst AsyncImpl, err error) (Float64UpDownSumObserver, error) { + common, err := checkNewAsync(asyncInst, err) + return Float64UpDownSumObserver{asyncInstrument: common}, err +} diff --git a/api/metric/kind.go b/api/metric/kind.go index fca4fa6fb..5020384c5 100644 --- a/api/metric/kind.go +++ b/api/metric/kind.go @@ -32,4 +32,6 @@ const ( // SumObserverKind indicates a SumObserver instrument. SumObserverKind + // UpDownSumObserverKind indicates a UpDownSumObserver instrument. + UpDownSumObserverKind ) diff --git a/api/metric/kind_string.go b/api/metric/kind_string.go index 33118e2a0..eb1a0d503 100644 --- a/api/metric/kind_string.go +++ b/api/metric/kind_string.go @@ -13,11 +13,12 @@ func _() { _ = x[CounterKind-2] _ = x[UpDownCounterKind-3] _ = x[SumObserverKind-4] + _ = x[UpDownSumObserverKind-5] } -const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKindSumObserverKind" +const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKindSumObserverKindUpDownSumObserverKind" -var _Kind_index = [...]uint8{0, 17, 34, 45, 62, 77} +var _Kind_index = [...]uint8{0, 17, 34, 45, 62, 77, 98} func (i Kind) String() string { if i < 0 || i >= Kind(len(_Kind_index)-1) { diff --git a/api/metric/meter.go b/api/metric/meter.go index e1b546bf9..4caa7a7a6 100644 --- a/api/metric/meter.go +++ b/api/metric/meter.go @@ -170,6 +170,32 @@ func (m Meter) RegisterFloat64SumObserver(name string, callback Float64ObserverC newFloat64AsyncRunner(callback))) } +// RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument +// with the given name, running a given callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (m Meter) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, opts ...Option) (Int64UpDownSumObserver, error) { + if callback == nil { + return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil) + } + return wrapInt64UpDownSumObserverInstrument( + m.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts, + newInt64AsyncRunner(callback))) +} + +// RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with +// the given name, running a given callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (m Meter) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, opts ...Option) (Float64UpDownSumObserver, error) { + if callback == nil { + return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil) + } + return wrapFloat64UpDownSumObserverInstrument( + m.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts, + newFloat64AsyncRunner(callback))) +} + // RegisterInt64ValueObserver creates a new integer ValueObserver instrument // with the given name, running in a batch callback, and customized with // options. May return an error if the name is invalid (e.g., empty) @@ -220,6 +246,31 @@ func (b BatchObserver) RegisterFloat64SumObserver(name string, opts ...Option) ( b.runner)) } +// RegisterInt64UpDownSumObserver creates a new integer UpDownSumObserver instrument +// with the given name, running in a batch callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (b BatchObserver) RegisterInt64UpDownSumObserver(name string, opts ...Option) (Int64UpDownSumObserver, error) { + if b.runner == nil { + return wrapInt64UpDownSumObserverInstrument(NoopAsync{}, nil) + } + return wrapInt64UpDownSumObserverInstrument( + b.meter.newAsync(name, UpDownSumObserverKind, Int64NumberKind, opts, b.runner)) +} + +// RegisterFloat64UpDownSumObserver creates a new floating point UpDownSumObserver with +// the given name, running in a batch callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (b BatchObserver) RegisterFloat64UpDownSumObserver(name string, opts ...Option) (Float64UpDownSumObserver, error) { + if b.runner == nil { + return wrapFloat64UpDownSumObserverInstrument(NoopAsync{}, nil) + } + return wrapFloat64UpDownSumObserverInstrument( + b.meter.newAsync(name, UpDownSumObserverKind, Float64NumberKind, opts, + b.runner)) +} + // MeterImpl returns the underlying MeterImpl of this Meter. func (m Meter) MeterImpl() MeterImpl { return m.impl diff --git a/api/metric/must.go b/api/metric/must.go index bf4b60284..9c5115812 100644 --- a/api/metric/must.go +++ b/api/metric/must.go @@ -133,6 +133,26 @@ func (mm MeterMust) RegisterFloat64SumObserver(name string, callback Float64Obse } } +// RegisterInt64UpDownSumObserver calls `Meter.RegisterInt64UpDownSumObserver` and +// returns the instrument, panicking if it encounters an error. +func (mm MeterMust) RegisterInt64UpDownSumObserver(name string, callback Int64ObserverCallback, oos ...Option) Int64UpDownSumObserver { + if inst, err := mm.meter.RegisterInt64UpDownSumObserver(name, callback, oos...); err != nil { + panic(err) + } else { + return inst + } +} + +// RegisterFloat64UpDownSumObserver calls `Meter.RegisterFloat64UpDownSumObserver` and +// returns the instrument, panicking if it encounters an error. +func (mm MeterMust) RegisterFloat64UpDownSumObserver(name string, callback Float64ObserverCallback, oos ...Option) Float64UpDownSumObserver { + if inst, err := mm.meter.RegisterFloat64UpDownSumObserver(name, callback, oos...); err != nil { + panic(err) + } else { + return inst + } +} + // NewBatchObserver returns a wrapper around BatchObserver that panics // when any instrument constructor returns an error. func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust { @@ -180,3 +200,23 @@ func (bm BatchObserverMust) RegisterFloat64SumObserver(name string, oos ...Optio return inst } } + +// RegisterInt64UpDownSumObserver calls `BatchObserver.RegisterInt64UpDownSumObserver` and +// returns the instrument, panicking if it encounters an error. +func (bm BatchObserverMust) RegisterInt64UpDownSumObserver(name string, oos ...Option) Int64UpDownSumObserver { + if inst, err := bm.batch.RegisterInt64UpDownSumObserver(name, oos...); err != nil { + panic(err) + } else { + return inst + } +} + +// RegisterFloat64UpDownSumObserver calls `BatchObserver.RegisterFloat64UpDownSumObserver` and +// returns the instrument, panicking if it encounters an error. +func (bm BatchObserverMust) RegisterFloat64UpDownSumObserver(name string, oos ...Option) Float64UpDownSumObserver { + if inst, err := bm.batch.RegisterFloat64UpDownSumObserver(name, oos...); err != nil { + panic(err) + } else { + return inst + } +} diff --git a/api/metric/observer.go b/api/metric/observer.go index f9100f310..1616908cc 100644 --- a/api/metric/observer.go +++ b/api/metric/observer.go @@ -45,6 +45,18 @@ type Float64SumObserver struct { asyncInstrument } +// Int64UpDownSumObserver is a metric that captures a precomputed sum of +// int64 values at a point in time. +type Int64UpDownSumObserver struct { + asyncInstrument +} + +// Float64UpDownSumObserver is a metric that captures a precomputed sum of +// float64 values at a point in time. +type Float64UpDownSumObserver struct { + asyncInstrument +} + // Observation returns an Observation, a BatchObserverCallback // argument, for an asynchronous integer instrument. // This returns an implementation-level object for use by the SDK, @@ -88,3 +100,25 @@ func (f Float64SumObserver) Observation(v float64) Observation { instrument: f.instrument, } } + +// Observation returns an Observation, a BatchObserverCallback +// argument, for an asynchronous integer instrument. +// This returns an implementation-level object for use by the SDK, +// users should not refer to this. +func (i Int64UpDownSumObserver) Observation(v int64) Observation { + return Observation{ + number: NewInt64Number(v), + instrument: i.instrument, + } +} + +// Observation returns an Observation, a BatchObserverCallback +// argument, for an asynchronous integer instrument. +// This returns an implementation-level object for use by the SDK, +// users should not refer to this. +func (f Float64UpDownSumObserver) Observation(v float64) Observation { + return Observation{ + number: NewFloat64Number(v), + instrument: f.instrument, + } +} diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index d2c8a173d..35a64c16b 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -347,27 +347,45 @@ func TestObserverCollection(t *testing.T) { result.Observe(1) }) + _ = Must(meter).RegisterFloat64UpDownSumObserver("float.updownsumobserver", func(_ context.Context, result metric.Float64ObserverResult) { + result.Observe(1, kv.String("A", "B")) + result.Observe(-2, kv.String("A", "B")) + result.Observe(1, kv.String("C", "D")) + }) + _ = Must(meter).RegisterInt64UpDownSumObserver("int.updownsumobserver", func(_ context.Context, result metric.Int64ObserverResult) { + result.Observe(2, kv.String("A", "B")) + result.Observe(1) + // last value wins + result.Observe(1, kv.String("A", "B")) + result.Observe(-1) + }) + _ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) { }) collected := sdk.Collect(ctx) - require.Equal(t, 8, collected) - require.Equal(t, 8, len(integrator.records)) + require.Equal(t, collected, len(integrator.records)) out := batchTest.NewOutput(label.DefaultEncoder()) for _, rec := range integrator.records { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "float.sumobserver/A=B/R=V": 2, - "float.sumobserver/C=D/R=V": 1, - "int.sumobserver//R=V": 1, - "int.sumobserver/A=B/R=V": 1, "float.valueobserver/A=B/R=V": -1, "float.valueobserver/C=D/R=V": -1, "int.valueobserver//R=V": 1, "int.valueobserver/A=B/R=V": 1, + + "float.sumobserver/A=B/R=V": 2, + "float.sumobserver/C=D/R=V": 1, + "int.sumobserver//R=V": 1, + "int.sumobserver/A=B/R=V": 1, + + "float.updownsumobserver/A=B/R=V": -2, + "float.updownsumobserver/C=D/R=V": 1, + "int.updownsumobserver//R=V": -1, + "int.updownsumobserver/A=B/R=V": 1, }, out.Map) } @@ -405,6 +423,8 @@ func TestObserverBatch(t *testing.T) { var intValueObs metric.Int64ValueObserver var floatSumObs metric.Float64SumObserver var intSumObs metric.Int64SumObserver + var floatUpDownSumObs metric.Float64UpDownSumObserver + var intUpDownSumObs metric.Int64UpDownSumObserver var batch = Must(meter).NewBatchObserver( func(_ context.Context, result metric.BatchObserverResult) { @@ -418,6 +438,8 @@ func TestObserverBatch(t *testing.T) { intValueObs.Observation(1), floatSumObs.Observation(1000), intSumObs.Observation(100), + floatUpDownSumObs.Observation(-1000), + intUpDownSumObs.Observation(-100), ) result.Observe( []kv.KeyValue{ @@ -425,6 +447,7 @@ func TestObserverBatch(t *testing.T) { }, floatValueObs.Observation(-1), floatSumObs.Observation(-1), + floatUpDownSumObs.Observation(-1), ) result.Observe( nil, @@ -432,27 +455,35 @@ func TestObserverBatch(t *testing.T) { intValueObs.Observation(1), intSumObs.Observation(10), floatSumObs.Observation(1.1), + intUpDownSumObs.Observation(10), ) }) floatValueObs = batch.RegisterFloat64ValueObserver("float.valueobserver") intValueObs = batch.RegisterInt64ValueObserver("int.valueobserver") floatSumObs = batch.RegisterFloat64SumObserver("float.sumobserver") intSumObs = batch.RegisterInt64SumObserver("int.sumobserver") + floatUpDownSumObs = batch.RegisterFloat64UpDownSumObserver("float.updownsumobserver") + intUpDownSumObs = batch.RegisterInt64UpDownSumObserver("int.updownsumobserver") collected := sdk.Collect(ctx) - require.Equal(t, 8, collected) - require.Equal(t, 8, len(integrator.records)) + require.Equal(t, collected, len(integrator.records)) out := batchTest.NewOutput(label.DefaultEncoder()) for _, rec := range integrator.records { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ - "float.sumobserver//R=V": 1.1, - "float.sumobserver/A=B/R=V": 1000, - "int.sumobserver//R=V": 10, - "int.sumobserver/A=B/R=V": 100, + "float.sumobserver//R=V": 1.1, + "float.sumobserver/A=B/R=V": 1000, + "int.sumobserver//R=V": 10, + "int.sumobserver/A=B/R=V": 100, + + "int.updownsumobserver/A=B/R=V": -100, + "float.updownsumobserver/A=B/R=V": -1000, + "int.updownsumobserver//R=V": 10, + "float.updownsumobserver/C=D/R=V": -1, + "float.valueobserver/A=B/R=V": -1, "float.valueobserver/C=D/R=V": -1, "int.valueobserver//R=V": 1, @@ -515,6 +546,42 @@ func TestRecordPersistence(t *testing.T) { require.Equal(t, int64(2), integrator.newAggCount) } +func TestIncorrectInstruments(t *testing.T) { + // The Batch observe/record APIs are susceptible to + // uninitialized instruments. + var counter metric.Int64Counter + var observer metric.Int64ValueObserver + + ctx := context.Background() + meter, sdk, integrator := newSDK(t) + + // Now try with uninitialized instruments. + meter.RecordBatch(ctx, nil, counter.Measurement(1)) + meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { + result.Observe(nil, observer.Observation(1)) + }) + + collected := sdk.Collect(ctx) + require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr()) + require.Equal(t, 0, collected) + + // Now try with instruments from another SDK. + var noopMeter metric.Meter + counter = metric.Must(noopMeter).NewInt64Counter("counter") + observer = metric.Must(noopMeter).NewBatchObserver( + func(context.Context, metric.BatchObserverResult) {}, + ).RegisterInt64ValueObserver("observer") + + meter.RecordBatch(ctx, nil, counter.Measurement(1)) + meter.NewBatchObserver(func(_ context.Context, result metric.BatchObserverResult) { + result.Observe(nil, observer.Observation(1)) + }) + + collected = sdk.Collect(ctx) + require.Equal(t, 0, collected) + require.Equal(t, metricsdk.ErrUninitializedInstrument, integrator.sdkErr()) +} + func TestSyncInAsync(t *testing.T) { ctx := context.Background() meter, sdk, integrator := newSDK(t) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index ff0f3853c..10883ac06 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -152,6 +152,8 @@ var ( _ api.AsyncImpl = &asyncInstrument{} _ api.SyncImpl = &syncInstrument{} _ api.BoundSyncImpl = &record{} + + ErrUninitializedInstrument = fmt.Errorf("use of an uninitialized instrument") ) func (inst *instrument) Descriptor() api.Descriptor { @@ -422,8 +424,9 @@ func (m *Accumulator) CollectAsync(kv []kv.KeyValue, obs ...metric.Observation) labels := label.NewSetWithSortable(kv, &m.asyncSortSlice) for _, ob := range obs { - a := ob.AsyncImpl().Implementation().(*asyncInstrument) - a.observe(ob.Number(), &labels) + if a := m.fromAsync(ob.AsyncImpl()); a != nil { + a.observe(ob.Number(), &labels) + } } } @@ -438,8 +441,9 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int { m.asyncContext = nil for _, inst := range m.asyncInstruments.Instruments() { - a := inst.Implementation().(*asyncInstrument) - asyncCollected += m.checkpointAsync(a) + if a := m.fromAsync(inst); a != nil { + asyncCollected += m.checkpointAsync(a) + } } return asyncCollected @@ -494,8 +498,10 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []kv.KeyValue, measur // ordered labels. var labelsPtr *label.Set for i, meas := range measurements { - s := meas.SyncImpl().Implementation().(*syncInstrument) - + s := m.fromSync(meas.SyncImpl()) + if s == nil { + continue + } h := s.acquireHandle(kvs, labelsPtr) // Re-use labels for the next measurement. @@ -538,3 +544,27 @@ func (r *record) mapkey() mapkey { ordered: r.labels.Equivalent(), } } + +// fromSync gets a sync implementation object, checking for +// uninitialized instruments and instruments created by another SDK. +func (m *Accumulator) fromSync(sync metric.SyncImpl) *syncInstrument { + if sync != nil { + if inst, ok := sync.Implementation().(*syncInstrument); ok { + return inst + } + } + m.errorHandler(ErrUninitializedInstrument) + return nil +} + +// fromSync gets an async implementation object, checking for +// uninitialized instruments and instruments created by another SDK. +func (m *Accumulator) fromAsync(async metric.AsyncImpl) *asyncInstrument { + if async != nil { + if inst, ok := async.Implementation().(*asyncInstrument); ok { + return inst + } + } + m.errorHandler(ErrUninitializedInstrument) + return nil +}