1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Batch Observer callback support (#717)

* api/metric changes from jmacd:jmacd/batch_obs_2

* Add an SDK test

* Use a single collector method

* Two fixes

* Comments; embed AsyncRunner

* Comments

* Comment fix

* More comments

* Renaming for clarity

* Renaming for clarity (fix)

* Lint
This commit is contained in:
Joshua MacDonald
2020-05-13 16:27:52 -07:00
committed by GitHub
parent 587cde3352
commit fefdf59a0b
11 changed files with 612 additions and 88 deletions
+54
View File
@@ -324,6 +324,60 @@ func TestObserverCollection(t *testing.T) {
}, out.Map)
}
func TestObserverBatch(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
t: t,
}
sdk := metricsdk.NewAccumulator(integrator)
meter := metric.WrapMeterImpl(sdk, "test")
var floatObs metric.Float64Observer
var intObs metric.Int64Observer
var batch = Must(meter).NewBatchObserver(
func(result metric.BatchObserverResult) {
result.Observe(
[]kv.KeyValue{
kv.String("A", "B"),
},
floatObs.Observation(1),
floatObs.Observation(-1),
intObs.Observation(-1),
intObs.Observation(1),
)
result.Observe(
[]kv.KeyValue{
kv.String("C", "D"),
},
floatObs.Observation(-1),
)
result.Observe(
nil,
intObs.Observation(1),
intObs.Observation(1),
)
})
floatObs = batch.RegisterFloat64Observer("float.observer")
intObs = batch.RegisterInt64Observer("int.observer")
collected := sdk.Collect(ctx)
require.Equal(t, 4, collected)
require.Equal(t, 4, len(integrator.records))
out := batchTest.NewOutput(label.DefaultEncoder())
for _, rec := range integrator.records {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.observer/A=B": -1,
"float.observer/C=D": -1,
"int.observer/": 1,
"int.observer/A=B": 1,
}, out.Map)
}
func TestRecordBatch(t *testing.T) {
ctx := context.Background()
integrator := &correctnessIntegrator{
+49 -30
View File
@@ -26,6 +26,7 @@ import (
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
api "go.opentelemetry.io/otel/api/metric"
internal "go.opentelemetry.io/otel/internal/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
@@ -46,7 +47,9 @@ type (
// asyncInstruments is a set of
// `*asyncInstrument` instances
asyncInstruments sync.Map
asyncLock sync.Mutex
asyncInstruments *internal.AsyncInstrumentState
asyncContext context.Context
// currentEpoch is the current epoch number. It is
// incremented in `Collect()`.
@@ -129,8 +132,6 @@ type (
// recorders maps ordered labels to the pair of
// labelset and recorder
recorders map[label.Distinct]*labeledRecorder
callback func(func(api.Number, []kv.KeyValue))
}
labeledRecorder struct {
@@ -161,7 +162,7 @@ func (s *syncInstrument) Implementation() interface{} {
return s
}
func (a *asyncInstrument) observe(number api.Number, labels []kv.KeyValue) {
func (a *asyncInstrument) observe(number api.Number, labels *label.Set) {
if err := aggregator.RangeTest(number, &a.descriptor); err != nil {
a.meter.errorHandler(err)
return
@@ -178,12 +179,7 @@ func (a *asyncInstrument) observe(number api.Number, labels []kv.KeyValue) {
}
}
func (a *asyncInstrument) getRecorder(kvs []kv.KeyValue) export.Aggregator {
// We are in a single-threaded context. Note: this assumption
// could be violated if the user added concurrency within
// their callback.
labels := label.NewSetWithSortable(kvs, &a.meter.asyncSortSlice)
func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator {
lrec, ok := a.recorders[labels.Equivalent()]
if ok {
if lrec.observedEpoch == a.meter.currentEpoch {
@@ -205,7 +201,7 @@ func (a *asyncInstrument) getRecorder(kvs []kv.KeyValue) export.Aggregator {
// but will be revisited later.
a.recorders[labels.Equivalent()] = &labeledRecorder{
recorder: rec,
labels: &labels,
labels: labels,
observedEpoch: a.meter.currentEpoch,
}
return rec
@@ -318,15 +314,19 @@ func NewAccumulator(integrator export.Integrator, opts ...Option) *Accumulator {
}
return &Accumulator{
integrator: integrator,
errorHandler: c.ErrorHandler,
integrator: integrator,
errorHandler: c.ErrorHandler,
asyncInstruments: internal.NewAsyncInstrumentState(c.ErrorHandler),
}
}
// DefaultErrorHandler is used when the user does not configure an
// error handler. Prints messages to os.Stderr.
func DefaultErrorHandler(err error) {
fmt.Fprintln(os.Stderr, "Metrics Accumulator error:", err)
}
// NewSyncInstrument implements api.MetricImpl.
func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) {
return &syncInstrument{
instrument: instrument{
@@ -336,15 +336,17 @@ func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl
}, nil
}
func (m *Accumulator) NewAsyncInstrument(descriptor api.Descriptor, callback func(func(api.Number, []kv.KeyValue))) (api.AsyncImpl, error) {
// NewAsyncInstrument implements api.MetricImpl.
func (m *Accumulator) NewAsyncInstrument(descriptor api.Descriptor, runner metric.AsyncRunner) (api.AsyncImpl, error) {
a := &asyncInstrument{
instrument: instrument{
descriptor: descriptor,
meter: m,
},
callback: callback,
}
m.asyncInstruments.Store(a, nil)
m.asyncLock.Lock()
defer m.asyncLock.Unlock()
m.asyncInstruments.Register(a, runner)
return a, nil
}
@@ -360,13 +362,13 @@ func (m *Accumulator) Collect(ctx context.Context) int {
m.collectLock.Lock()
defer m.collectLock.Unlock()
checkpointed := m.collectRecords(ctx)
checkpointed += m.collectAsync(ctx)
checkpointed := m.collectSyncInstruments(ctx)
checkpointed += m.observeAsyncInstruments(ctx)
m.currentEpoch++
return checkpointed
}
func (m *Accumulator) collectRecords(ctx context.Context) int {
func (m *Accumulator) collectSyncInstruments(ctx context.Context) int {
checkpointed := 0
m.current.Range(func(key interface{}, value interface{}) bool {
@@ -409,24 +411,39 @@ func (m *Accumulator) collectRecords(ctx context.Context) int {
return checkpointed
}
func (m *Accumulator) collectAsync(ctx context.Context) int {
checkpointed := 0
// CollectAsync implements internal.AsyncCollector.
func (m *Accumulator) CollectAsync(kv []kv.KeyValue, obs ...metric.Observation) {
labels := label.NewSetWithSortable(kv, &m.asyncSortSlice)
m.asyncInstruments.Range(func(key, value interface{}) bool {
a := key.(*asyncInstrument)
a.callback(a.observe)
checkpointed += m.checkpointAsync(ctx, a)
return true
})
for _, ob := range obs {
a := ob.AsyncImpl().Implementation().(*asyncInstrument)
a.observe(ob.Number(), &labels)
}
}
return checkpointed
func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int {
m.asyncLock.Lock()
defer m.asyncLock.Unlock()
asyncCollected := 0
m.asyncContext = ctx
m.asyncInstruments.Run(m)
m.asyncContext = nil
for _, inst := range m.asyncInstruments.Instruments() {
a := inst.Implementation().(*asyncInstrument)
asyncCollected += m.checkpointAsync(a)
}
return asyncCollected
}
func (m *Accumulator) checkpointRecord(ctx context.Context, r *record) int {
return m.checkpoint(ctx, &r.inst.descriptor, r.recorder, r.labels)
}
func (m *Accumulator) checkpointAsync(ctx context.Context, a *asyncInstrument) int {
func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
if len(a.recorders) == 0 {
return 0
}
@@ -435,7 +452,7 @@ func (m *Accumulator) checkpointAsync(ctx context.Context, a *asyncInstrument) i
lrec := lrec
epochDiff := m.currentEpoch - lrec.observedEpoch
if epochDiff == 0 {
checkpointed += m.checkpoint(ctx, &a.descriptor, lrec.recorder, lrec.labels)
checkpointed += m.checkpoint(m.asyncContext, &a.descriptor, lrec.recorder, lrec.labels)
} else if epochDiff > 1 {
// This is second collection cycle with no
// observations for this labelset. Remove the
@@ -485,6 +502,7 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []kv.KeyValue, measur
}
}
// RecordOne implements api.SyncImpl.
func (r *record) RecordOne(ctx context.Context, number api.Number) {
if r.recorder == nil {
// The instrument is disabled according to the AggregationSelector.
@@ -503,6 +521,7 @@ func (r *record) RecordOne(ctx context.Context, number api.Number) {
atomic.AddInt64(&r.updateCount, 1)
}
// Unbind implements api.SyncImpl.
func (r *record) Unbind() {
r.refMapped.unref()
}