1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-06-14 23:45:20 +02:00

Add observer metric ()

* wip: observers

* wip: float observers

* fix copy pasta

* wip: rework observers in sdk

* small fix in global meter

* wip: aggregators and selectors

* wip: monotonicity option for observers

* some refactor

* wip: docs

needs more package docs (especially for api/metric and sdk/metric)

* fix ci

* Fix copy-pasta in docs

Co-Authored-By: Mauricio Vásquez <mauricio@kinvolk.io>

* recycle unused recorders in observers

if a recorder for a labelset is unused for a second collection cycle
in a row, drop it

* unregister

* thread-safe set callback

* Fix docs

* Revert "wip: aggregators and selectors"

This reverts commit 37b7d05aed5dc90f6d5593325b6eb77494e21736.

* update selector

* tests

* Rework number equality

Compare concrete numbers, so we can get actual numbers in the error
message when they are not equal, not some uint64 representation. This
also uses InDelta for comparing floats.

* Ensure that Observers are registered in the same order

* Run observers in fixed order

So the tests can be reproducible - iterating a map made the order of
measurements random.

* Ensure the proper alignment of the delegates

This wasn't checked at all. After adding the checks, the test-386
failed.

* Small tweaks to the global meter test

* Ensure proper alignment of the callback pointer

test-386 was complaining about it

* update docs

* update a TODO

* address review issues

* drop SetCallback

Co-authored-by: Mauricio Vásquez <mauricio@kinvolk.io>
Co-authored-by: Rahul Patel <rghetia@yahoo.com>
This commit is contained in:
Rahul Patel 2020-03-05 12:15:30 -08:00 committed by GitHub
parent 547d584da8
commit a202f16100
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1072 additions and 124 deletions

@ -0,0 +1,26 @@
package internal_test
import (
"os"
"testing"
"go.opentelemetry.io/otel/api/global/internal"
ottest "go.opentelemetry.io/otel/internal/testing"
)
// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
fieldsMap := internal.AtomicFieldOffsets()
fields := make([]ottest.FieldOffset, 0, len(fieldsMap))
for name, offset := range fieldsMap {
fields = append(fields, ottest.FieldOffset{
Name: name,
Offset: offset,
})
}
if !ottest.Aligned8Byte(fields, os.Stderr) {
os.Exit(1)
}
os.Exit(m.Run())
}

@ -40,44 +40,76 @@ const (
) )
type meterProvider struct { type meterProvider struct {
lock sync.Mutex
meters []*meter
delegate metric.Provider delegate metric.Provider
lock sync.Mutex
meters []*meter
} }
type meter struct { type meter struct {
delegate unsafe.Pointer // (*metric.Meter)
provider *meterProvider provider *meterProvider
name string name string
lock sync.Mutex lock sync.Mutex
instruments []*instImpl instruments []*instImpl
liveObservers map[*obsImpl]struct{}
delegate unsafe.Pointer // (*metric.Meter) // orderedObservers slice contains observers in their order of
// registration. It may also contain unregistered
// observers. The liveObservers map should be consulted to
// check if the observer is registered or not.
orderedObservers []*obsImpl
} }
type instImpl struct { type instImpl struct {
delegate unsafe.Pointer // (*metric.InstrumentImpl)
name string name string
mkind metricKind mkind metricKind
nkind core.NumberKind nkind core.NumberKind
opts interface{} opts interface{}
}
delegate unsafe.Pointer // (*metric.InstrumentImpl) type obsImpl struct {
delegate unsafe.Pointer // (*metric.Int64Observer or *metric.Float64Observer)
name string
nkind core.NumberKind
opts []metric.ObserverOptionApplier
meter *meter
callback interface{}
}
type int64ObsImpl struct {
observer *obsImpl
}
type float64ObsImpl struct {
observer *obsImpl
}
// this is a common subset of the metric observers interfaces
type observerUnregister interface {
Unregister()
} }
type labelSet struct { type labelSet struct {
delegate unsafe.Pointer // (* metric.LabelSet)
meter *meter meter *meter
value []core.KeyValue value []core.KeyValue
initialize sync.Once initialize sync.Once
delegate unsafe.Pointer // (* metric.LabelSet)
} }
type instHandle struct { type instHandle struct {
delegate unsafe.Pointer // (*metric.HandleImpl)
inst *instImpl inst *instImpl
labels metric.LabelSet labels metric.LabelSet
initialize sync.Once initialize sync.Once
delegate unsafe.Pointer // (*metric.HandleImpl)
} }
var _ metric.Provider = &meterProvider{} var _ metric.Provider = &meterProvider{}
@ -86,6 +118,10 @@ var _ metric.LabelSet = &labelSet{}
var _ metric.LabelSetDelegate = &labelSet{} var _ metric.LabelSetDelegate = &labelSet{}
var _ metric.InstrumentImpl = &instImpl{} var _ metric.InstrumentImpl = &instImpl{}
var _ metric.BoundInstrumentImpl = &instHandle{} var _ metric.BoundInstrumentImpl = &instHandle{}
var _ metric.Int64Observer = int64ObsImpl{}
var _ metric.Float64Observer = float64ObsImpl{}
var _ observerUnregister = (metric.Int64Observer)(nil)
var _ observerUnregister = (metric.Float64Observer)(nil)
// Provider interface and delegation // Provider interface and delegation
@ -130,6 +166,13 @@ func (m *meter) setDelegate(provider metric.Provider) {
inst.setDelegate(*d) inst.setDelegate(*d)
} }
m.instruments = nil m.instruments = nil
for _, obs := range m.orderedObservers {
if _, ok := m.liveObservers[obs]; ok {
obs.setDelegate(*d)
}
}
m.liveObservers = nil
m.orderedObservers = nil
} }
func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl { func (m *meter) newInst(name string, mkind metricKind, nkind core.NumberKind, opts interface{}) metric.InstrumentImpl {
@ -203,6 +246,68 @@ func (bound *instHandle) Unbind() {
(*implPtr).Unbind() (*implPtr).Unbind()
} }
// Any Observer delegation
func (obs *obsImpl) setDelegate(d metric.Meter) {
if obs.nkind == core.Int64NumberKind {
obs.setInt64Delegate(d)
} else {
obs.setFloat64Delegate(d)
}
}
func (obs *obsImpl) unregister() {
unreg := obs.getUnregister()
if unreg != nil {
unreg.Unregister()
return
}
obs.meter.lock.Lock()
defer obs.meter.lock.Unlock()
delete(obs.meter.liveObservers, obs)
if len(obs.meter.liveObservers) == 0 {
obs.meter.liveObservers = nil
obs.meter.orderedObservers = nil
}
}
func (obs *obsImpl) getUnregister() observerUnregister {
ptr := atomic.LoadPointer(&obs.delegate)
if ptr == nil {
return nil
}
if obs.nkind == core.Int64NumberKind {
return *(*metric.Int64Observer)(ptr)
}
return *(*metric.Float64Observer)(ptr)
}
// Int64Observer delegation
func (obs *obsImpl) setInt64Delegate(d metric.Meter) {
obsPtr := new(metric.Int64Observer)
cb := obs.callback.(metric.Int64ObserverCallback)
*obsPtr = d.RegisterInt64Observer(obs.name, cb, obs.opts...)
atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}
func (obs int64ObsImpl) Unregister() {
obs.observer.unregister()
}
// Float64Observer delegation
func (obs *obsImpl) setFloat64Delegate(d metric.Meter) {
obsPtr := new(metric.Float64Observer)
cb := obs.callback.(metric.Float64ObserverCallback)
*obsPtr = d.RegisterFloat64Observer(obs.name, cb, obs.opts...)
atomic.StorePointer(&obs.delegate, unsafe.Pointer(obsPtr))
}
func (obs float64ObsImpl) Unregister() {
obs.observer.unregister()
}
// Metric updates // Metric updates
func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) { func (m *meter) RecordBatch(ctx context.Context, labels metric.LabelSet, measurements ...metric.Measurement) {
@ -296,3 +401,64 @@ func (m *meter) NewInt64Measure(name string, opts ...metric.MeasureOptionApplier
func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Float64Measure { func (m *meter) NewFloat64Measure(name string, opts ...metric.MeasureOptionApplier) metric.Float64Measure {
return metric.WrapFloat64MeasureInstrument(m.newInst(name, measureKind, core.Float64NumberKind, opts)) return metric.WrapFloat64MeasureInstrument(m.newInst(name, measureKind, core.Float64NumberKind, opts))
} }
func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Int64Observer {
m.lock.Lock()
defer m.lock.Unlock()
if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).RegisterInt64Observer(name, callback, oos...)
}
obs := &obsImpl{
name: name,
nkind: core.Int64NumberKind,
opts: oos,
meter: m,
callback: callback,
}
m.addObserver(obs)
return int64ObsImpl{
observer: obs,
}
}
func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, oos ...metric.ObserverOptionApplier) metric.Float64Observer {
m.lock.Lock()
defer m.lock.Unlock()
if meterPtr := (*metric.Meter)(atomic.LoadPointer(&m.delegate)); meterPtr != nil {
return (*meterPtr).RegisterFloat64Observer(name, callback, oos...)
}
obs := &obsImpl{
name: name,
nkind: core.Float64NumberKind,
opts: oos,
meter: m,
callback: callback,
}
m.addObserver(obs)
return float64ObsImpl{
observer: obs,
}
}
func (m *meter) addObserver(obs *obsImpl) {
if m.liveObservers == nil {
m.liveObservers = make(map[*obsImpl]struct{})
}
m.liveObservers[obs] = struct{}{}
m.orderedObservers = append(m.orderedObservers, obs)
}
func AtomicFieldOffsets() map[string]uintptr {
return map[string]uintptr{
"meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"instImpl.delegate": unsafe.Offsetof(instImpl{}.delegate),
"obsImpl.delegate": unsafe.Offsetof(obsImpl{}.delegate),
"labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate),
"instHandle.delegate": unsafe.Offsetof(instHandle{}.delegate),
}
}

