diff --git a/api/metric/api_test.go b/api/metric/api_test.go index 71c2f0a0f..0650269f1 100644 --- a/api/metric/api_test.go +++ b/api/metric/api_test.go @@ -180,27 +180,50 @@ func TestValueRecorder(t *testing.T) { } func TestObserverInstruments(t *testing.T) { - { + t.Run("float valueobserver", func(t *testing.T) { labels := []kv.KeyValue{kv.String("O", "P")} mockSDK, meter := mockTest.NewMeter() - o := Must(meter).RegisterFloat64ValueObserver("test.observer.float", func(result metric.Float64ObserverResult) { - result.Observe(42, labels...) + o := Must(meter).RegisterFloat64ValueObserver("test.valueobserver.float", func(result metric.Float64ObserverResult) { + result.Observe(42.1, labels...) }) - t.Log("Testing float observer") - mockSDK.RunAsyncInstruments() - checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, o.AsyncImpl()) - } - { + checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.ValueObserverKind, o.AsyncImpl(), + 42.1, + ) + }) + t.Run("int valueobserver", func(t *testing.T) { labels := []kv.KeyValue{} mockSDK, meter := mockTest.NewMeter() o := Must(meter).RegisterInt64ValueObserver("test.observer.int", func(result metric.Int64ObserverResult) { - result.Observe(42, labels...) + result.Observe(-142, labels...) }) - t.Log("Testing int observer") mockSDK.RunAsyncInstruments() - checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, o.AsyncImpl()) - } + checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.ValueObserverKind, o.AsyncImpl(), + -142, + ) + }) + t.Run("float sumobserver", func(t *testing.T) { + labels := []kv.KeyValue{kv.String("O", "P")} + mockSDK, meter := mockTest.NewMeter() + o := Must(meter).RegisterFloat64SumObserver("test.sumobserver.float", func(result metric.Float64ObserverResult) { + result.Observe(42.1, labels...) + }) + mockSDK.RunAsyncInstruments() + checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.SumObserverKind, o.AsyncImpl(), + 42.1, + ) + }) + t.Run("int sumobserver", func(t *testing.T) { + labels := []kv.KeyValue{} + mockSDK, meter := mockTest.NewMeter() + o := Must(meter).RegisterInt64SumObserver("test.observer.int", func(result metric.Int64ObserverResult) { + result.Observe(-142, labels...) + }) + mockSDK.RunAsyncInstruments() + checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.SumObserverKind, o.AsyncImpl(), + -142, + ) + }) } func checkSyncBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, instrument metric.InstrumentImpl, expected ...float64) { @@ -290,7 +313,7 @@ func TestBatchObserverInstruments(t *testing.T) { require.Equal(t, 0, m2.Number.CompareNumber(metric.Float64NumberKind, number(t, metric.Float64NumberKind, 42))) } -func checkObserverBatch(t *testing.T, labels []kv.KeyValue, mock *mockTest.MeterImpl, kind metric.NumberKind, observer metric.AsyncImpl) { +func checkObserverBatch(t *testing.T, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, observer metric.AsyncImpl, expected float64) { t.Helper() assert.Len(t, mock.MeasurementBatches, 1) if len(mock.MeasurementBatches) < 1 { @@ -307,9 +330,10 @@ func checkObserverBatch(t *testing.T, labels []kv.KeyValue, mock *mockTest.Meter return } measurement := got.Measurements[0] + require.Equal(t, mkind, measurement.Instrument.Descriptor().MetricKind()) assert.Equal(t, o, measurement.Instrument.Implementation().(*mockTest.Async)) - ft := number(t, kind, 42) - assert.Equal(t, 0, measurement.Number.CompareNumber(kind, ft)) + ft := number(t, nkind, expected) + assert.Equal(t, 0, measurement.Number.CompareNumber(nkind, ft)) } func number(t *testing.T, kind metric.NumberKind, value float64) metric.Number { diff --git a/api/metric/async.go b/api/metric/async.go index 7f766e1ed..e54f0cf0d 100644 --- a/api/metric/async.go +++ b/api/metric/async.go @@ -176,20 +176,26 @@ func (b *BatchObserverCallback) Run(function func([]kv.KeyValue, ...Observation) }) } -// wrapInt64ValueObserverInstrument returns an `Int64ValueObserver` from a -// `AsyncImpl`. An error will be generated if the -// `AsyncImpl` is nil (in which case a No-op is substituted), -// otherwise the error passes through. +// wrapInt64ValueObserverInstrument converts an AsyncImpl into Int64ValueObserver. func wrapInt64ValueObserverInstrument(asyncInst AsyncImpl, err error) (Int64ValueObserver, error) { common, err := checkNewAsync(asyncInst, err) return Int64ValueObserver{asyncInstrument: common}, err } -// wrapFloat64ValueObserverInstrument returns an `Float64ValueObserver` from a -// `AsyncImpl`. An error will be generated if the -// `AsyncImpl` is nil (in which case a No-op is substituted), -// otherwise the error passes through. +// wrapFloat64ValueObserverInstrument converts an AsyncImpl into Float64ValueObserver. func wrapFloat64ValueObserverInstrument(asyncInst AsyncImpl, err error) (Float64ValueObserver, error) { common, err := checkNewAsync(asyncInst, err) return Float64ValueObserver{asyncInstrument: common}, err } + +// wrapInt64SumObserverInstrument converts an AsyncImpl into Int64SumObserver. +func wrapInt64SumObserverInstrument(asyncInst AsyncImpl, err error) (Int64SumObserver, error) { + common, err := checkNewAsync(asyncInst, err) + return Int64SumObserver{asyncInstrument: common}, err +} + +// wrapFloat64SumObserverInstrument converts an AsyncImpl into Float64SumObserver. +func wrapFloat64SumObserverInstrument(asyncInst AsyncImpl, err error) (Float64SumObserver, error) { + common, err := checkNewAsync(asyncInst, err) + return Float64SumObserver{asyncInstrument: common}, err +} diff --git a/api/metric/kind.go b/api/metric/kind.go index 66fc1b01e..fca4fa6fb 100644 --- a/api/metric/kind.go +++ b/api/metric/kind.go @@ -29,4 +29,7 @@ const ( CounterKind // UpDownCounterKind indicates a UpDownCounter instrument. UpDownCounterKind + + // SumObserverKind indicates a SumObserver instrument. + SumObserverKind ) diff --git a/api/metric/kind_string.go b/api/metric/kind_string.go index 5f089067e..33118e2a0 100644 --- a/api/metric/kind_string.go +++ b/api/metric/kind_string.go @@ -12,11 +12,12 @@ func _() { _ = x[ValueObserverKind-1] _ = x[CounterKind-2] _ = x[UpDownCounterKind-3] + _ = x[SumObserverKind-4] } -const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKind" +const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKindSumObserverKind" -var _Kind_index = [...]uint8{0, 17, 34, 45, 62} +var _Kind_index = [...]uint8{0, 17, 34, 45, 62, 77} func (i Kind) String() string { if i < 0 || i >= Kind(len(_Kind_index)-1) { diff --git a/api/metric/meter.go b/api/metric/meter.go index 9ca493e0e..e1b546bf9 100644 --- a/api/metric/meter.go +++ b/api/metric/meter.go @@ -144,6 +144,32 @@ func (m Meter) RegisterFloat64ValueObserver(name string, callback Float64Observe newFloat64AsyncRunner(callback))) } +// RegisterInt64SumObserver creates a new integer SumObserver instrument +// with the given name, running a given callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (m Meter) RegisterInt64SumObserver(name string, callback Int64ObserverCallback, opts ...Option) (Int64SumObserver, error) { + if callback == nil { + return wrapInt64SumObserverInstrument(NoopAsync{}, nil) + } + return wrapInt64SumObserverInstrument( + m.newAsync(name, SumObserverKind, Int64NumberKind, opts, + newInt64AsyncRunner(callback))) +} + +// RegisterFloat64SumObserver creates a new floating point SumObserver with +// the given name, running a given callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (m Meter) RegisterFloat64SumObserver(name string, callback Float64ObserverCallback, opts ...Option) (Float64SumObserver, error) { + if callback == nil { + return wrapFloat64SumObserverInstrument(NoopAsync{}, nil) + } + return wrapFloat64SumObserverInstrument( + m.newAsync(name, SumObserverKind, Float64NumberKind, opts, + newFloat64AsyncRunner(callback))) +} + // RegisterInt64ValueObserver creates a new integer ValueObserver instrument // with the given name, running in a batch callback, and customized with // options. May return an error if the name is invalid (e.g., empty) @@ -169,6 +195,31 @@ func (b BatchObserver) RegisterFloat64ValueObserver(name string, opts ...Option) b.runner)) } +// RegisterInt64SumObserver creates a new integer SumObserver instrument +// with the given name, running in a batch callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (b BatchObserver) RegisterInt64SumObserver(name string, opts ...Option) (Int64SumObserver, error) { + if b.runner == nil { + return wrapInt64SumObserverInstrument(NoopAsync{}, nil) + } + return wrapInt64SumObserverInstrument( + b.meter.newAsync(name, SumObserverKind, Int64NumberKind, opts, b.runner)) +} + +// RegisterFloat64SumObserver creates a new floating point SumObserver with +// the given name, running in a batch callback, and customized with +// options. May return an error if the name is invalid (e.g., empty) +// or improperly registered (e.g., duplicate registration). +func (b BatchObserver) RegisterFloat64SumObserver(name string, opts ...Option) (Float64SumObserver, error) { + if b.runner == nil { + return wrapFloat64SumObserverInstrument(NoopAsync{}, nil) + } + return wrapFloat64SumObserverInstrument( + b.meter.newAsync(name, SumObserverKind, Float64NumberKind, opts, + b.runner)) +} + // MeterImpl returns the underlying MeterImpl of this Meter. func (m Meter) MeterImpl() MeterImpl { return m.impl diff --git a/api/metric/must.go b/api/metric/must.go index e734e8292..bf4b60284 100644 --- a/api/metric/must.go +++ b/api/metric/must.go @@ -113,6 +113,26 @@ func (mm MeterMust) RegisterFloat64ValueObserver(name string, callback Float64Ob } } +// RegisterInt64SumObserver calls `Meter.RegisterInt64SumObserver` and +// returns the instrument, panicking if it encounters an error. +func (mm MeterMust) RegisterInt64SumObserver(name string, callback Int64ObserverCallback, oos ...Option) Int64SumObserver { + if inst, err := mm.meter.RegisterInt64SumObserver(name, callback, oos...); err != nil { + panic(err) + } else { + return inst + } +} + +// RegisterFloat64SumObserver calls `Meter.RegisterFloat64SumObserver` and +// returns the instrument, panicking if it encounters an error. +func (mm MeterMust) RegisterFloat64SumObserver(name string, callback Float64ObserverCallback, oos ...Option) Float64SumObserver { + if inst, err := mm.meter.RegisterFloat64SumObserver(name, callback, oos...); err != nil { + panic(err) + } else { + return inst + } +} + // NewBatchObserver returns a wrapper around BatchObserver that panics // when any instrument constructor returns an error. func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust { @@ -140,3 +160,23 @@ func (bm BatchObserverMust) RegisterFloat64ValueObserver(name string, oos ...Opt return inst } } + +// RegisterInt64SumObserver calls `BatchObserver.RegisterInt64SumObserver` and +// returns the instrument, panicking if it encounters an error. +func (bm BatchObserverMust) RegisterInt64SumObserver(name string, oos ...Option) Int64SumObserver { + if inst, err := bm.batch.RegisterInt64SumObserver(name, oos...); err != nil { + panic(err) + } else { + return inst + } +} + +// RegisterFloat64SumObserver calls `BatchObserver.RegisterFloat64SumObserver` and +// returns the instrument, panicking if it encounters an error. +func (bm BatchObserverMust) RegisterFloat64SumObserver(name string, oos ...Option) Float64SumObserver { + if inst, err := bm.batch.RegisterFloat64SumObserver(name, oos...); err != nil { + panic(err) + } else { + return inst + } +} diff --git a/api/metric/observer.go b/api/metric/observer.go index 9d1a0582c..f9100f310 100644 --- a/api/metric/observer.go +++ b/api/metric/observer.go @@ -33,6 +33,18 @@ type Float64ValueObserver struct { asyncInstrument } +// Int64SumObserver is a metric that captures a precomputed sum of +// int64 values at a point in time. +type Int64SumObserver struct { + asyncInstrument +} + +// Float64SumObserver is a metric that captures a precomputed sum of +// float64 values at a point in time. +type Float64SumObserver struct { + asyncInstrument +} + // Observation returns an Observation, a BatchObserverCallback // argument, for an asynchronous integer instrument. // This returns an implementation-level object for use by the SDK, @@ -54,3 +66,25 @@ func (f Float64ValueObserver) Observation(v float64) Observation { instrument: f.instrument, } } + +// Observation returns an Observation, a BatchObserverCallback +// argument, for an asynchronous integer instrument. +// This returns an implementation-level object for use by the SDK, +// users should not refer to this. +func (i Int64SumObserver) Observation(v int64) Observation { + return Observation{ + number: NewInt64Number(v), + instrument: i.instrument, + } +} + +// Observation returns an Observation, a BatchObserverCallback +// argument, for an asynchronous integer instrument. +// This returns an implementation-level object for use by the SDK, +// users should not refer to this. +func (f Float64SumObserver) Observation(v float64) Observation { + return Observation{ + number: NewFloat64Number(v), + instrument: f.instrument, + } +} diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 660e83ef3..f0b6409e6 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -125,7 +125,7 @@ func RangeTest(number metric.Number, descriptor *metric.Descriptor) error { } switch descriptor.MetricKind() { - case metric.CounterKind: + case metric.CounterKind, metric.SumObserverKind: if number.IsNegative(numberKind) { return ErrNegativeInput } diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 3ffd78929..8eccc0fa0 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -19,6 +19,7 @@ import ( "fmt" "math" "strings" + "sync" "sync/atomic" "testing" @@ -45,18 +46,37 @@ type correctnessIntegrator struct { t *testing.T records []export.Record + + sync.Mutex + err error } func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) { integrator := &correctnessIntegrator{ t: t, } - accum := metricsdk.NewAccumulator(integrator, metricsdk.WithResource(testResource)) + accum := metricsdk.NewAccumulator( + integrator, + metricsdk.WithResource(testResource), + metricsdk.WithErrorHandler(func(err error) { + integrator.Lock() + defer integrator.Unlock() + integrator.err = err + }), + ) meter := metric.WrapMeterImpl(accum, "test") return meter, accum, integrator } -func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) { +func (ci *correctnessIntegrator) sdkErr() error { + ci.Lock() + defer ci.Unlock() + t := ci.err + ci.err = nil + return t +} + +func (ci *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) { name := descriptor.Name() switch { @@ -68,21 +88,21 @@ func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (a agg = array.New() } if agg != nil { - atomic.AddInt64(&cb.newAggCount, 1) + atomic.AddInt64(&ci.newAggCount, 1) } return } -func (cb *correctnessIntegrator) CheckpointSet() export.CheckpointSet { - cb.t.Fatal("Should not be called") +func (ci *correctnessIntegrator) CheckpointSet() export.CheckpointSet { + ci.t.Fatal("Should not be called") return nil } func (*correctnessIntegrator) FinishedCollection() { } -func (cb *correctnessIntegrator) Process(_ context.Context, record export.Record) error { - cb.records = append(cb.records, record) +func (ci *correctnessIntegrator) Process(_ context.Context, record export.Record) error { + ci.records = append(ci.records, record) return nil } @@ -313,19 +333,37 @@ func TestObserverCollection(t *testing.T) { result.Observe(1, kv.String("A", "B")) result.Observe(1) }) + + _ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(result metric.Float64ObserverResult) { + result.Observe(1, kv.String("A", "B")) + result.Observe(2, kv.String("A", "B")) + result.Observe(1, kv.String("C", "D")) + }) + _ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(result metric.Int64ObserverResult) { + result.Observe(2, kv.String("A", "B")) + result.Observe(1) + // last value wins + result.Observe(1, kv.String("A", "B")) + result.Observe(1) + }) + _ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(result metric.Int64ObserverResult) { }) collected := sdk.Collect(ctx) - require.Equal(t, 4, collected) - require.Equal(t, 4, len(integrator.records)) + require.Equal(t, 8, collected) + require.Equal(t, 8, len(integrator.records)) out := batchTest.NewOutput(label.DefaultEncoder()) for _, rec := range integrator.records { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ + "float.sumobserver/A=B/R=V": 2, + "float.sumobserver/C=D/R=V": 1, + "int.sumobserver//R=V": 1, + "int.sumobserver/A=B/R=V": 1, "float.valueobserver/A=B/R=V": -1, "float.valueobserver/C=D/R=V": -1, "int.valueobserver//R=V": 1, @@ -333,48 +371,88 @@ func TestObserverCollection(t *testing.T) { }, out.Map) } +func TestSumObserverInputRange(t *testing.T) { + ctx := context.Background() + meter, sdk, integrator := newSDK(t) + + _ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(result metric.Float64ObserverResult) { + result.Observe(-2, kv.String("A", "B")) + require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr()) + result.Observe(-1, kv.String("C", "D")) + require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr()) + }) + _ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(result metric.Int64ObserverResult) { + result.Observe(-1, kv.String("A", "B")) + require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr()) + result.Observe(-1) + require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr()) + }) + + collected := sdk.Collect(ctx) + + require.Equal(t, 0, collected) + require.Equal(t, 0, len(integrator.records)) + + // check that the error condition was reset + require.NoError(t, integrator.sdkErr()) +} + func TestObserverBatch(t *testing.T) { ctx := context.Background() meter, sdk, integrator := newSDK(t) - var floatObs metric.Float64ValueObserver - var intObs metric.Int64ValueObserver + var floatValueObs metric.Float64ValueObserver + var intValueObs metric.Int64ValueObserver + var floatSumObs metric.Float64SumObserver + var intSumObs metric.Int64SumObserver + var batch = Must(meter).NewBatchObserver( func(result metric.BatchObserverResult) { result.Observe( []kv.KeyValue{ kv.String("A", "B"), }, - floatObs.Observation(1), - floatObs.Observation(-1), - intObs.Observation(-1), - intObs.Observation(1), + floatValueObs.Observation(1), + floatValueObs.Observation(-1), + intValueObs.Observation(-1), + intValueObs.Observation(1), + floatSumObs.Observation(1000), + intSumObs.Observation(100), ) result.Observe( []kv.KeyValue{ kv.String("C", "D"), }, - floatObs.Observation(-1), + floatValueObs.Observation(-1), + floatSumObs.Observation(-1), ) result.Observe( nil, - intObs.Observation(1), - intObs.Observation(1), + intValueObs.Observation(1), + intValueObs.Observation(1), + intSumObs.Observation(10), + floatSumObs.Observation(1.1), ) }) - floatObs = batch.RegisterFloat64ValueObserver("float.valueobserver") - intObs = batch.RegisterInt64ValueObserver("int.valueobserver") + floatValueObs = batch.RegisterFloat64ValueObserver("float.valueobserver") + intValueObs = batch.RegisterInt64ValueObserver("int.valueobserver") + floatSumObs = batch.RegisterFloat64SumObserver("float.sumobserver") + intSumObs = batch.RegisterInt64SumObserver("int.sumobserver") collected := sdk.Collect(ctx) - require.Equal(t, 4, collected) - require.Equal(t, 4, len(integrator.records)) + require.Equal(t, 8, collected) + require.Equal(t, 8, len(integrator.records)) out := batchTest.NewOutput(label.DefaultEncoder()) for _, rec := range integrator.records { _ = out.AddTo(rec) } require.EqualValues(t, map[string]float64{ + "float.sumobserver//R=V": 1.1, + "float.sumobserver/A=B/R=V": 1000, + "int.sumobserver//R=V": 10, + "int.sumobserver/A=B/R=V": 100, "float.valueobserver/A=B/R=V": -1, "float.valueobserver/C=D/R=V": -1, "int.valueobserver//R=V": 1,