From 1fab21ddbf29b4aa9729db035bb49718b2a6364c Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 19 May 2020 21:33:10 -0700 Subject: [PATCH] Support use of synchronous instruments in async callbacks (#725) * Support use of synchronous instruments in async callbacks * Add a test --- api/global/internal/meter_test.go | 6 ++-- api/global/internal/registry_test.go | 5 ++-- api/metric/api_test.go | 12 ++++---- api/metric/async.go | 28 +++++++++-------- api/metric/registry/registry_test.go | 5 ++-- example/basic/main.go | 2 +- example/prometheus/main.go | 2 +- exporters/otlp/otlp_integration_test.go | 4 +-- internal/metric/async.go | 7 +++-- internal/metric/mock.go | 2 +- sdk/metric/benchmark_test.go | 6 ++-- sdk/metric/correct_test.go | 40 ++++++++++++++++++++----- sdk/metric/sdk.go | 6 ++-- 13 files changed, 78 insertions(+), 47 deletions(-) diff --git a/api/global/internal/meter_test.go b/api/global/internal/meter_test.go index 5d9188c96..95438259c 100644 --- a/api/global/internal/meter_test.go +++ b/api/global/internal/meter_test.go @@ -86,12 +86,12 @@ func TestDirect(t *testing.T) { valuerecorder.Record(ctx, 1, labels1...) valuerecorder.Record(ctx, 2, labels1...) - _ = Must(meter1).RegisterFloat64ValueObserver("test.valueobserver.float", func(result metric.Float64ObserverResult) { + _ = Must(meter1).RegisterFloat64ValueObserver("test.valueobserver.float", func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(1., labels1...) result.Observe(2., labels2...) }) - _ = Must(meter1).RegisterInt64ValueObserver("test.valueobserver.int", func(result metric.Int64ObserverResult) { + _ = Must(meter1).RegisterInt64ValueObserver("test.valueobserver.int", func(_ context.Context, result metric.Int64ObserverResult) { result.Observe(1, labels1...) result.Observe(2, labels2...) }) @@ -333,7 +333,7 @@ func TestImplementationIndirection(t *testing.T) { // Async: no SDK yet valueobserver := Must(meter1).RegisterFloat64ValueObserver( "interface.valueobserver", - func(result metric.Float64ObserverResult) {}, + func(_ context.Context, result metric.Float64ObserverResult) {}, ) ival = valueobserver.AsyncImpl().Implementation() diff --git a/api/global/internal/registry_test.go b/api/global/internal/registry_test.go index 76144bf5b..a37ec22fd 100644 --- a/api/global/internal/registry_test.go +++ b/api/global/internal/registry_test.go @@ -15,6 +15,7 @@ package internal import ( + "context" "errors" "testing" @@ -43,10 +44,10 @@ var ( return unwrap(MeterProvider().Meter(libraryName).NewFloat64ValueRecorder(name)) }, "valueobserver.int64": func(name, libraryName string) (metric.InstrumentImpl, error) { - return unwrap(MeterProvider().Meter(libraryName).RegisterInt64ValueObserver(name, func(metric.Int64ObserverResult) {})) + return unwrap(MeterProvider().Meter(libraryName).RegisterInt64ValueObserver(name, func(context.Context, metric.Int64ObserverResult) {})) }, "valueobserver.float64": func(name, libraryName string) (metric.InstrumentImpl, error) { - return unwrap(MeterProvider().Meter(libraryName).RegisterFloat64ValueObserver(name, func(metric.Float64ObserverResult) {})) + return unwrap(MeterProvider().Meter(libraryName).RegisterFloat64ValueObserver(name, func(context.Context, metric.Float64ObserverResult) {})) }, } ) diff --git a/api/metric/api_test.go b/api/metric/api_test.go index 0650269f1..369baa661 100644 --- a/api/metric/api_test.go +++ b/api/metric/api_test.go @@ -183,7 +183,7 @@ func TestObserverInstruments(t *testing.T) { t.Run("float valueobserver", func(t *testing.T) { labels := []kv.KeyValue{kv.String("O", "P")} mockSDK, meter := mockTest.NewMeter() - o := Must(meter).RegisterFloat64ValueObserver("test.valueobserver.float", func(result metric.Float64ObserverResult) { + o := Must(meter).RegisterFloat64ValueObserver("test.valueobserver.float", func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(42.1, labels...) }) mockSDK.RunAsyncInstruments() @@ -194,7 +194,7 @@ func TestObserverInstruments(t *testing.T) { t.Run("int valueobserver", func(t *testing.T) { labels := []kv.KeyValue{} mockSDK, meter := mockTest.NewMeter() - o := Must(meter).RegisterInt64ValueObserver("test.observer.int", func(result metric.Int64ObserverResult) { + o := Must(meter).RegisterInt64ValueObserver("test.observer.int", func(_ context.Context, result metric.Int64ObserverResult) { result.Observe(-142, labels...) }) mockSDK.RunAsyncInstruments() @@ -205,7 +205,7 @@ func TestObserverInstruments(t *testing.T) { t.Run("float sumobserver", func(t *testing.T) { labels := []kv.KeyValue{kv.String("O", "P")} mockSDK, meter := mockTest.NewMeter() - o := Must(meter).RegisterFloat64SumObserver("test.sumobserver.float", func(result metric.Float64ObserverResult) { + o := Must(meter).RegisterFloat64SumObserver("test.sumobserver.float", func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(42.1, labels...) }) mockSDK.RunAsyncInstruments() @@ -216,7 +216,7 @@ func TestObserverInstruments(t *testing.T) { t.Run("int sumobserver", func(t *testing.T) { labels := []kv.KeyValue{} mockSDK, meter := mockTest.NewMeter() - o := Must(meter).RegisterInt64SumObserver("test.observer.int", func(result metric.Int64ObserverResult) { + o := Must(meter).RegisterInt64SumObserver("test.observer.int", func(_ context.Context, result metric.Int64ObserverResult) { result.Observe(-142, labels...) }) mockSDK.RunAsyncInstruments() @@ -280,7 +280,7 @@ func TestBatchObserverInstruments(t *testing.T) { } cb := Must(meter).NewBatchObserver( - func(result metric.BatchObserverResult) { + func(_ context.Context, result metric.BatchObserverResult) { result.Observe(labels, obs1.Observation(42), obs2.Observation(42.0), @@ -372,7 +372,7 @@ func TestWrappedInstrumentError(t *testing.T) { require.Equal(t, err, metric.ErrSDKReturnedNilImpl) require.NotNil(t, valuerecorder.SyncImpl()) - observer, err := meter.RegisterInt64ValueObserver("test.observer", func(result metric.Int64ObserverResult) {}) + observer, err := meter.RegisterInt64ValueObserver("test.observer", func(_ context.Context, result metric.Int64ObserverResult) {}) require.NotNil(t, err) require.NotNil(t, observer.AsyncImpl()) diff --git a/api/metric/async.go b/api/metric/async.go index e54f0cf0d..c82fdc409 100644 --- a/api/metric/async.go +++ b/api/metric/async.go @@ -14,7 +14,11 @@ package metric -import "go.opentelemetry.io/otel/api/kv" +import ( + "context" + + "go.opentelemetry.io/otel/api/kv" +) // The file is organized as follows: // @@ -38,16 +42,16 @@ type Observation struct { // Int64ObserverCallback is a type of callback that integral // observers run. -type Int64ObserverCallback func(Int64ObserverResult) +type Int64ObserverCallback func(context.Context, Int64ObserverResult) // Float64ObserverCallback is a type of callback that floating point // observers run. -type Float64ObserverCallback func(Float64ObserverResult) +type Float64ObserverCallback func(context.Context, Float64ObserverResult) // BatchObserverCallback is a callback argument for use with any // Observer instrument that will be reported as a batch of // observations. -type BatchObserverCallback func(BatchObserverResult) +type BatchObserverCallback func(context.Context, BatchObserverResult) // Int64ObserverResult is passed to an observer callback to capture // observations for one asynchronous integer metric instrument. @@ -110,7 +114,7 @@ type AsyncSingleRunner interface { // receives one captured observation. (The function accepts // multiple observations so the same implementation can be // used for batch runners.) - Run(single AsyncImpl, capture func([]kv.KeyValue, ...Observation)) + Run(ctx context.Context, single AsyncImpl, capture func([]kv.KeyValue, ...Observation)) AsyncRunner } @@ -120,7 +124,7 @@ type AsyncSingleRunner interface { type AsyncBatchRunner interface { // Run accepts a function for capturing observations of // multiple instruments. - Run(capture func([]kv.KeyValue, ...Observation)) + Run(ctx context.Context, capture func([]kv.KeyValue, ...Observation)) AsyncRunner } @@ -154,24 +158,24 @@ func (*Float64ObserverCallback) AnyRunner() {} func (*BatchObserverCallback) AnyRunner() {} // Run implements AsyncSingleRunner. -func (i *Int64ObserverCallback) Run(impl AsyncImpl, function func([]kv.KeyValue, ...Observation)) { - (*i)(Int64ObserverResult{ +func (i *Int64ObserverCallback) Run(ctx context.Context, impl AsyncImpl, function func([]kv.KeyValue, ...Observation)) { + (*i)(ctx, Int64ObserverResult{ instrument: impl, function: function, }) } // Run implements AsyncSingleRunner. -func (f *Float64ObserverCallback) Run(impl AsyncImpl, function func([]kv.KeyValue, ...Observation)) { - (*f)(Float64ObserverResult{ +func (f *Float64ObserverCallback) Run(ctx context.Context, impl AsyncImpl, function func([]kv.KeyValue, ...Observation)) { + (*f)(ctx, Float64ObserverResult{ instrument: impl, function: function, }) } // Run implements AsyncBatchRunner. -func (b *BatchObserverCallback) Run(function func([]kv.KeyValue, ...Observation)) { - (*b)(BatchObserverResult{ +func (b *BatchObserverCallback) Run(ctx context.Context, function func([]kv.KeyValue, ...Observation)) { + (*b)(ctx, BatchObserverResult{ function: function, }) } diff --git a/api/metric/registry/registry_test.go b/api/metric/registry/registry_test.go index 4f5c10a33..e80e23f39 100644 --- a/api/metric/registry/registry_test.go +++ b/api/metric/registry/registry_test.go @@ -15,6 +15,7 @@ package registry_test import ( + "context" "errors" "testing" @@ -44,10 +45,10 @@ var ( return unwrap(m.NewFloat64ValueRecorder(name)) }, "valueobserver.int64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) { - return unwrap(m.RegisterInt64ValueObserver(name, func(metric.Int64ObserverResult) {})) + return unwrap(m.RegisterInt64ValueObserver(name, func(context.Context, metric.Int64ObserverResult) {})) }, "valueobserver.float64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) { - return unwrap(m.RegisterFloat64ValueObserver(name, func(metric.Float64ObserverResult) {})) + return unwrap(m.RegisterFloat64ValueObserver(name, func(context.Context, metric.Float64ObserverResult) {})) }, } ) diff --git a/example/basic/main.go b/example/basic/main.go index 04c4f8e49..fee62818a 100644 --- a/example/basic/main.go +++ b/example/basic/main.go @@ -73,7 +73,7 @@ func main() { commonLabels := []kv.KeyValue{lemonsKey.Int(10), kv.String("A", "1"), kv.String("B", "2"), kv.String("C", "3")} - oneMetricCB := func(result metric.Float64ObserverResult) { + oneMetricCB := func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(1, commonLabels...) } _ = metric.Must(meter).RegisterFloat64ValueObserver("ex.com.one", oneMetricCB, diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 378eb98e6..117a2b13f 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -52,7 +52,7 @@ func main() { observerLock := new(sync.RWMutex) observerValueToReport := new(float64) observerLabelsToReport := new([]kv.KeyValue) - cb := func(result metric.Float64ObserverResult) { + cb := func(_ context.Context, result metric.Float64ObserverResult) { (*observerLock).RLock() value := *observerValueToReport labels := *observerLabelsToReport diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index e4f61fdb6..04d17dbd8 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -155,12 +155,12 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) switch data.nKind { case metricapi.Int64NumberKind: callback := func(v int64) metricapi.Int64ObserverCallback { - return metricapi.Int64ObserverCallback(func(result metricapi.Int64ObserverResult) { result.Observe(v, labels...) }) + return metricapi.Int64ObserverCallback(func(_ context.Context, result metricapi.Int64ObserverResult) { result.Observe(v, labels...) }) }(data.val) metricapi.Must(meter).RegisterInt64ValueObserver(name, callback) case metricapi.Float64NumberKind: callback := func(v float64) metricapi.Float64ObserverCallback { - return metricapi.Float64ObserverCallback(func(result metricapi.Float64ObserverResult) { result.Observe(v, labels...) }) + return metricapi.Float64ObserverCallback(func(_ context.Context, result metricapi.Float64ObserverResult) { result.Observe(v, labels...) }) }(float64(data.val)) metricapi.Must(meter).RegisterFloat64ValueObserver(name, callback) default: diff --git a/internal/metric/async.go b/internal/metric/async.go index 07b7e01df..1be4ea361 100644 --- a/internal/metric/async.go +++ b/internal/metric/async.go @@ -15,6 +15,7 @@ package metric import ( + "context" "errors" "fmt" "os" @@ -133,7 +134,7 @@ func (a *AsyncInstrumentState) Register(inst metric.AsyncImpl, runner metric.Asy } // Run executes the complete set of observer callbacks. -func (a *AsyncInstrumentState) Run(collector AsyncCollector) { +func (a *AsyncInstrumentState) Run(ctx context.Context, collector AsyncCollector) { a.lock.Lock() runners := a.runners a.lock.Unlock() @@ -144,12 +145,12 @@ func (a *AsyncInstrumentState) Run(collector AsyncCollector) { // interface has un-exported methods. if singleRunner, ok := rp.runner.(metric.AsyncSingleRunner); ok { - singleRunner.Run(rp.inst, collector.CollectAsync) + singleRunner.Run(ctx, rp.inst, collector.CollectAsync) continue } if multiRunner, ok := rp.runner.(metric.AsyncBatchRunner); ok { - multiRunner.Run(collector.CollectAsync) + multiRunner.Run(ctx, collector.CollectAsync) continue } diff --git a/internal/metric/mock.go b/internal/metric/mock.go index 985ea7fc0..320d83053 100644 --- a/internal/metric/mock.go +++ b/internal/metric/mock.go @@ -187,5 +187,5 @@ func (m *MeterImpl) collect(ctx context.Context, labels []kv.KeyValue, measureme } func (m *MeterImpl) RunAsyncInstruments() { - m.asyncInstruments.Run(m) + m.asyncInstruments.Run(context.Background(), m) } diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 3a6b9888d..f4a6b315f 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -425,7 +425,7 @@ func BenchmarkObserverRegistration(b *testing.B) { for i := 0; i < b.N; i++ { names = append(names, fmt.Sprintf("test.valueobserver.%d", i)) } - cb := func(result metric.Int64ObserverResult) {} + cb := func(_ context.Context, result metric.Int64ObserverResult) {} b.ResetTimer() @@ -438,7 +438,7 @@ func BenchmarkValueObserverObservationInt64(b *testing.B) { ctx := context.Background() fix := newFixture(b) labs := makeLabels(1) - _ = fix.meter.RegisterInt64ValueObserver("test.valueobserver", func(result metric.Int64ObserverResult) { + _ = fix.meter.RegisterInt64ValueObserver("test.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) { for i := 0; i < b.N; i++ { result.Observe((int64)(i), labs...) } @@ -453,7 +453,7 @@ func BenchmarkValueObserverObservationFloat64(b *testing.B) { ctx := context.Background() fix := newFixture(b) labs := makeLabels(1) - _ = fix.meter.RegisterFloat64ValueObserver("test.valueobserver", func(result metric.Float64ObserverResult) { + _ = fix.meter.RegisterFloat64ValueObserver("test.valueobserver", func(_ context.Context, result metric.Float64ObserverResult) { for i := 0; i < b.N; i++ { result.Observe((float64)(i), labs...) } diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 8eccc0fa0..d2c8a173d 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -320,13 +320,13 @@ func TestObserverCollection(t *testing.T) { ctx := context.Background() meter, sdk, integrator := newSDK(t) - _ = Must(meter).RegisterFloat64ValueObserver("float.valueobserver", func(result metric.Float64ObserverResult) { + _ = Must(meter).RegisterFloat64ValueObserver("float.valueobserver", func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(1, kv.String("A", "B")) // last value wins result.Observe(-1, kv.String("A", "B")) result.Observe(-1, kv.String("C", "D")) }) - _ = Must(meter).RegisterInt64ValueObserver("int.valueobserver", func(result metric.Int64ObserverResult) { + _ = Must(meter).RegisterInt64ValueObserver("int.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) { result.Observe(-1, kv.String("A", "B")) result.Observe(1) // last value wins @@ -334,12 +334,12 @@ func TestObserverCollection(t *testing.T) { result.Observe(1) }) - _ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(result metric.Float64ObserverResult) { + _ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(1, kv.String("A", "B")) result.Observe(2, kv.String("A", "B")) result.Observe(1, kv.String("C", "D")) }) - _ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(result metric.Int64ObserverResult) { + _ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(_ context.Context, result metric.Int64ObserverResult) { result.Observe(2, kv.String("A", "B")) result.Observe(1) // last value wins @@ -347,7 +347,7 @@ func TestObserverCollection(t *testing.T) { result.Observe(1) }) - _ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(result metric.Int64ObserverResult) { + _ = Must(meter).RegisterInt64ValueObserver("empty.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) { }) collected := sdk.Collect(ctx) @@ -375,13 +375,13 @@ func TestSumObserverInputRange(t *testing.T) { ctx := context.Background() meter, sdk, integrator := newSDK(t) - _ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(result metric.Float64ObserverResult) { + _ = Must(meter).RegisterFloat64SumObserver("float.sumobserver", func(_ context.Context, result metric.Float64ObserverResult) { result.Observe(-2, kv.String("A", "B")) require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr()) result.Observe(-1, kv.String("C", "D")) require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr()) }) - _ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(result metric.Int64ObserverResult) { + _ = Must(meter).RegisterInt64SumObserver("int.sumobserver", func(_ context.Context, result metric.Int64ObserverResult) { result.Observe(-1, kv.String("A", "B")) require.Equal(t, aggregator.ErrNegativeInput, integrator.sdkErr()) result.Observe(-1) @@ -407,7 +407,7 @@ func TestObserverBatch(t *testing.T) { var intSumObs metric.Int64SumObserver var batch = Must(meter).NewBatchObserver( - func(result metric.BatchObserverResult) { + func(_ context.Context, result metric.BatchObserverResult) { result.Observe( []kv.KeyValue{ kv.String("A", "B"), @@ -514,3 +514,27 @@ func TestRecordPersistence(t *testing.T) { require.Equal(t, int64(2), integrator.newAggCount) } + +func TestSyncInAsync(t *testing.T) { + ctx := context.Background() + meter, sdk, integrator := newSDK(t) + + counter := Must(meter).NewFloat64Counter("counter") + _ = Must(meter).RegisterInt64ValueObserver("observer", + func(ctx context.Context, result metric.Int64ObserverResult) { + result.Observe(10) + counter.Add(ctx, 100) + }, + ) + + sdk.Collect(ctx) + + out := batchTest.NewOutput(label.DefaultEncoder()) + for _, rec := range integrator.records { + _ = out.AddTo(rec) + } + require.EqualValues(t, map[string]float64{ + "counter//R=V": 100, + "observer//R=V": 10, + }, out.Map) +} diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index f3939a41d..ff0f3853c 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -367,8 +367,8 @@ func (m *Accumulator) Collect(ctx context.Context) int { m.collectLock.Lock() defer m.collectLock.Unlock() - checkpointed := m.collectSyncInstruments(ctx) - checkpointed += m.observeAsyncInstruments(ctx) + checkpointed := m.observeAsyncInstruments(ctx) + checkpointed += m.collectSyncInstruments(ctx) m.currentEpoch++ return checkpointed @@ -434,7 +434,7 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int { asyncCollected := 0 m.asyncContext = ctx - m.asyncInstruments.Run(m) + m.asyncInstruments.Run(context.Background(), m) m.asyncContext = nil for _, inst := range m.asyncInstruments.Instruments() {