@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel/api/global" "go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/global/internal" "go.opentelemetry.io/otel/api/global/internal"
"go.opentelemetry.io/otel/api/key" "go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/stdout" "go.opentelemetry.io/otel/exporters/metric/stdout"
metrictest "go.opentelemetry.io/otel/internal/metric" metrictest "go.opentelemetry.io/otel/internal/metric"
) )
@ -41,6 +42,16 @@ func TestDirect(t *testing.T) {
measure.Record(ctx, 1, labels1) measure.Record(ctx, 1, labels1)
measure.Record(ctx, 2, labels1) measure.Record(ctx, 2, labels1)
_ = meter1.RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) {
result.Observe(1., labels1)
result.Observe(2., labels2)
})
_ = meter1.RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) {
result.Observe(1, labels1)
result.Observe(2, labels2)
})
second := meter2.NewFloat64Measure("test.second") second := meter2.NewFloat64Measure("test.second")
second.Record(ctx, 1, labels3) second.Record(ctx, 1, labels3)
second.Record(ctx, 2, labels3) second.Record(ctx, 2, labels3)
@ -54,45 +65,86 @@ func TestDirect(t *testing.T) {
second.Record(ctx, 3, labels3) second.Record(ctx, 3, labels3)
mock := sdk.Meter("test1").(*metrictest.Meter) mock := sdk.Meter("test1").(*metrictest.Meter)
require.Equal(t, 3, len(mock.MeasurementBatches)) mock.RunObservers()
require.Len(t, mock.MeasurementBatches, 7)
require.Equal(t, map[core.Key]core.Value{ require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value, lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[0].LabelSet.Labels) }, mock.MeasurementBatches[0].LabelSet.Labels)
require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) require.Len(t, mock.MeasurementBatches[0].Measurements, 1)
require.Equal(t, core.NewInt64Number(1), require.Equal(t, int64(1),
mock.MeasurementBatches[0].Measurements[0].Number) mock.MeasurementBatches[0].Measurements[0].Number.AsInt64())
require.Equal(t, "test.counter", require.Equal(t, "test.counter",
mock.MeasurementBatches[0].Measurements[0].Instrument.Name) mock.MeasurementBatches[0].Measurements[0].Instrument.Name)
require.Equal(t, map[core.Key]core.Value{ require.Equal(t, map[core.Key]core.Value{
lvals2.Key: lvals2.Value, lvals2.Key: lvals2.Value,
}, mock.MeasurementBatches[1].LabelSet.Labels) }, mock.MeasurementBatches[1].LabelSet.Labels)
require.Equal(t, 1, len(mock.MeasurementBatches[1].Measurements)) require.Len(t, mock.MeasurementBatches[1].Measurements, 1)
require.Equal(t, core.NewInt64Number(3), require.Equal(t, int64(3),
mock.MeasurementBatches[1].Measurements[0].Number) mock.MeasurementBatches[1].Measurements[0].Number.AsInt64())
require.Equal(t, "test.gauge", require.Equal(t, "test.gauge",
mock.MeasurementBatches[1].Measurements[0].Instrument.Name) mock.MeasurementBatches[1].Measurements[0].Instrument.Name)
require.Equal(t, map[core.Key]core.Value{ require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value, lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[2].LabelSet.Labels) }, mock.MeasurementBatches[2].LabelSet.Labels)
require.Equal(t, 1, len(mock.MeasurementBatches[2].Measurements)) require.Len(t, mock.MeasurementBatches[2].Measurements, 1)
require.Equal(t, core.NewFloat64Number(3), require.InDelta(t, float64(3),
mock.MeasurementBatches[2].Measurements[0].Number) mock.MeasurementBatches[2].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.measure", require.Equal(t, "test.measure",
mock.MeasurementBatches[2].Measurements[0].Instrument.Name) mock.MeasurementBatches[2].Measurements[0].Instrument.Name)
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[3].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[3].Measurements, 1)
require.InDelta(t, float64(1),
mock.MeasurementBatches[3].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.observer.float",
mock.MeasurementBatches[3].Measurements[0].Instrument.Name)
require.Equal(t, map[core.Key]core.Value{
lvals2.Key: lvals2.Value,
}, mock.MeasurementBatches[4].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[4].Measurements, 1)
require.InDelta(t, float64(2),
mock.MeasurementBatches[4].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.observer.float",
mock.MeasurementBatches[4].Measurements[0].Instrument.Name)
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[5].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[5].Measurements, 1)
require.Equal(t, int64(1),
mock.MeasurementBatches[5].Measurements[0].Number.AsInt64())
require.Equal(t, "test.observer.int",
mock.MeasurementBatches[5].Measurements[0].Instrument.Name)
require.Equal(t, map[core.Key]core.Value{
lvals2.Key: lvals2.Value,
}, mock.MeasurementBatches[6].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[6].Measurements, 1)
require.Equal(t, int64(2),
mock.MeasurementBatches[6].Measurements[0].Number.AsInt64())
require.Equal(t, "test.observer.int",
mock.MeasurementBatches[6].Measurements[0].Instrument.Name)
// This tests the second Meter instance // This tests the second Meter instance
mock = sdk.Meter("test2").(*metrictest.Meter) mock = sdk.Meter("test2").(*metrictest.Meter)
require.Equal(t, 1, len(mock.MeasurementBatches)) require.Len(t, mock.MeasurementBatches, 1)
require.Equal(t, map[core.Key]core.Value{ require.Equal(t, map[core.Key]core.Value{
lvals3.Key: lvals3.Value, lvals3.Key: lvals3.Value,
}, mock.MeasurementBatches[0].LabelSet.Labels) }, mock.MeasurementBatches[0].LabelSet.Labels)
require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) require.Len(t, mock.MeasurementBatches[0].Measurements, 1)
require.Equal(t, core.NewFloat64Number(3), require.InDelta(t, float64(3),
mock.MeasurementBatches[0].Measurements[0].Number) mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.second", require.Equal(t, "test.second",
mock.MeasurementBatches[0].Measurements[0].Instrument.Name) mock.MeasurementBatches[0].Measurements[0].Instrument.Name)
} }
@ -138,8 +190,9 @@ func TestBound(t *testing.T) {
lvals1.Key: lvals1.Value, lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[0].LabelSet.Labels) }, mock.MeasurementBatches[0].LabelSet.Labels)
require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements)) require.Equal(t, 1, len(mock.MeasurementBatches[0].Measurements))
require.Equal(t, core.NewFloat64Number(1), require.InDelta(t, float64(1),
mock.MeasurementBatches[0].Measurements[0].Number) mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.counter", require.Equal(t, "test.counter",
mock.MeasurementBatches[0].Measurements[0].Instrument.Name) mock.MeasurementBatches[0].Measurements[0].Instrument.Name)
@ -147,8 +200,9 @@ func TestBound(t *testing.T) {
lvals2.Key: lvals2.Value, lvals2.Key: lvals2.Value,
}, mock.MeasurementBatches[1].LabelSet.Labels) }, mock.MeasurementBatches[1].LabelSet.Labels)
require.Equal(t, 1, len(mock.MeasurementBatches[1].Measurements)) require.Equal(t, 1, len(mock.MeasurementBatches[1].Measurements))
require.Equal(t, core.NewFloat64Number(3), require.InDelta(t, float64(3),
mock.MeasurementBatches[1].Measurements[0].Number) mock.MeasurementBatches[1].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.gauge", require.Equal(t, "test.gauge",
mock.MeasurementBatches[1].Measurements[0].Instrument.Name) mock.MeasurementBatches[1].Measurements[0].Instrument.Name)
@ -156,8 +210,8 @@ func TestBound(t *testing.T) {
lvals1.Key: lvals1.Value, lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[2].LabelSet.Labels) }, mock.MeasurementBatches[2].LabelSet.Labels)
require.Equal(t, 1, len(mock.MeasurementBatches[2].Measurements)) require.Equal(t, 1, len(mock.MeasurementBatches[2].Measurements))
require.Equal(t, core.NewInt64Number(3), require.Equal(t, int64(3),
mock.MeasurementBatches[2].Measurements[0].Number) mock.MeasurementBatches[2].Measurements[0].Number.AsInt64())
require.Equal(t, "test.measure", require.Equal(t, "test.measure",
mock.MeasurementBatches[2].Measurements[0].Instrument.Name) mock.MeasurementBatches[2].Measurements[0].Instrument.Name)
@ -185,9 +239,14 @@ func TestUnbind(t *testing.T) {
measure := glob.NewInt64Measure("test.measure") measure := glob.NewInt64Measure("test.measure")
boundM := measure.Bind(labels1) boundM := measure.Bind(labels1)
observerInt := glob.RegisterInt64Observer("test.observer.int", nil)
observerFloat := glob.RegisterFloat64Observer("test.observer.float", nil)
boundC.Unbind() boundC.Unbind()
boundG.Unbind() boundG.Unbind()
boundM.Unbind() boundM.Unbind()
observerInt.Unregister()
observerFloat.Unregister()
} }
func TestDefaultSDK(t *testing.T) { func TestDefaultSDK(t *testing.T) {

@ -81,6 +81,14 @@ type MeasureOptionApplier interface {
ApplyMeasureOption(*Options) ApplyMeasureOption(*Options)
} }
// ObserverOptionApplier is an interface for applying metric options
// that are valid only for observer metrics.
type ObserverOptionApplier interface {
// ApplyObserverOption is used to make some general or
// observer-specific changes in the Options.
ApplyObserverOption(*Options)
}
// Measurement is used for reporting a batch of metric // Measurement is used for reporting a batch of metric
// values. Instances of this type should be created by instruments // values. Instances of this type should be created by instruments
// (e.g., Int64Counter.Measurement()). // (e.g., Int64Counter.Measurement()).
@ -127,10 +135,51 @@ type Meter interface {
// a given name and customized with passed options. // a given name and customized with passed options.
NewFloat64Measure(name string, mos ...MeasureOptionApplier) Float64Measure NewFloat64Measure(name string, mos ...MeasureOptionApplier) Float64Measure
// RegisterInt64Observer creates a new integral observer with a
// given name, running a given callback, and customized with passed
// options. Callback can be nil.
RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) Int64Observer
// RegisterFloat64Observer creates a new floating point observer
// with a given name, running a given callback, and customized with
// passed options. Callback can be nil.
RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) Float64Observer
// RecordBatch atomically records a batch of measurements. // RecordBatch atomically records a batch of measurements.
RecordBatch(context.Context, LabelSet, ...Measurement) RecordBatch(context.Context, LabelSet, ...Measurement)
} }
// Int64ObserverResult is an interface for reporting integral
// observations.
type Int64ObserverResult interface {
Observe(value int64, labels LabelSet)
}
// Float64ObserverResult is an interface for reporting floating point
// observations.
type Float64ObserverResult interface {
Observe(value float64, labels LabelSet)
}
// Int64ObserverCallback is a type of callback that integral
// observers run.
type Int64ObserverCallback func(result Int64ObserverResult)
// Float64ObserverCallback is a type of callback that floating point
// observers run.
type Float64ObserverCallback func(result Float64ObserverResult)
// Int64Observer is a metric that captures a set of int64 values at a
// point in time.
type Int64Observer interface {
Unregister()
}
// Float64Observer is a metric that captures a set of float64 values
// at a point in time.
type Float64Observer interface {
Unregister()
}
// Option supports specifying the various metric options. // Option supports specifying the various metric options.
type Option func(*Options) type Option func(*Options)
@ -140,16 +189,19 @@ type OptionApplier interface {
CounterOptionApplier CounterOptionApplier
GaugeOptionApplier GaugeOptionApplier
MeasureOptionApplier MeasureOptionApplier
ObserverOptionApplier
// ApplyOption is used to make some general changes in the // ApplyOption is used to make some general changes in the
// Options. // Options.
ApplyOption(*Options) ApplyOption(*Options)
} }
// CounterGaugeOptionApplier is an interface for applying metric // CounterGaugeObserverOptionApplier is an interface for applying
// options that are valid for counter or gauge metrics. // metric options that are valid for counter, gauge or observer
type CounterGaugeOptionApplier interface { // metrics.
type CounterGaugeObserverOptionApplier interface {
CounterOptionApplier CounterOptionApplier
GaugeOptionApplier GaugeOptionApplier
ObserverOptionApplier
} }
type optionWrapper struct { type optionWrapper struct {
@ -168,16 +220,22 @@ type measureOptionWrapper struct {
F Option F Option
} }
type counterGaugeOptionWrapper struct { type observerOptionWrapper struct {
F Option
}
type counterGaugeObserverOptionWrapper struct {
FC Option FC Option
FG Option FG Option
FO Option
} }
var ( var (
_ OptionApplier = optionWrapper{} _ OptionApplier = optionWrapper{}
_ CounterOptionApplier = counterOptionWrapper{} _ CounterOptionApplier = counterOptionWrapper{}
_ GaugeOptionApplier = gaugeOptionWrapper{} _ GaugeOptionApplier = gaugeOptionWrapper{}
_ MeasureOptionApplier = measureOptionWrapper{} _ MeasureOptionApplier = measureOptionWrapper{}
_ ObserverOptionApplier = observerOptionWrapper{}
) )
func (o optionWrapper) ApplyCounterOption(opts *Options) { func (o optionWrapper) ApplyCounterOption(opts *Options) {
@ -192,6 +250,10 @@ func (o optionWrapper) ApplyMeasureOption(opts *Options) {
o.ApplyOption(opts) o.ApplyOption(opts)
} }
func (o optionWrapper) ApplyObserverOption(opts *Options) {
o.ApplyOption(opts)
}
func (o optionWrapper) ApplyOption(opts *Options) { func (o optionWrapper) ApplyOption(opts *Options) {
o.F(opts) o.F(opts)
} }
@ -208,14 +270,22 @@ func (o measureOptionWrapper) ApplyMeasureOption(opts *Options) {
o.F(opts) o.F(opts)
} }
func (o counterGaugeOptionWrapper) ApplyCounterOption(opts *Options) { func (o counterGaugeObserverOptionWrapper) ApplyCounterOption(opts *Options) {
o.FC(opts) o.FC(opts)
} }
func (o counterGaugeOptionWrapper) ApplyGaugeOption(opts *Options) { func (o counterGaugeObserverOptionWrapper) ApplyGaugeOption(opts *Options) {
o.FG(opts) o.FG(opts)
} }
func (o counterGaugeObserverOptionWrapper) ApplyObserverOption(opts *Options) {
o.FO(opts)
}
func (o observerOptionWrapper) ApplyObserverOption(opts *Options) {
o.F(opts)
}
// WithDescription applies provided description. // WithDescription applies provided description.
func WithDescription(desc string) OptionApplier { func WithDescription(desc string) OptionApplier {
return optionWrapper{ return optionWrapper{
@ -244,16 +314,19 @@ func WithKeys(keys ...core.Key) OptionApplier {
} }
} }
// WithMonotonic sets whether a counter or a gauge is not permitted to // WithMonotonic sets whether a counter, a gauge or an observer is not
// go down. // permitted to go down.
func WithMonotonic(monotonic bool) CounterGaugeOptionApplier { func WithMonotonic(monotonic bool) CounterGaugeObserverOptionApplier {
return counterGaugeOptionWrapper{ return counterGaugeObserverOptionWrapper{
FC: func(opts *Options) { FC: func(opts *Options) {
opts.Alternate = !monotonic opts.Alternate = !monotonic
}, },
FG: func(opts *Options) { FG: func(opts *Options) {
opts.Alternate = monotonic opts.Alternate = monotonic
}, },
FO: func(opts *Options) {
opts.Alternate = monotonic
},
} }
} }

@ -26,6 +26,7 @@ import (
mock "go.opentelemetry.io/otel/internal/metric" mock "go.opentelemetry.io/otel/internal/metric"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
) )
func TestCounterOptions(t *testing.T) { func TestCounterOptions(t *testing.T) {
@ -361,6 +362,117 @@ func TestMeasureOptions(t *testing.T) {
} }
} }
func TestObserverOptions(t *testing.T) {
type testcase struct {
name string
opts []metric.ObserverOptionApplier
keys []core.Key
desc string
unit unit.Unit
alt bool
}
testcases := []testcase{
{
name: "no opts",
opts: nil,
keys: nil,
desc: "",
unit: "",
alt: false,
},
{
name: "keys keys keys",
opts: []metric.ObserverOptionApplier{
metric.WithKeys(key.New("foo"), key.New("foo2")),
metric.WithKeys(key.New("bar"), key.New("bar2")),
metric.WithKeys(key.New("baz"), key.New("baz2")),
},
keys: []core.Key{
key.New("foo"), key.New("foo2"),
key.New("bar"), key.New("bar2"),
key.New("baz"), key.New("baz2"),
},
desc: "",
unit: "",
alt: false,
},
{
name: "description",
opts: []metric.ObserverOptionApplier{
metric.WithDescription("stuff"),
},
keys: nil,
desc: "stuff",
unit: "",
alt: false,
},
{
name: "description override",
opts: []metric.ObserverOptionApplier{
metric.WithDescription("stuff"),
metric.WithDescription("things"),
},
keys: nil,
desc: "things",
unit: "",
alt: false,
},
{
name: "unit",
opts: []metric.ObserverOptionApplier{
metric.WithUnit("s"),
},
keys: nil,
desc: "",
unit: "s",
alt: false,
},
{
name: "unit override",
opts: []metric.ObserverOptionApplier{
metric.WithUnit("s"),
metric.WithUnit("h"),
},
keys: nil,
desc: "",
unit: "h",
alt: false,
},
{
name: "monotonic",
opts: []metric.ObserverOptionApplier{
metric.WithMonotonic(true),
},
keys: nil,
desc: "",
unit: "",
alt: true,
},
{
name: "monotonic, but not really",
opts: []metric.ObserverOptionApplier{
metric.WithMonotonic(true),
metric.WithMonotonic(false),
},
keys: nil,
desc: "",
unit: "",
alt: false,
},
}
for idx, tt := range testcases {
t.Logf("Testing observer case %s (%d)", tt.name, idx)
opts := &metric.Options{}
metric.ApplyObserverOptions(opts, tt.opts...)
checkOptions(t, opts, &metric.Options{
Description: tt.desc,
Unit: tt.unit,
Keys: tt.keys,
Alternate: tt.alt,
})
}
}
func checkOptions(t *testing.T, got *metric.Options, expected *metric.Options) { func checkOptions(t *testing.T, got *metric.Options, expected *metric.Options) {
if diff := cmp.Diff(got, expected); diff != "" { if diff := cmp.Diff(got, expected); diff != "" {
t.Errorf("Compare options: -got +want %s", diff) t.Errorf("Compare options: -got +want %s", diff)
@ -448,6 +560,29 @@ func TestMeasure(t *testing.T) {
} }
} }
func TestObserver(t *testing.T) {
{
meter := mock.NewMeter()
labels := meter.Labels()
o := meter.RegisterFloat64Observer("test.observer.float", func(result metric.Float64ObserverResult) {
result.Observe(42, labels)
})
t.Log("Testing float observer")
meter.RunObservers()
checkObserverBatch(t, labels, meter, core.Float64NumberKind, o)
}
{
meter := mock.NewMeter()
labels := meter.Labels()
o := meter.RegisterInt64Observer("test.observer.int", func(result metric.Int64ObserverResult) {
result.Observe(42, labels)
})
t.Log("Testing int observer")
meter.RunObservers()
checkObserverBatch(t, labels, meter, core.Int64NumberKind, o)
}
}
func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, meter *mock.Meter, kind core.NumberKind, instrument metric.InstrumentImpl) { func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, meter *mock.Meter, kind core.NumberKind, instrument metric.InstrumentImpl) {
t.Helper() t.Helper()
if len(meter.MeasurementBatches) != 3 { if len(meter.MeasurementBatches) != 3 {
@ -496,7 +631,31 @@ func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, met
} }
} }
func checkObserverBatch(t *testing.T, labels metric.LabelSet, meter *mock.Meter, kind core.NumberKind, observer interface{}) {
t.Helper()
assert.Len(t, meter.MeasurementBatches, 1)
if len(meter.MeasurementBatches) < 1 {
return
}
o := observer.(*mock.Observer)
if !assert.NotNil(t, o) {
return
}
ourLabelSet := labels.(*mock.LabelSet)
got := meter.MeasurementBatches[0]
assert.Equal(t, ourLabelSet, got.LabelSet)
assert.Len(t, got.Measurements, 1)
if len(got.Measurements) < 1 {
return
}
measurement := got.Measurements[0]
assert.Equal(t, o.Instrument, measurement.Instrument)
ft := fortyTwo(t, kind)
assert.Equal(t, 0, measurement.Number.CompareNumber(kind, ft))
}
func fortyTwo(t *testing.T, kind core.NumberKind) core.Number { func fortyTwo(t *testing.T, kind core.NumberKind) core.Number {
t.Helper()
switch kind { switch kind {
case core.Int64NumberKind: case core.Int64NumberKind:
return core.NewInt64Number(42) return core.NewInt64Number(42)

@ -13,21 +13,22 @@
// limitations under the License. // limitations under the License.
// metric package provides an API for reporting diagnostic // metric package provides an API for reporting diagnostic
// measurements using three basic kinds of instruments (or four, if // measurements using four basic kinds of instruments.
// calling one special case a separate one).
// //
// The three basic kinds are: // The four basic kinds are:
// //
// - counters // - counters
// - gauges // - gauges
// - measures // - measures
// - observers
// //
// All instruments report either float64 or int64 values. // All instruments report either float64 or int64 values.
// //
// The primary object that handles metrics is Meter. The // The primary object that handles metrics is Meter. Meter can be
// implementation of the Meter is provided by SDK. Normally, the Meter // obtained from Provider. The implementations of the Meter and
// is used directly only for the LabelSet generation, batch recording // Provider are provided by SDK. Normally, the Meter is used directly
// and the bound instrument destruction. // only for the instrument creation, LabelSet generation and batch
// recording.
// //
// LabelSet is a set of keys and values that are in a suitable, // LabelSet is a set of keys and values that are in a suitable,
// optimized form to be used by Meter. // optimized form to be used by Meter.
@ -60,11 +61,24 @@
// the New*Measure function - this allows reporting negative values // the New*Measure function - this allows reporting negative values
// too. To report a new value, use the Record function. // too. To report a new value, use the Record function.
// //
// All the basic kinds of instruments also support creating bound // Observers are instruments that are reporting a current state of a
// instruments for a potentially more efficient reporting. The bound // set of values. An example could be voltage or
// instruments have the same function names as the instruments (so a // temperature. Observers can be created with either
// Counter bound instrument has Add, a Gauge bound instrument has Set, // RegisterFloat64Observer or RegisterInt64Observer. Observers by
// and a Measure bound instrument has Record). Bound Instruments can // default have no limitations about reported values - they can be
// be created with the Bind function of the respective // less or greater than the last reported value. This can be changed
// instrument. When done with the bound instrument, call Unbind on it. // with the WithMonotonic option passed to the Register*Observer
// function - this permits the reported values only to go
// up. Reporting of the new values happens asynchronously, with the
// use of a callback passed to the Register*Observer function. The
// callback can report multiple values. To unregister the observer,
// call Unregister on it.
//
// Counters, gauges and measures support creating bound instruments
// for a potentially more efficient reporting. The bound instruments
// have the same function names as the instruments (so a Counter bound
// instrument has Add, a Gauge bound instrument has Set, and a Measure
// bound instrument has Record). Bound Instruments can be created
// with the Bind function of the respective instrument. When done with
// the bound instrument, call Unbind on it.
package metric // import "go.opentelemetry.io/otel/api/metric" package metric // import "go.opentelemetry.io/otel/api/metric"

@ -11,12 +11,16 @@ type NoopMeter struct{}
type noopBoundInstrument struct{} type noopBoundInstrument struct{}
type noopLabelSet struct{} type noopLabelSet struct{}
type noopInstrument struct{} type noopInstrument struct{}
type noopInt64Observer struct{}
type noopFloat64Observer struct{}
var _ Provider = NoopProvider{} var _ Provider = NoopProvider{}
var _ Meter = NoopMeter{} var _ Meter = NoopMeter{}
var _ InstrumentImpl = noopInstrument{} var _ InstrumentImpl = noopInstrument{}
var _ BoundInstrumentImpl = noopBoundInstrument{} var _ BoundInstrumentImpl = noopBoundInstrument{}
var _ LabelSet = noopLabelSet{} var _ LabelSet = noopLabelSet{}
var _ Int64Observer = noopInt64Observer{}
var _ Float64Observer = noopFloat64Observer{}
func (NoopProvider) Meter(name string) Meter { func (NoopProvider) Meter(name string) Meter {
return NoopMeter{} return NoopMeter{}
@ -39,6 +43,12 @@ func (noopInstrument) Meter() Meter {
return NoopMeter{} return NoopMeter{}
} }
func (noopInt64Observer) Unregister() {
}
func (noopFloat64Observer) Unregister() {
}
func (NoopMeter) Labels(...core.KeyValue) LabelSet { func (NoopMeter) Labels(...core.KeyValue) LabelSet {
return noopLabelSet{} return noopLabelSet{}
} }
@ -69,3 +79,11 @@ func (NoopMeter) NewFloat64Measure(name string, mos ...MeasureOptionApplier) Flo
func (NoopMeter) RecordBatch(context.Context, LabelSet, ...Measurement) { func (NoopMeter) RecordBatch(context.Context, LabelSet, ...Measurement) {
} }
func (NoopMeter) RegisterInt64Observer(name string, callback Int64ObserverCallback, oos ...ObserverOptionApplier) Int64Observer {
return noopInt64Observer{}
}
func (NoopMeter) RegisterFloat64Observer(name string, callback Float64ObserverCallback, oos ...ObserverOptionApplier) Float64Observer {
return noopFloat64Observer{}
}

@ -122,3 +122,11 @@ func ApplyMeasureOptions(opts *Options, mos ...MeasureOptionApplier) {
o.ApplyMeasureOption(opts) o.ApplyMeasureOption(opts)
} }
} }
// ApplyObserverOptions is a helper that applies all the observer
// options to passed opts.
func ApplyObserverOptions(opts *Options, mos ...ObserverOptionApplier) {
for _, o := range mos {
o.ApplyObserverOption(opts)
}
}

@ -54,6 +54,10 @@ type (
Meter struct { Meter struct {
MeasurementBatches []Batch MeasurementBatches []Batch
// Observers contains also unregistered
// observers. Check the Dead field of the Observer to
// figure out its status.
Observers []*Observer
} }
Kind int8 Kind int8
@ -63,21 +67,63 @@ type (
Number core.Number Number core.Number
Instrument *Instrument Instrument *Instrument
} }
observerResult struct {
instrument *Instrument
}
int64ObserverResult struct {
result observerResult
}
float64ObserverResult struct {
result observerResult
}
observerCallback func(observerResult)
Observer struct {
Instrument *Instrument
Meter *Meter
Dead bool
callback observerCallback
}
) )
var ( var (
_ apimetric.InstrumentImpl = &Instrument{} _ apimetric.InstrumentImpl = &Instrument{}
_ apimetric.BoundInstrumentImpl = &Handle{} _ apimetric.BoundInstrumentImpl = &Handle{}
_ apimetric.LabelSet = &LabelSet{} _ apimetric.LabelSet = &LabelSet{}
_ apimetric.Meter = &Meter{} _ apimetric.Meter = &Meter{}
_ apimetric.Int64Observer = &Observer{}
_ apimetric.Float64Observer = &Observer{}
_ apimetric.Int64ObserverResult = int64ObserverResult{}
_ apimetric.Float64ObserverResult = float64ObserverResult{}
) )
const ( const (
KindCounter Kind = iota KindCounter Kind = iota
KindGauge KindGauge
KindMeasure KindMeasure
KindObserver
) )
func (o *Observer) Unregister() {
o.Dead = true
}
func (r int64ObserverResult) Observe(value int64, labels apimetric.LabelSet) {
r.result.observe(core.NewInt64Number(value), labels)
}
func (r float64ObserverResult) Observe(value float64, labels apimetric.LabelSet) {
r.result.observe(core.NewFloat64Number(value), labels)
}
func (r observerResult) observe(number core.Number, labels apimetric.LabelSet) {
r.instrument.RecordOne(context.Background(), number, labels)
}
func (i *Instrument) Bind(labels apimetric.LabelSet) apimetric.BoundInstrumentImpl { func (i *Instrument) Bind(labels apimetric.LabelSet) apimetric.BoundInstrumentImpl {
if ld, ok := labels.(apimetric.LabelSetDelegate); ok { if ld, ok := labels.(apimetric.LabelSetDelegate); ok {
labels = ld.Delegate() labels = ld.Delegate()
@ -209,6 +255,58 @@ func (m *Meter) newMeasureInstrument(name string, numberKind core.NumberKind, mo
} }
} }
func (m *Meter) RegisterInt64Observer(name string, callback apimetric.Int64ObserverCallback, oos ...apimetric.ObserverOptionApplier) apimetric.Int64Observer {
wrappedCallback := wrapInt64ObserverCallback(callback)
return m.newObserver(name, wrappedCallback, core.Int64NumberKind, oos...)
}
func wrapInt64ObserverCallback(callback apimetric.Int64ObserverCallback) observerCallback {
if callback == nil {
return func(result observerResult) {}
}
return func(result observerResult) {
typeSafeResult := int64ObserverResult{
result: result,
}
callback(typeSafeResult)
}
}
func (m *Meter) RegisterFloat64Observer(name string, callback apimetric.Float64ObserverCallback, oos ...apimetric.ObserverOptionApplier) apimetric.Float64Observer {
wrappedCallback := wrapFloat64ObserverCallback(callback)
return m.newObserver(name, wrappedCallback, core.Float64NumberKind, oos...)
}
func wrapFloat64ObserverCallback(callback apimetric.Float64ObserverCallback) observerCallback {
if callback == nil {
return func(result observerResult) {}
}
return func(result observerResult) {
typeSafeResult := float64ObserverResult{
result: result,
}
callback(typeSafeResult)
}
}
func (m *Meter) newObserver(name string, callback observerCallback, numberKind core.NumberKind, oos ...apimetric.ObserverOptionApplier) *Observer {
opts := apimetric.Options{}
apimetric.ApplyObserverOptions(&opts, oos...)
obs := &Observer{
Instrument: &Instrument{
Name: name,
Kind: KindObserver,
NumberKind: numberKind,
Opts: opts,
},
Meter: m,
Dead: false,
callback: callback,
}
m.Observers = append(m.Observers, obs)
return obs
}
func (m *Meter) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) { func (m *Meter) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) {
ourLabelSet := labels.(*LabelSet) ourLabelSet := labels.(*LabelSet)
mm := make([]Measurement, len(measurements)) mm := make([]Measurement, len(measurements))
@ -229,3 +327,14 @@ func (m *Meter) recordMockBatch(ctx context.Context, labelSet *LabelSet, measure
Measurements: measurements, Measurements: measurements,
}) })
} }
func (m *Meter) RunObservers() {
for _, observer := range m.Observers {
if observer.Dead {
continue
}
observer.callback(observerResult{
instrument: observer.Instrument,
})
}
}

