mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-11 13:38:37 +02:00
Add the SumObserver instrument (#747)
* Add the SumObserver instrument * Lint
This commit is contained in:
parent
0a333cade1
commit
c5f2252c48
@ -180,27 +180,50 @@ func TestValueRecorder(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestObserverInstruments(t *testing.T) {
|
func TestObserverInstruments(t *testing.T) {
|
||||||
{
|
t.Run("float valueobserver", func(t *testing.T) {
|
||||||
labels := []kv.KeyValue{kv.String("O", "P")}
|
labels := []kv.KeyValue{kv.String("O", "P")}
|
||||||
mockSDK, meter := mockTest.NewMeter()
|
mockSDK, meter := mockTest.NewMeter()
|
||||||
o := Must(meter).RegisterFloat64ValueObserver("test.observer.float", func(result metric.Float64ObserverResult) {
|
o := Must(meter).RegisterFloat64ValueObserver("test.valueobserver.float", func(result metric.Float64ObserverResult) {
|
||||||
result.Observe(42, labels...)
|
result.Observe(42.1, labels...)
|
||||||
})
|
})
|
||||||
t.Log("Testing float observer")
|
|
||||||
|
|
||||||
mockSDK.RunAsyncInstruments()
|
mockSDK.RunAsyncInstruments()
|
||||||
checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, o.AsyncImpl())
|
checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.ValueObserverKind, o.AsyncImpl(),
|
||||||
}
|
42.1,
|
||||||
{
|
)
|
||||||
|
})
|
||||||
|
t.Run("int valueobserver", func(t *testing.T) {
|
||||||
labels := []kv.KeyValue{}
|
labels := []kv.KeyValue{}
|
||||||
mockSDK, meter := mockTest.NewMeter()
|
mockSDK, meter := mockTest.NewMeter()
|
||||||
o := Must(meter).RegisterInt64ValueObserver("test.observer.int", func(result metric.Int64ObserverResult) {
|
o := Must(meter).RegisterInt64ValueObserver("test.observer.int", func(result metric.Int64ObserverResult) {
|
||||||
result.Observe(42, labels...)
|
result.Observe(-142, labels...)
|
||||||
})
|
})
|
||||||
t.Log("Testing int observer")
|
|
||||||
mockSDK.RunAsyncInstruments()
|
mockSDK.RunAsyncInstruments()
|
||||||
checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, o.AsyncImpl())
|
checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.ValueObserverKind, o.AsyncImpl(),
|
||||||
}
|
-142,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
t.Run("float sumobserver", func(t *testing.T) {
|
||||||
|
labels := []kv.KeyValue{kv.String("O", "P")}
|
||||||
|
mockSDK, meter := mockTest.NewMeter()
|
||||||
|
o := Must(meter).RegisterFloat64SumObserver("test.sumobserver.float", func(result metric.Float64ObserverResult) {
|
||||||
|
result.Observe(42.1, labels...)
|
||||||
|
})
|
||||||
|
mockSDK.RunAsyncInstruments()
|
||||||
|
checkObserverBatch(t, labels, mockSDK, metric.Float64NumberKind, metric.SumObserverKind, o.AsyncImpl(),
|
||||||
|
42.1,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
t.Run("int sumobserver", func(t *testing.T) {
|
||||||
|
labels := []kv.KeyValue{}
|
||||||
|
mockSDK, meter := mockTest.NewMeter()
|
||||||
|
o := Must(meter).RegisterInt64SumObserver("test.observer.int", func(result metric.Int64ObserverResult) {
|
||||||
|
result.Observe(-142, labels...)
|
||||||
|
})
|
||||||
|
mockSDK.RunAsyncInstruments()
|
||||||
|
checkObserverBatch(t, labels, mockSDK, metric.Int64NumberKind, metric.SumObserverKind, o.AsyncImpl(),
|
||||||
|
-142,
|
||||||
|
)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkSyncBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, instrument metric.InstrumentImpl, expected ...float64) {
|
func checkSyncBatches(t *testing.T, ctx context.Context, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, instrument metric.InstrumentImpl, expected ...float64) {
|
||||||
@ -290,7 +313,7 @@ func TestBatchObserverInstruments(t *testing.T) {
|
|||||||
require.Equal(t, 0, m2.Number.CompareNumber(metric.Float64NumberKind, number(t, metric.Float64NumberKind, 42)))
|
require.Equal(t, 0, m2.Number.CompareNumber(metric.Float64NumberKind, number(t, metric.Float64NumberKind, 42)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkObserverBatch(t *testing.T, labels []kv.KeyValue, mock *mockTest.MeterImpl, kind metric.NumberKind, observer metric.AsyncImpl) {
|
func checkObserverBatch(t *testing.T, labels []kv.KeyValue, mock *mockTest.MeterImpl, nkind metric.NumberKind, mkind metric.Kind, observer metric.AsyncImpl, expected float64) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
assert.Len(t, mock.MeasurementBatches, 1)
|
assert.Len(t, mock.MeasurementBatches, 1)
|
||||||
if len(mock.MeasurementBatches) < 1 {
|
if len(mock.MeasurementBatches) < 1 {
|
||||||
@ -307,9 +330,10 @@ func checkObserverBatch(t *testing.T, labels []kv.KeyValue, mock *mockTest.Meter
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
measurement := got.Measurements[0]
|
measurement := got.Measurements[0]
|
||||||
|
require.Equal(t, mkind, measurement.Instrument.Descriptor().MetricKind())
|
||||||
assert.Equal(t, o, measurement.Instrument.Implementation().(*mockTest.Async))
|
assert.Equal(t, o, measurement.Instrument.Implementation().(*mockTest.Async))
|
||||||
ft := number(t, kind, 42)
|
ft := number(t, nkind, expected)
|
||||||
assert.Equal(t, 0, measurement.Number.CompareNumber(kind, ft))
|
assert.Equal(t, 0, measurement.Number.CompareNumber(nkind, ft))
|
||||||
}
|
}
|
||||||
|
|
||||||
func number(t *testing.T, kind metric.NumberKind, value float64) metric.Number {
|
func number(t *testing.T, kind metric.NumberKind, value float64) metric.Number {
|
||||||
|
@ -176,20 +176,26 @@ func (b *BatchObserverCallback) Run(function func([]kv.KeyValue, ...Observation)
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapInt64ValueObserverInstrument returns an `Int64ValueObserver` from a
|
// wrapInt64ValueObserverInstrument converts an AsyncImpl into Int64ValueObserver.
|
||||||
// `AsyncImpl`. An error will be generated if the
|
|
||||||
// `AsyncImpl` is nil (in which case a No-op is substituted),
|
|
||||||
// otherwise the error passes through.
|
|
||||||
func wrapInt64ValueObserverInstrument(asyncInst AsyncImpl, err error) (Int64ValueObserver, error) {
|
func wrapInt64ValueObserverInstrument(asyncInst AsyncImpl, err error) (Int64ValueObserver, error) {
|
||||||
common, err := checkNewAsync(asyncInst, err)
|
common, err := checkNewAsync(asyncInst, err)
|
||||||
return Int64ValueObserver{asyncInstrument: common}, err
|
return Int64ValueObserver{asyncInstrument: common}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// wrapFloat64ValueObserverInstrument returns an `Float64ValueObserver` from a
|
// wrapFloat64ValueObserverInstrument converts an AsyncImpl into Float64ValueObserver.
|
||||||
// `AsyncImpl`. An error will be generated if the
|
|
||||||
// `AsyncImpl` is nil (in which case a No-op is substituted),
|
|
||||||
// otherwise the error passes through.
|
|
||||||
func wrapFloat64ValueObserverInstrument(asyncInst AsyncImpl, err error) (Float64ValueObserver, error) {
|
func wrapFloat64ValueObserverInstrument(asyncInst AsyncImpl, err error) (Float64ValueObserver, error) {
|
||||||
common, err := checkNewAsync(asyncInst, err)
|
common, err := checkNewAsync(asyncInst, err)
|
||||||
return Float64ValueObserver{asyncInstrument: common}, err
|
return Float64ValueObserver{asyncInstrument: common}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// wrapInt64SumObserverInstrument converts an AsyncImpl into Int64SumObserver.
|
||||||
|
func wrapInt64SumObserverInstrument(asyncInst AsyncImpl, err error) (Int64SumObserver, error) {
|
||||||
|
common, err := checkNewAsync(asyncInst, err)
|
||||||
|
return Int64SumObserver{asyncInstrument: common}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// wrapFloat64SumObserverInstrument converts an AsyncImpl into Float64SumObserver.
|
||||||
|
func wrapFloat64SumObserverInstrument(asyncInst AsyncImpl, err error) (Float64SumObserver, error) {
|
||||||
|
common, err := checkNewAsync(asyncInst, err)
|
||||||
|
return Float64SumObserver{asyncInstrument: common}, err
|
||||||
|
}
|
||||||
|
@ -29,4 +29,7 @@ const (
|
|||||||
CounterKind
|
CounterKind
|
||||||
// UpDownCounterKind indicates a UpDownCounter instrument.
|
// UpDownCounterKind indicates a UpDownCounter instrument.
|
||||||
UpDownCounterKind
|
UpDownCounterKind
|
||||||
|
|
||||||
|
// SumObserverKind indicates a SumObserver instrument.
|
||||||
|
SumObserverKind
|
||||||
)
|
)
|
||||||
|
@ -12,11 +12,12 @@ func _() {
|
|||||||
_ = x[ValueObserverKind-1]
|
_ = x[ValueObserverKind-1]
|
||||||
_ = x[CounterKind-2]
|
_ = x[CounterKind-2]
|
||||||
_ = x[UpDownCounterKind-3]
|
_ = x[UpDownCounterKind-3]
|
||||||
|
_ = x[SumObserverKind-4]
|
||||||
}
|
}
|
||||||
|
|
||||||
const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKind"
|
const _Kind_name = "ValueRecorderKindValueObserverKindCounterKindUpDownCounterKindSumObserverKind"
|
||||||
|
|
||||||
var _Kind_index = [...]uint8{0, 17, 34, 45, 62}
|
var _Kind_index = [...]uint8{0, 17, 34, 45, 62, 77}
|
||||||
|
|
||||||
func (i Kind) String() string {
|
func (i Kind) String() string {
|
||||||
if i < 0 || i >= Kind(len(_Kind_index)-1) {
|
if i < 0 || i >= Kind(len(_Kind_index)-1) {
|
||||||
|
@ -144,6 +144,32 @@ func (m Meter) RegisterFloat64ValueObserver(name string, callback Float64Observe
|
|||||||
newFloat64AsyncRunner(callback)))
|
newFloat64AsyncRunner(callback)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterInt64SumObserver creates a new integer SumObserver instrument
|
||||||
|
// with the given name, running a given callback, and customized with
|
||||||
|
// options. May return an error if the name is invalid (e.g., empty)
|
||||||
|
// or improperly registered (e.g., duplicate registration).
|
||||||
|
func (m Meter) RegisterInt64SumObserver(name string, callback Int64ObserverCallback, opts ...Option) (Int64SumObserver, error) {
|
||||||
|
if callback == nil {
|
||||||
|
return wrapInt64SumObserverInstrument(NoopAsync{}, nil)
|
||||||
|
}
|
||||||
|
return wrapInt64SumObserverInstrument(
|
||||||
|
m.newAsync(name, SumObserverKind, Int64NumberKind, opts,
|
||||||
|
newInt64AsyncRunner(callback)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterFloat64SumObserver creates a new floating point SumObserver with
|
||||||
|
// the given name, running a given callback, and customized with
|
||||||
|
// options. May return an error if the name is invalid (e.g., empty)
|
||||||
|
// or improperly registered (e.g., duplicate registration).
|
||||||
|
func (m Meter) RegisterFloat64SumObserver(name string, callback Float64ObserverCallback, opts ...Option) (Float64SumObserver, error) {
|
||||||
|
if callback == nil {
|
||||||
|
return wrapFloat64SumObserverInstrument(NoopAsync{}, nil)
|
||||||
|
}
|
||||||
|
return wrapFloat64SumObserverInstrument(
|
||||||
|
m.newAsync(name, SumObserverKind, Float64NumberKind, opts,
|
||||||
|
newFloat64AsyncRunner(callback)))
|
||||||
|
}
|
||||||
|
|
||||||
// RegisterInt64ValueObserver creates a new integer ValueObserver instrument
|
// RegisterInt64ValueObserver creates a new integer ValueObserver instrument
|
||||||
// with the given name, running in a batch callback, and customized with
|
// with the given name, running in a batch callback, and customized with
|
||||||
// options. May return an error if the name is invalid (e.g., empty)
|
// options. May return an error if the name is invalid (e.g., empty)
|
||||||
@ -169,6 +195,31 @@ func (b BatchObserver) RegisterFloat64ValueObserver(name string, opts ...Option)
|
|||||||
b.runner))
|
b.runner))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterInt64SumObserver creates a new integer SumObserver instrument
|
||||||
|
// with the given name, running in a batch callback, and customized with
|
||||||
|
// options. May return an error if the name is invalid (e.g., empty)
|
||||||
|
// or improperly registered (e.g., duplicate registration).
|
||||||
|
func (b BatchObserver) RegisterInt64SumObserver(name string, opts ...Option) (Int64SumObserver, error) {
|
||||||
|
if b.runner == nil {
|
||||||
|
return wrapInt64SumObserverInstrument(NoopAsync{}, nil)
|
||||||
|
}
|
||||||
|
return wrapInt64SumObserverInstrument(
|
||||||
|
b.meter.newAsync(name, SumObserverKind, Int64NumberKind, opts, b.runner))
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterFloat64SumObserver creates a new floating point SumObserver with
|
||||||
|
// the given name, running in a batch callback, and customized with
|
||||||
|
// options. May return an error if the name is invalid (e.g., empty)
|
||||||
|
// or improperly registered (e.g., duplicate registration).
|
||||||
|
func (b BatchObserver) RegisterFloat64SumObserver(name string, opts ...Option) (Float64SumObserver, error) {
|
||||||
|
if b.runner == nil {
|
||||||
|
return wrapFloat64SumObserverInstrument(NoopAsync{}, nil)
|
||||||
|
}
|
||||||
|
return wrapFloat64SumObserverInstrument(
|
||||||
|
b.meter.newAsync(name, SumObserverKind, Float64NumberKind, opts,
|
||||||
|
b.runner))
|
||||||
|
}
|
||||||
|
|
||||||
// MeterImpl returns the underlying MeterImpl of this Meter.
|
// MeterImpl returns the underlying MeterImpl of this Meter.
|
||||||
func (m Meter) MeterImpl() MeterImpl {
|
func (m Meter) MeterImpl() MeterImpl {
|
||||||
return m.impl
|
return m.impl
|
||||||
|
@ -113,6 +113,26 @@ func (mm MeterMust) RegisterFloat64ValueObserver(name string, callback Float64Ob
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterInt64SumObserver calls `Meter.RegisterInt64SumObserver` and
|
||||||
|
// returns the instrument, panicking if it encounters an error.
|
||||||
|
func (mm MeterMust) RegisterInt64SumObserver(name string, callback Int64ObserverCallback, oos ...Option) Int64SumObserver {
|
||||||
|
if inst, err := mm.meter.RegisterInt64SumObserver(name, callback, oos...); err != nil {
|
||||||
|
panic(err)
|
||||||
|
} else {
|
||||||
|
return inst
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterFloat64SumObserver calls `Meter.RegisterFloat64SumObserver` and
|
||||||
|
// returns the instrument, panicking if it encounters an error.
|
||||||
|
func (mm MeterMust) RegisterFloat64SumObserver(name string, callback Float64ObserverCallback, oos ...Option) Float64SumObserver {
|
||||||
|
if inst, err := mm.meter.RegisterFloat64SumObserver(name, callback, oos...); err != nil {
|
||||||
|
panic(err)
|
||||||
|
} else {
|
||||||
|
return inst
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewBatchObserver returns a wrapper around BatchObserver that panics
|
// NewBatchObserver returns a wrapper around BatchObserver that panics
|
||||||
// when any instrument constructor returns an error.
|
// when any instrument constructor returns an error.
|
||||||
func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust {
|
func (mm MeterMust) NewBatchObserver(callback BatchObserverCallback) BatchObserverMust {
|
||||||
@ -140,3 +160,23 @@ func (bm BatchObserverMust) RegisterFloat64ValueObserver(name string, oos ...Opt
|
|||||||
return inst
|
return inst
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterInt64SumObserver calls `BatchObserver.RegisterInt64SumObserver` and
|
||||||
|
// returns the instrument, panicking if it encounters an error.
|
||||||
|
func (bm BatchObserverMust) RegisterInt64SumObserver(name string, oos ...Option) Int64SumObserver {
|
||||||
|
if inst, err := bm.batch.RegisterInt64SumObserver(name, oos...); err != nil {
|
||||||
|
panic(err)
|
||||||
|
} else {
|
||||||
|
return inst
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterFloat64SumObserver calls `BatchObserver.RegisterFloat64SumObserver` and
|
||||||
|
// returns the instrument, panicking if it encounters an error.
|
||||||
|
func (bm BatchObserverMust) RegisterFloat64SumObserver(name string, oos ...Option) Float64SumObserver {
|
||||||
|
if inst, err := bm.batch.RegisterFloat64SumObserver(name, oos...); err != nil {
|
||||||
|
panic(err)
|
||||||
|
} else {
|
||||||
|
return inst
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -33,6 +33,18 @@ type Float64ValueObserver struct {
|
|||||||
asyncInstrument
|
asyncInstrument
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Int64SumObserver is a metric that captures a precomputed sum of
|
||||||
|
// int64 values at a point in time.
|
||||||
|
type Int64SumObserver struct {
|
||||||
|
asyncInstrument
|
||||||
|
}
|
||||||
|
|
||||||
|
// Float64SumObserver is a metric that captures a precomputed sum of
|
||||||
|
// float64 values at a point in time.
|
||||||
|
type Float64SumObserver struct {
|
||||||
|
asyncInstrument
|
||||||
|
}
|
||||||
|
|
||||||
// Observation returns an Observation, a BatchObserverCallback
|
// Observation returns an Observation, a BatchObserverCallback
|
||||||
// argument, for an asynchronous integer instrument.
|
// argument, for an asynchronous integer instrument.
|
||||||
// This returns an implementation-level object for use by the SDK,
|
// This returns an implementation-level object for use by the SDK,
|
||||||
@ -54,3 +66,25 @@ func (f Float64ValueObserver) Observation(v float64) Observation {
|
|||||||
instrument: f.instrument,
|
instrument: f.instrument,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Observation returns an Observation, a BatchObserverCallback
|
||||||
|
// argument, for an asynchronous integer instrument.
|
||||||
|
// This returns an implementation-level object for use by the SDK,
|
||||||
|
// users should not refer to this.
|
||||||
|
func (i Int64SumObserver) Observation(v int64) Observation {
|
||||||
|
return Observation{
|
||||||
|
number: NewInt64Number(v),
|
||||||
|
instrument: i.instrument,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Observation returns an Observation, a BatchObserverCallback
|
||||||
|
// argument, for an asynchronous integer instrument.
|
||||||
|
// This returns an implementation-level object for use by the SDK,
|
||||||
|
// users should not refer to this.
|
||||||
|
func (f Float64SumObserver) Observation(v float64) Observation {
|
||||||
|
return Observation{
|
||||||
|
number: NewFloat64Number(v),
|
||||||
|
instrument: f.instrument,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -125,7 +125,7 @@ func RangeTest(number metric.Number, descriptor *metric.Descriptor) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
switch descriptor.MetricKind() {
|
switch descriptor.MetricKind() {
|
||||||
case metric.CounterKind:
|
case metric.CounterKind, metric.SumObserverKind:
|
||||||
if number.IsNegative(numberKind) {
|
if number.IsNegative(numberKind) {
|
||||||
return ErrNegativeInput
|
return ErrNegativeInput
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
@ -45,18 +46,37 @@ type correctnessIntegrator struct {
|
|||||||
t *testing.T
|
t *testing.T
|
||||||
|
|
||||||
records []export.Record
|
records []export.Record
|
||||||
|
|
||||||
|
sync.Mutex
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) {
|
func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) {
|
||||||
integrator := &correctnessIntegrator{
|
integrator := &correctnessIntegrator{
|
||||||
t: t,
|
t: t,
|
||||||
}
|
}
|
||||||
accum := metricsdk.NewAccumulator(integrator, metricsdk.WithResource(testResource))
|
accum := metricsdk.NewAccumulator(
|
||||||
|
integrator,
|
||||||
|
metricsdk.WithResource(testResource),
|
||||||
|
metricsdk.WithErrorHandler(func(err error) {
|
||||||
|
integrator.Lock()
|
||||||
|
defer integrator.Unlock()
|
||||||
|
integrator.err = err
|
||||||
|
}),
|
||||||
|
)
|
||||||
meter := metric.WrapMeterImpl(accum, "test")
|
meter := metric.WrapMeterImpl(accum, "test")
|
||||||
return meter, accum, integrator
|
return meter, accum, integrator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
|
func (ci *correctnessIntegrator) sdkErr() error {
|
||||||
|
ci.Lock()
|
||||||
|
defer ci.Unlock()
|
||||||
|
t := ci.err
|
||||||
|
ci.err = nil
|
||||||
|
return t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ci *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
|
||||||
name := descriptor.Name()
|
name := descriptor.Name()
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
@ -68,21 +88,21 @@ func (cb *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (a
|
|||||||
agg = array.New()
|
agg = array.New()
|
||||||
}
|
}
|
||||||
if agg != nil {
|
if agg != nil {
|
||||||
atomic.AddInt64(&cb.newAggCount, 1)
|
atomic.AddInt64(&ci.newAggCount, 1)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cb *correctnessIntegrator) CheckpointSet() export.CheckpointSet {
|
func (ci *correctnessIntegrator) CheckpointSet() export.CheckpointSet {
|
||||||
cb.t.Fatal("Should not be called")
|
ci.t.Fatal("Should not be called")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*correctnessIntegrator) FinishedCollection() {
|
func (*correctnessIntegrator) FinishedCollection() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cb *correctnessIntegrator) Process(_ context.Context, record export.Record) error {
|
func (ci *correctnessIntegrator) Process(_ context.Context, record export.Record) error {
|
||||||
cb.records = append(cb.records, record)
|
ci.records = append(ci.records, record)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -313,19 +333,37 @@ func TestObserverCollection(t *testing.T) {
|
|||||||
result.Observe(1, kv.String("A", "B"))
|
result.Observe(1, kv.String("A", "B"))
|
||||||
result.Observe(1)
|
result.Observe(1)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
_ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(result metric.Float64ObserverResult) {
|
||||||
|
result.Observe(1, kv.String("A", "B"))
|
||||||
|
result.Observe(2, kv.String("A", "B"))
|
||||||
|
result.Observe(1, kv.String("C", "D"))
|
||||||
|
})
|
||||||
|
_ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(result metric.Int64ObserverResult) {
|
||||||
|
result.Observe(2, kv.String("A", "B"))
|
||||||
|
result.Observe(1)
|
||||||
|
// last value wins
|
||||||
|
result.Observe(1, kv.String("A", "B"))
|
||||||
|
result.Observe(1)
|
||||||
|
})
|
||||||
|
|
||||||
_ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(result metric.Int64ObserverResult) {
|
_ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(result metric.Int64ObserverResult) {
|
||||||
})
|
})
|
||||||
|
|
||||||
collected := sdk.Collect(ctx)
|
collected := sdk.Collect(ctx)
|
||||||
|
|
||||||
require.Equal(t, 4, collected)
|
require.Equal(t, 8, collected)
|
||||||
require.Equal(t, 4, len(integrator.records))
|
require.Equal(t, 8, len(integrator.records))
|
||||||
|
|
||||||
out := batchTest.NewOutput(label.DefaultEncoder())
|
out := batchTest.NewOutput(label.DefaultEncoder())
|
||||||
for _, rec := range integrator.records {
|
for _, rec := range integrator.records {
|
||||||
_ = out.AddTo(rec)
|
_ = out.AddTo(rec)
|
||||||
}
|
}
|
||||||
require.EqualValues(t, map[string]float64{
|
require.EqualValues(t, map[string]float64{
|
||||||
|
"float.sumobserver/A=B/R=V": 2,
|
||||||
|
"float.sumobserver/C=D/R=V": 1,
|
||||||
|
"int.sumobserver//R=V": 1,
|
||||||
|
"int.sumobserver/A=B/R=V": 1,
|
||||||
"float.valueobserver/A=B/R=V": -1,
|
"float.valueobserver/A=B/R=V": -1,
|
||||||
"float.valueobserver/C=D/R=V": -1,
|
"float.valueobserver/C=D/R=V": -1,
|
||||||
"int.valueobserver//R=V": 1,
|
"int.valueobserver//R=V": 1,
|
||||||
@ -333,48 +371,88 @@ func TestObserverCollection(t *testing.T) {
|
|||||||
}, out.Map)
|
}, out.Map)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSumObserverInputRange(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
meter, sdk, integrator := newSDK(t)
|
||||||
|
|
||||||
|
_ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(result metric.Float64ObserverResult) {
|
||||||
|
result.Observe(-2, kv.String("A", "B"))
|
||||||
|
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
|
||||||
|
result.Observe(-1, kv.String("C", "D"))
|
||||||
|
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
|
||||||
|
})
|
||||||
|
_ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(result metric.Int64ObserverResult) {
|
||||||
|
result.Observe(-1, kv.String("A", "B"))
|
||||||
|
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
|
||||||
|
result.Observe(-1)
|
||||||
|
require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr())
|
||||||
|
})
|
||||||
|
|
||||||
|
collected := sdk.Collect(ctx)
|
||||||
|
|
||||||
|
require.Equal(t, 0, collected)
|
||||||
|
require.Equal(t, 0, len(integrator.records))
|
||||||
|
|
||||||
|
// check that the error condition was reset
|
||||||
|
require.NoError(t, integrator.sdkErr())
|
||||||
|
}
|
||||||
|
|
||||||
func TestObserverBatch(t *testing.T) {
|
func TestObserverBatch(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
meter, sdk, integrator := newSDK(t)
|
meter, sdk, integrator := newSDK(t)
|
||||||
|
|
||||||
var floatObs metric.Float64ValueObserver
|
var floatValueObs metric.Float64ValueObserver
|
||||||
var intObs metric.Int64ValueObserver
|
var intValueObs metric.Int64ValueObserver
|
||||||
|
var floatSumObs metric.Float64SumObserver
|
||||||
|
var intSumObs metric.Int64SumObserver
|
||||||
|
|
||||||
var batch = Must(meter).NewBatchObserver(
|
var batch = Must(meter).NewBatchObserver(
|
||||||
func(result metric.BatchObserverResult) {
|
func(result metric.BatchObserverResult) {
|
||||||
result.Observe(
|
result.Observe(
|
||||||
[]kv.KeyValue{
|
[]kv.KeyValue{
|
||||||
kv.String("A", "B"),
|
kv.String("A", "B"),
|
||||||
},
|
},
|
||||||
floatObs.Observation(1),
|
floatValueObs.Observation(1),
|
||||||
floatObs.Observation(-1),
|
floatValueObs.Observation(-1),
|
||||||
intObs.Observation(-1),
|
intValueObs.Observation(-1),
|
||||||
intObs.Observation(1),
|
intValueObs.Observation(1),
|
||||||
|
floatSumObs.Observation(1000),
|
||||||
|
intSumObs.Observation(100),
|
||||||
)
|
)
|
||||||
result.Observe(
|
result.Observe(
|
||||||
[]kv.KeyValue{
|
[]kv.KeyValue{
|
||||||
kv.String("C", "D"),
|
kv.String("C", "D"),
|
||||||
},
|
},
|
||||||
floatObs.Observation(-1),
|
floatValueObs.Observation(-1),
|
||||||
|
floatSumObs.Observation(-1),
|
||||||
)
|
)
|
||||||
result.Observe(
|
result.Observe(
|
||||||
nil,
|
nil,
|
||||||
intObs.Observation(1),
|
intValueObs.Observation(1),
|
||||||
intObs.Observation(1),
|
intValueObs.Observation(1),
|
||||||
|
intSumObs.Observation(10),
|
||||||
|
floatSumObs.Observation(1.1),
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
floatObs = batch.RegisterFloat64ValueObserver("float.valueobserver")
|
floatValueObs = batch.RegisterFloat64ValueObserver("float.valueobserver")
|
||||||
intObs = batch.RegisterInt64ValueObserver("int.valueobserver")
|
intValueObs = batch.RegisterInt64ValueObserver("int.valueobserver")
|
||||||
|
floatSumObs = batch.RegisterFloat64SumObserver("float.sumobserver")
|
||||||
|
intSumObs = batch.RegisterInt64SumObserver("int.sumobserver")
|
||||||
|
|
||||||
collected := sdk.Collect(ctx)
|
collected := sdk.Collect(ctx)
|
||||||
|
|
||||||
require.Equal(t, 4, collected)
|
require.Equal(t, 8, collected)
|
||||||
require.Equal(t, 4, len(integrator.records))
|
require.Equal(t, 8, len(integrator.records))
|
||||||
|
|
||||||
out := batchTest.NewOutput(label.DefaultEncoder())
|
out := batchTest.NewOutput(label.DefaultEncoder())
|
||||||
for _, rec := range integrator.records {
|
for _, rec := range integrator.records {
|
||||||
_ = out.AddTo(rec)
|
_ = out.AddTo(rec)
|
||||||
}
|
}
|
||||||
require.EqualValues(t, map[string]float64{
|
require.EqualValues(t, map[string]float64{
|
||||||
|
"float.sumobserver//R=V": 1.1,
|
||||||
|
"float.sumobserver/A=B/R=V": 1000,
|
||||||
|
"int.sumobserver//R=V": 10,
|
||||||
|
"int.sumobserver/A=B/R=V": 100,
|
||||||
"float.valueobserver/A=B/R=V": -1,
|
"float.valueobserver/A=B/R=V": -1,
|
||||||
"float.valueobserver/C=D/R=V": -1,
|
"float.valueobserver/C=D/R=V": -1,
|
||||||
"int.valueobserver//R=V": 1,
|
"int.valueobserver//R=V": 1,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user