@ -11,11 +11,12 @@ func _() {
_ = x[CounterKind-0] _ = x[CounterKind-0]
_ = x[GaugeKind-1] _ = x[GaugeKind-1]
_ = x[MeasureKind-2] _ = x[MeasureKind-2]
_ = x[ObserverKind-3]
} }
const _Kind_name = "CounterKindGaugeKindMeasureKind" const _Kind_name = "CounterKindGaugeKindMeasureKindObserverKind"
var _Kind_index = [...]uint8{0, 11, 20, 31} var _Kind_index = [...]uint8{0, 11, 20, 31, 43}
func (i Kind) String() string { func (i Kind) String() string {
if i < 0 || i >= Kind(len(_Kind_index)-1) { if i < 0 || i >= Kind(len(_Kind_index)-1) {

@ -295,6 +295,9 @@ const (
// Measure kind indicates a measure instrument. // Measure kind indicates a measure instrument.
MeasureKind MeasureKind
// Observer kind indicates an observer instrument
ObserverKind
) )
// Descriptor describes a metric instrument to the exporter. // Descriptor describes a metric instrument to the exporter.

@ -52,6 +52,8 @@ func (*benchFixture) AggregatorFor(descriptor *export.Descriptor) export.Aggrega
return counter.New() return counter.New()
case export.GaugeKind: case export.GaugeKind:
return gauge.New() return gauge.New()
case export.ObserverKind:
fallthrough
case export.MeasureKind: case export.MeasureKind:
if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") { if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") {
return minmaxsumcount.New(descriptor) return minmaxsumcount.New(descriptor)
@ -357,6 +359,89 @@ func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) {
} }
} }
// Observers
func BenchmarkObserverRegistration(b *testing.B) {
fix := newFixture(b)
names := make([]string, 0, b.N)
for i := 0; i < b.N; i++ {
names = append(names, fmt.Sprintf("test.observer.%d", i))
}
cb := func(result metric.Int64ObserverResult) {}
b.ResetTimer()
for i := 0; i < b.N; i++ {
fix.sdk.RegisterInt64Observer(names[i], cb)
}
}
func BenchmarkObserverRegistrationUnregistration(b *testing.B) {
fix := newFixture(b)
names := make([]string, 0, b.N)
for i := 0; i < b.N; i++ {
names = append(names, fmt.Sprintf("test.observer.%d", i))
}
cb := func(result metric.Int64ObserverResult) {}
b.ResetTimer()
for i := 0; i < b.N; i++ {
fix.sdk.RegisterInt64Observer(names[i], cb).Unregister()
}
}
func BenchmarkObserverRegistrationUnregistrationBatched(b *testing.B) {
fix := newFixture(b)
names := make([]string, 0, b.N)
for i := 0; i < b.N; i++ {
names = append(names, fmt.Sprintf("test.observer.%d", i))
}
observers := make([]metric.Int64Observer, 0, b.N)
cb := func(result metric.Int64ObserverResult) {}
b.ResetTimer()
for i := 0; i < b.N; i++ {
observers = append(observers, fix.sdk.RegisterInt64Observer(names[i], cb))
}
for i := 0; i < b.N; i++ {
observers[i].Unregister()
}
}
func BenchmarkObserverObservationInt64(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
_ = fix.sdk.RegisterInt64Observer("test.observer", func(result metric.Int64ObserverResult) {
b.StartTimer()
defer b.StopTimer()
for i := 0; i < b.N; i++ {
result.Observe((int64)(i), labs)
}
})
b.StopTimer()
b.ResetTimer()
fix.sdk.Collect(ctx)
}
func BenchmarkObserverObservationFloat64(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
_ = fix.sdk.RegisterFloat64Observer("test.observer", func(result metric.Float64ObserverResult) {
b.StartTimer()
defer b.StopTimer()
for i := 0; i < b.N; i++ {
result.Observe((float64)(i), labs)
}
})
b.StopTimer()
b.ResetTimer()
fix.sdk.Collect(ctx)
}
// MaxSumCount // MaxSumCount
func BenchmarkInt64MaxSumCountAdd(b *testing.B) { func BenchmarkInt64MaxSumCountAdd(b *testing.B) {

@ -18,10 +18,10 @@ supports configurable metrics export behavior through a collection of
export interfaces that support various export strategies, described below. export interfaces that support various export strategies, described below.
The metric.Meter API consists of methods for constructing each of the The metric.Meter API consists of methods for constructing each of the
basic kinds of metric instrument. There are six types of instrument basic kinds of metric instrument. There are eight types of instrument
available to the end user, comprised of three basic kinds of metric available to the end user, comprised of four basic kinds of metric
instrument (Counter, Gauge, Measure) crossed with two kinds of number instrument (Counter, Gauge, Measure, Observer) crossed with two kinds
(int64, float64). of number (int64, float64).
The API assists the SDK by consolidating the variety of metric instruments The API assists the SDK by consolidating the variety of metric instruments
into a narrower interface, allowing the SDK to avoid repetition of into a narrower interface, allowing the SDK to avoid repetition of
@ -31,17 +31,25 @@ numerical value.
To this end, the API uses a core.Number type to represent either an int64 To this end, the API uses a core.Number type to represent either an int64
or a float64, depending on the instrument's definition. A single or a float64, depending on the instrument's definition. A single
implementation interface is used for instruments, metric.InstrumentImpl, implementation interface is used for counter, gauge and measure
and a single implementation interface is used for handles, instruments, metric.InstrumentImpl, and a single implementation interface
metric.HandleImpl. is used for their handles, metric.HandleImpl. For observers, the API
defines interfaces, for which the SDK provides an implementation.
There are three entry points for events in the Metrics API: via instrument There are four entry points for events in the Metrics API - three for
handles, via direct instrument calls, and via BatchRecord. The SDK is synchronous instruments (counters, gauges and measures) and one for
designed with handles as the primary entry point, the other two entry asynchronous instruments (observers). The entry points for synchronous
points are implemented in terms of short-lived handles. For example, the instruments are: via instrument handles, via direct instrument calls, and
implementation of a direct call allocates a handle, operates on the via BatchRecord. The SDK is designed with handles as the primary entry
handle, and releases the handle. Similarly, the implementation of point, the other two entry points are implemented in terms of short-lived
RecordBatch uses a short-lived handle for each measurement in the batch. handles. For example, the implementation of a direct call allocates a
handle, operates on the handle, and releases the handle. Similarly, the
implementation of RecordBatch uses a short-lived handle for each
measurement in the batch. The entry point for asynchronous instruments is
via observer callbacks. Observer callbacks behave like a set of instrument
handles - one for each observation for a distinct label set. The observer
handles are alive as long as they are used. If the callback stops
reporting values for a certain label set, the associated handle is dropped.
Internal Structure Internal Structure
@ -51,6 +59,10 @@ user-level code or a short-lived device, there exists an internal record
managed by the SDK. Each internal record corresponds to a specific managed by the SDK. Each internal record corresponds to a specific
instrument and label set combination. instrument and label set combination.
Each observer also has its own kind of record stored in the SDK. This
record contains a set of recorders for every specific label set used in the
callback.
A sync.Map maintains the mapping of current instruments and label sets to A sync.Map maintains the mapping of current instruments and label sets to
internal records. To create a new handle, the SDK consults the Map to internal records. To create a new handle, the SDK consults the Map to
locate an existing record, otherwise it constructs a new record. The SDK locate an existing record, otherwise it constructs a new record. The SDK
@ -61,31 +73,18 @@ from the user's perspective.
Metric collection is performed via a single-threaded call to Collect that Metric collection is performed via a single-threaded call to Collect that
sweeps through all records in the SDK, checkpointing their state. When a sweeps through all records in the SDK, checkpointing their state. When a
record is discovered that has no references and has not been updated since record is discovered that has no references and has not been updated since
the prior collection pass, it is marked for reclamation and removed from the prior collection pass, it is removed from the Map.
the Map. There exists, at this moment, a race condition since another
goroutine could, in the same instant, obtain a reference to the handle.
The SDK is designed to tolerate this sort of race condition, in the name
of reducing lock contention. It is possible for more than one record with
identical instrument and label set to exist simultaneously, though only
one can be linked from the Map at a time. To avoid lost updates, the SDK
maintains two additional linked lists of records, one managed by the
collection code path and one managed by the instrumentation code path.
The SDK maintains a current epoch number, corresponding to the number of The SDK maintains a current epoch number, corresponding to the number of
completed collections. Each record contains the last epoch during which completed collections. Each recorder of an observer record contains the
it was collected and updated. These variables allow the collection code last epoch during which it was updated. This variable allows the collection
path to detect stale records while allowing the instrumentation code path code path to detect stale recorders and remove them.
to detect potential reclamations. When the instrumentation code path
detects a potential reclamation, it adds itself to the second linked list,
where records are saved from reclamation.
Each record has an associated aggregator, which maintains the current Each record of a handle and recorder of an observer has an associated
state resulting from all metric events since its last checkpoint. aggregator, which maintains the current state resulting from all metric
Aggregators may be lock-free or they may use locking, but they should events since its last checkpoint. Aggregators may be lock-free or they may
expect to be called concurrently. Because of the tolerated race condition use locking, but they should expect to be called concurrently. Aggregators
described above, aggregators must be capable of merging with another must be capable of merging with another aggregator of the same type.
aggregator of the same type.
Export Pipeline Export Pipeline

@ -43,6 +43,9 @@ type (
// current maps `mapkey` to *record. // current maps `mapkey` to *record.
current sync.Map current sync.Map
// observers is a set of `*observer` instances
observers sync.Map
// empty is the (singleton) result of Labels() // empty is the (singleton) result of Labels()
// w/ zero arguments. // w/ zero arguments.
empty labels empty labels
@ -115,16 +118,121 @@ type (
recorder export.Aggregator recorder export.Aggregator
} }
observerResult struct {
observer *observer
}
int64ObserverResult struct {
result observerResult
}
float64ObserverResult struct {
result observerResult
}
observerCallback func(result observerResult)
observer struct {
meter *SDK
descriptor *export.Descriptor
// recorders maps encoded labelset to the pair of
// labelset and recorder
recorders map[string]labeledRecorder
callback observerCallback
}
labeledRecorder struct {
recorder export.Aggregator
labels *labels
modifiedEpoch int64
}
int64Observer struct {
observer *observer
}
float64Observer struct {
observer *observer
}
ErrorHandler func(error) ErrorHandler func(error)
) )
var ( var (
_ api.Meter = &SDK{} _ api.Meter = &SDK{}
_ api.LabelSet = &labels{} _ api.LabelSet = &labels{}
_ api.InstrumentImpl = &instrument{} _ api.InstrumentImpl = &instrument{}
_ api.BoundInstrumentImpl = &record{} _ api.BoundInstrumentImpl = &record{}
_ api.Int64Observer = int64Observer{}
_ api.Float64Observer = float64Observer{}
_ api.Int64ObserverResult = int64ObserverResult{}
_ api.Float64ObserverResult = float64ObserverResult{}
) )
func (r observerResult) observe(number core.Number, ls api.LabelSet) {
r.observer.recordOne(number, ls)
}
func (o *observer) recordOne(number core.Number, ls api.LabelSet) {
if err := aggregator.RangeTest(number, o.descriptor); err != nil {
o.meter.errorHandler(err)
return
}
recorder := o.getRecorder(ls)
if recorder == nil {
// The instrument is disabled according to the
// AggregationSelector.
return
}
if err := recorder.Update(context.Background(), number, o.descriptor); err != nil {
o.meter.errorHandler(err)
return
}
}
func (o *observer) getRecorder(ls api.LabelSet) export.Aggregator {
labels := o.meter.labsFor(ls)
lrec, ok := o.recorders[labels.encoded]
if ok {
lrec.modifiedEpoch = o.meter.currentEpoch
o.recorders[labels.encoded] = lrec
return lrec.recorder
}
rec := o.meter.batcher.AggregatorFor(o.descriptor)
if o.recorders == nil {
o.recorders = make(map[string]labeledRecorder)
}
// This may store nil recorder in the map, thus disabling the
// observer for the labelset for good. This is intentional,
// but will be revisited later.
o.recorders[labels.encoded] = labeledRecorder{
recorder: rec,
labels: labels,
modifiedEpoch: o.meter.currentEpoch,
}
return rec
}
func (o *observer) unregister() {
o.meter.observers.Delete(o)
}
func (r int64ObserverResult) Observe(value int64, labels api.LabelSet) {
r.result.observe(core.NewInt64Number(value), labels)
}
func (r float64ObserverResult) Observe(value float64, labels api.LabelSet) {
r.result.observe(core.NewFloat64Number(value), labels)
}
func (o int64Observer) Unregister() {
o.observer.unregister()
}
func (o float64Observer) Unregister() {
o.observer.unregister()
}
func (i *instrument) Meter() api.Meter { func (i *instrument) Meter() api.Meter {
return i.meter return i.meter
} }
@ -275,8 +383,8 @@ func (m *SDK) labsFor(ls api.LabelSet) *labels {
return &m.empty return &m.empty
} }
func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *instrument { func newDescriptor(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *export.Descriptor {
descriptor := export.NewDescriptor( return export.NewDescriptor(
name, name,
metricKind, metricKind,
opts.Keys, opts.Keys,
@ -284,6 +392,10 @@ func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core
opts.Unit, opts.Unit,
numberKind, numberKind,
opts.Alternate) opts.Alternate)
}
func (m *SDK) newInstrument(name string, metricKind export.Kind, numberKind core.NumberKind, opts *api.Options) *instrument {
descriptor := newDescriptor(name, metricKind, numberKind, opts)
return &instrument{ return &instrument{
descriptor: descriptor, descriptor: descriptor,
meter: m, meter: m,
@ -332,8 +444,66 @@ func (m *SDK) NewFloat64Measure(name string, mos ...api.MeasureOptionApplier) ap
return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...)) return api.WrapFloat64MeasureInstrument(m.newMeasureInstrument(name, core.Float64NumberKind, mos...))
} }
// Collect traverses the list of active records and exports data for func (m *SDK) RegisterInt64Observer(name string, callback api.Int64ObserverCallback, oos ...api.ObserverOptionApplier) api.Int64Observer {
// each active instrument. Collect() may not be called concurrently. if callback == nil {
return api.NoopMeter{}.RegisterInt64Observer("", nil)
}
opts := api.Options{}
api.ApplyObserverOptions(&opts, oos...)
descriptor := newDescriptor(name, export.ObserverKind, core.Int64NumberKind, &opts)
cb := wrapInt64ObserverCallback(callback)
obs := m.newObserver(descriptor, cb)
return int64Observer{
observer: obs,
}
}
func wrapInt64ObserverCallback(callback api.Int64ObserverCallback) observerCallback {
return func(result observerResult) {
typeSafeResult := int64ObserverResult{
result: result,
}
callback(typeSafeResult)
}
}
func (m *SDK) RegisterFloat64Observer(name string, callback api.Float64ObserverCallback, oos ...api.ObserverOptionApplier) api.Float64Observer {
if callback == nil {
return api.NoopMeter{}.RegisterFloat64Observer("", nil)
}
opts := api.Options{}
api.ApplyObserverOptions(&opts, oos...)
descriptor := newDescriptor(name, export.ObserverKind, core.Float64NumberKind, &opts)
cb := wrapFloat64ObserverCallback(callback)
obs := m.newObserver(descriptor, cb)
return float64Observer{
observer: obs,
}
}
func wrapFloat64ObserverCallback(callback api.Float64ObserverCallback) observerCallback {
return func(result observerResult) {
typeSafeResult := float64ObserverResult{
result: result,
}
callback(typeSafeResult)
}
}
func (m *SDK) newObserver(descriptor *export.Descriptor, callback observerCallback) *observer {
obs := &observer{
meter: m,
descriptor: descriptor,
recorders: nil,
callback: callback,
}
m.observers.Store(obs, nil)
return obs
}
// Collect traverses the list of active records and observers and
// exports data for each active instrument. Collect() may not be
// called concurrently.
// //
// During the collection pass, the export.Batcher will receive // During the collection pass, the export.Batcher will receive
// one Export() call per current aggregation. // one Export() call per current aggregation.
@ -343,6 +513,13 @@ func (m *SDK) Collect(ctx context.Context) int {
m.collectLock.Lock() m.collectLock.Lock()
defer m.collectLock.Unlock() defer m.collectLock.Unlock()
checkpointed := m.collectRecords(ctx)
checkpointed += m.collectObservers(ctx)
m.currentEpoch++
return checkpointed
}
func (m *SDK) collectRecords(ctx context.Context) int {
checkpointed := 0 checkpointed := 0
m.current.Range(func(key interface{}, value interface{}) bool { m.current.Range(func(key interface{}, value interface{}) bool {
@ -358,25 +535,66 @@ func (m *SDK) Collect(ctx context.Context) int {
// TODO: Reconsider this logic. // TODO: Reconsider this logic.
if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 { if inuse.refMapped.inUse() || atomic.LoadInt64(&inuse.modified) != 0 {
atomic.StoreInt64(&inuse.modified, 0) atomic.StoreInt64(&inuse.modified, 0)
checkpointed += m.checkpoint(ctx, inuse) checkpointed += m.checkpointRecord(ctx, inuse)
} }
// Always continue to iterate over the entire map. // Always continue to iterate over the entire map.
return true return true
}) })
m.currentEpoch++
return checkpointed return checkpointed
} }
func (m *SDK) checkpoint(ctx context.Context, r *record) int { func (m *SDK) collectObservers(ctx context.Context) int {
if r.recorder == nil { checkpointed := 0
m.observers.Range(func(key, value interface{}) bool {
obs := key.(*observer)
result := observerResult{
observer: obs,
}
obs.callback(result)
checkpointed += m.checkpointObserver(ctx, obs)
return true
})
return checkpointed
}
func (m *SDK) checkpointRecord(ctx context.Context, r *record) int {
return m.checkpoint(ctx, r.descriptor, r.recorder, r.labels)
}
func (m *SDK) checkpointObserver(ctx context.Context, obs *observer) int {
if len(obs.recorders) == 0 {
return 0 return 0
} }
r.recorder.Checkpoint(ctx, r.descriptor) checkpointed := 0
labels := export.NewLabels(r.labels.sorted, r.labels.encoded, m.labelEncoder) for encodedLabels, lrec := range obs.recorders {
err := m.batcher.Process(ctx, export.NewRecord(r.descriptor, labels, r.recorder)) epochDiff := m.currentEpoch - lrec.modifiedEpoch
if epochDiff == 0 {
checkpointed += m.checkpoint(ctx, obs.descriptor, lrec.recorder, lrec.labels)
} else if epochDiff > 1 {
// This is second collection cycle with no
// observations for this labelset. Remove the
// recorder.
delete(obs.recorders, encodedLabels)
}
}
if len(obs.recorders) == 0 {
obs.recorders = nil
}
return checkpointed
}
func (m *SDK) checkpoint(ctx context.Context, descriptor *export.Descriptor, recorder export.Aggregator, labels *labels) int {
if recorder == nil {
return 0
}
recorder.Checkpoint(ctx, descriptor)
exportLabels := export.NewLabels(labels.sorted, labels.encoded, m.labelEncoder)
exportRecord := export.NewRecord(descriptor, exportLabels, recorder)
err := m.batcher.Process(ctx, exportRecord)
if err != nil { if err != nil {
m.errorHandler(err) m.errorHandler(err)
} }

@ -38,19 +38,19 @@ var (
) )
// NewWithInexpensiveMeasure returns a simple aggregation selector // NewWithInexpensiveMeasure returns a simple aggregation selector
// that uses counter, gauge, and minmaxsumcount aggregators for the three // that uses counter, gauge, and minmaxsumcount aggregators for the
// kinds of metric. This selector is faster and uses less memory than // four kinds of metric. This selector is faster and uses less memory
// the others because minmaxsumcount does not aggregate quantile // than the others because minmaxsumcount does not aggregate quantile
// information. // information.
func NewWithInexpensiveMeasure() export.AggregationSelector { func NewWithInexpensiveMeasure() export.AggregationSelector {
return selectorInexpensive{} return selectorInexpensive{}
} }
// NewWithSketchMeasure returns a simple aggregation selector that // NewWithSketchMeasure returns a simple aggregation selector that
// uses counter, gauge, and ddsketch aggregators for the three kinds // uses counter, gauge, and ddsketch aggregators for the four kinds of
// of metric. This selector uses more cpu and memory than the // metric. This selector uses more cpu and memory than the
// NewWithInexpensiveMeasure because it uses one DDSketch per distinct // NewWithInexpensiveMeasure because it uses one DDSketch per distinct
// measure and labelset. // measure/observer and labelset.
func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector { func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector {
return selectorSketch{ return selectorSketch{
config: config, config: config,
@ -58,7 +58,7 @@ func NewWithSketchMeasure(config *ddsketch.Config) export.AggregationSelector {
} }
// NewWithExactMeasure returns a simple aggregation selector that uses // NewWithExactMeasure returns a simple aggregation selector that uses
// counter, gauge, and array behavior for the three kinds of metric. // counter, gauge, and array aggregators for the four kinds of metric.
// This selector uses more memory than the NewWithSketchMeasure // This selector uses more memory than the NewWithSketchMeasure
// because it aggregates an array of all values, therefore is able to // because it aggregates an array of all values, therefore is able to
// compute exact quantiles. // compute exact quantiles.
@ -70,6 +70,8 @@ func (selectorInexpensive) AggregatorFor(descriptor *export.Descriptor) export.A
switch descriptor.MetricKind() { switch descriptor.MetricKind() {
case export.GaugeKind: case export.GaugeKind:
return gauge.New() return gauge.New()
case export.ObserverKind:
fallthrough
case export.MeasureKind: case export.MeasureKind:
return minmaxsumcount.New(descriptor) return minmaxsumcount.New(descriptor)
default: default:
@ -81,6 +83,8 @@ func (s selectorSketch) AggregatorFor(descriptor *export.Descriptor) export.Aggr
switch descriptor.MetricKind() { switch descriptor.MetricKind() {
case export.GaugeKind: case export.GaugeKind:
return gauge.New() return gauge.New()
case export.ObserverKind:
fallthrough
case export.MeasureKind: case export.MeasureKind:
return ddsketch.New(s.config, descriptor) return ddsketch.New(s.config, descriptor)
default: default:
@ -92,6 +96,8 @@ func (selectorExact) AggregatorFor(descriptor *export.Descriptor) export.Aggrega
switch descriptor.MetricKind() { switch descriptor.MetricKind() {
case export.GaugeKind: case export.GaugeKind:
return gauge.New() return gauge.New()
case export.ObserverKind:
fallthrough
case export.MeasureKind: case export.MeasureKind:
return array.New() return array.New()
default: default:

@ -30,9 +30,10 @@ import (
) )
var ( var (
testGaugeDesc = export.NewDescriptor("gauge", export.GaugeKind, nil, "", "", core.Int64NumberKind, false) testGaugeDesc = export.NewDescriptor("gauge", export.GaugeKind, nil, "", "", core.Int64NumberKind, false)
testCounterDesc = export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false) testCounterDesc = export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false)
testMeasureDesc = export.NewDescriptor("measure", export.MeasureKind, nil, "", "", core.Int64NumberKind, false) testMeasureDesc = export.NewDescriptor("measure", export.MeasureKind, nil, "", "", core.Int64NumberKind, false)
testObserverDesc = export.NewDescriptor("observer", export.ObserverKind, nil, "", "", core.Int64NumberKind, false)
) )
func TestInexpensiveMeasure(t *testing.T) { func TestInexpensiveMeasure(t *testing.T) {
@ -40,6 +41,7 @@ func TestInexpensiveMeasure(t *testing.T) {
require.NotPanics(t, func() { _ = inex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = inex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) })
require.NotPanics(t, func() { _ = inex.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = inex.AggregatorFor(testCounterDesc).(*counter.Aggregator) })
require.NotPanics(t, func() { _ = inex.AggregatorFor(testMeasureDesc).(*minmaxsumcount.Aggregator) }) require.NotPanics(t, func() { _ = inex.AggregatorFor(testMeasureDesc).(*minmaxsumcount.Aggregator) })
require.NotPanics(t, func() { _ = inex.AggregatorFor(testObserverDesc).(*minmaxsumcount.Aggregator) })
} }
func TestSketchMeasure(t *testing.T) { func TestSketchMeasure(t *testing.T) {
@ -47,6 +49,7 @@ func TestSketchMeasure(t *testing.T) {
require.NotPanics(t, func() { _ = sk.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = sk.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) })
require.NotPanics(t, func() { _ = sk.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = sk.AggregatorFor(testCounterDesc).(*counter.Aggregator) })
require.NotPanics(t, func() { _ = sk.AggregatorFor(testMeasureDesc).(*ddsketch.Aggregator) }) require.NotPanics(t, func() { _ = sk.AggregatorFor(testMeasureDesc).(*ddsketch.Aggregator) })
require.NotPanics(t, func() { _ = sk.AggregatorFor(testObserverDesc).(*ddsketch.Aggregator) })
} }
func TestExactMeasure(t *testing.T) { func TestExactMeasure(t *testing.T) {
@ -54,4 +57,5 @@ func TestExactMeasure(t *testing.T) {
require.NotPanics(t, func() { _ = ex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(testGaugeDesc).(*gauge.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(testCounterDesc).(*counter.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(testCounterDesc).(*counter.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(testMeasureDesc).(*array.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(testMeasureDesc).(*array.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(testObserverDesc).(*array.Aggregator) })
} }