1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-01 13:07:51 +02:00

Unique instrument checking (#580)

* Add skeleton uniqueness checker

* Fix the build w/ new code in place

* Add sync tests

* More test

* Implement global uniqueness checking

* Set the library name

* Ensure ordered global initialization

* Use proper require statement for errors

* Comment

* Apply feedback fixes

* Comment and rename from feedback
This commit is contained in:
Joshua MacDonald 2020-03-24 10:54:08 -07:00 committed by GitHub
parent dff6265dc5
commit 06f833e2ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 651 additions and 178 deletions

View File

@ -49,8 +49,9 @@ func newFixture(b *testing.B) *benchFixture {
bf := &benchFixture{
B: b,
}
bf.sdk = sdk.New(bf)
bf.meter = metric.WrapMeterImpl(bf.sdk)
bf.meter = metric.WrapMeterImpl(bf.sdk, "test")
return bf
}

View File

@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
)
// This file contains the forwarding implementation of metric.Provider
@ -44,12 +45,15 @@ import (
// Bound instrument operations are implemented by delegating to the
// instrument after it is registered, with a sync.Once initializer to
// protect against races with Release().
//
// Metric uniqueness checking is implemented by calling the exported
// methods of the api/metric/registry package.
type meterProvider struct {
delegate metric.Provider
lock sync.Mutex
meters []*meter
meters map[string]*meter
}
type meter struct {
@ -59,8 +63,9 @@ type meter struct {
name string
lock sync.Mutex
registry map[string]metric.InstrumentImpl
syncInsts []*syncImpl
asyncInsts []*obsImpl
asyncInsts []*asyncImpl
}
type instrument struct {
@ -75,7 +80,7 @@ type syncImpl struct {
constructor func(metric.Meter) (metric.SyncImpl, error)
}
type obsImpl struct {
type asyncImpl struct {
delegate unsafe.Pointer // (*metric.AsyncImpl)
instrument
@ -119,7 +124,7 @@ var _ metric.LabelSet = &labelSet{}
var _ metric.LabelSetDelegate = &labelSet{}
var _ metric.InstrumentImpl = &syncImpl{}
var _ metric.BoundSyncImpl = &syncHandle{}
var _ metric.AsyncImpl = &obsImpl{}
var _ metric.AsyncImpl = &asyncImpl{}
func (inst *instrument) Descriptor() metric.Descriptor {
return inst.descriptor
@ -127,6 +132,12 @@ func (inst *instrument) Descriptor() metric.Descriptor {
// Provider interface and delegation
func newMeterProvider() *meterProvider {
return &meterProvider{
meters: map[string]*meter{},
}
}
func (p *meterProvider) setDelegate(provider metric.Provider) {
p.lock.Lock()
defer p.lock.Unlock()
@ -146,11 +157,18 @@ func (p *meterProvider) Meter(name string) metric.Meter {
return p.delegate.Meter(name)
}
m := &meter{
provider: p,
name: name,
if exm, ok := p.meters[name]; ok {
return exm
}
p.meters = append(p.meters, m)
m := &meter{
provider: p,
name: name,
registry: map[string]metric.InstrumentImpl{},
syncInsts: []*syncImpl{},
asyncInsts: []*asyncImpl{},
}
p.meters[name] = m
return m
}
@ -182,6 +200,13 @@ func (m *meter) newSync(desc metric.Descriptor, constructor func(metric.Meter) (
return constructor(*meterPtr)
}
if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex.(metric.SyncImpl), nil
}
inst := &syncImpl{
instrument: instrument{
descriptor: desc,
@ -189,6 +214,7 @@ func (m *meter) newSync(desc metric.Descriptor, constructor func(metric.Meter) (
constructor: constructor,
}
m.syncInsts = append(m.syncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}
@ -260,17 +286,25 @@ func (m *meter) newAsync(desc metric.Descriptor, constructor func(metric.Meter)
return constructor(*meterPtr)
}
inst := &obsImpl{
if ex, ok := m.registry[desc.Name()]; ok {
if !registry.Compatible(desc, ex.Descriptor()) {
return nil, registry.NewMetricKindMismatchError(ex.Descriptor())
}
return ex.(metric.AsyncImpl), nil
}
inst := &asyncImpl{
instrument: instrument{
descriptor: desc,
},
constructor: constructor,
}
m.asyncInsts = append(m.asyncInsts, inst)
m.registry[desc.Name()] = inst
return inst, nil
}
func (obs *obsImpl) Implementation() interface{} {
func (obs *asyncImpl) Implementation() interface{} {
if implPtr := (*metric.AsyncImpl)(atomic.LoadPointer(&obs.delegate)); implPtr != nil {
return (*implPtr).Implementation()
}
@ -287,7 +321,7 @@ func asyncCheck(has AsyncImpler, err error) (metric.AsyncImpl, error) {
return nil, err
}
func (obs *obsImpl) setDelegate(d metric.Meter) {
func (obs *asyncImpl) setDelegate(d metric.Meter) {
implPtr := new(metric.AsyncImpl)
var err error
@ -374,9 +408,13 @@ func (labels *labelSet) Delegate() metric.LabelSet {
// Constructors
func (m *meter) withName(opts []metric.Option) []metric.Option {
return append(opts, metric.WithLibraryName(m.name))
}
func (m *meter) NewInt64Counter(name string, opts ...metric.Option) (metric.Int64Counter, error) {
return metric.WrapInt64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, core.Int64NumberKind, opts...),
metric.NewDescriptor(name, metric.CounterKind, core.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Counter(name, opts...))
}))
@ -384,7 +422,7 @@ func (m *meter) NewInt64Counter(name string, opts ...metric.Option) (metric.Int6
func (m *meter) NewFloat64Counter(name string, opts ...metric.Option) (metric.Float64Counter, error) {
return metric.WrapFloat64CounterInstrument(m.newSync(
metric.NewDescriptor(name, metric.CounterKind, core.Float64NumberKind, opts...),
metric.NewDescriptor(name, metric.CounterKind, core.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Counter(name, opts...))
}))
@ -392,7 +430,7 @@ func (m *meter) NewFloat64Counter(name string, opts ...metric.Option) (metric.Fl
func (m *meter) NewInt64Measure(name string, opts ...metric.Option) (metric.Int64Measure, error) {
return metric.WrapInt64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, core.Int64NumberKind, opts...),
metric.NewDescriptor(name, metric.MeasureKind, core.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewInt64Measure(name, opts...))
}))
@ -400,7 +438,7 @@ func (m *meter) NewInt64Measure(name string, opts ...metric.Option) (metric.Int6
func (m *meter) NewFloat64Measure(name string, opts ...metric.Option) (metric.Float64Measure, error) {
return metric.WrapFloat64MeasureInstrument(m.newSync(
metric.NewDescriptor(name, metric.MeasureKind, core.Float64NumberKind, opts...),
metric.NewDescriptor(name, metric.MeasureKind, core.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.SyncImpl, error) {
return syncCheck(other.NewFloat64Measure(name, opts...))
}))
@ -408,7 +446,7 @@ func (m *meter) NewFloat64Measure(name string, opts ...metric.Option) (metric.Fl
func (m *meter) RegisterInt64Observer(name string, callback metric.Int64ObserverCallback, opts ...metric.Option) (metric.Int64Observer, error) {
return metric.WrapInt64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, core.Int64NumberKind, opts...),
metric.NewDescriptor(name, metric.ObserverKind, core.Int64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterInt64Observer(name, callback, opts...))
}))
@ -416,7 +454,7 @@ func (m *meter) RegisterInt64Observer(name string, callback metric.Int64Observer
func (m *meter) RegisterFloat64Observer(name string, callback metric.Float64ObserverCallback, opts ...metric.Option) (metric.Float64Observer, error) {
return metric.WrapFloat64ObserverInstrument(m.newAsync(
metric.NewDescriptor(name, metric.ObserverKind, core.Float64NumberKind, opts...),
metric.NewDescriptor(name, metric.ObserverKind, core.Float64NumberKind, m.withName(opts)...),
func(other metric.Meter) (metric.AsyncImpl, error) {
return asyncCheck(other.RegisterFloat64Observer(name, callback, opts...))
}))
@ -427,7 +465,7 @@ func AtomicFieldOffsets() map[string]uintptr {
"meterProvider.delegate": unsafe.Offsetof(meterProvider{}.delegate),
"meter.delegate": unsafe.Offsetof(meter{}.delegate),
"syncImpl.delegate": unsafe.Offsetof(syncImpl{}.delegate),
"obsImpl.delegate": unsafe.Offsetof(obsImpl{}.delegate),
"asyncImpl.delegate": unsafe.Offsetof(asyncImpl{}.delegate),
"labelSet.delegate": unsafe.Offsetof(labelSet{}.delegate),
"syncHandle.delegate": unsafe.Offsetof(syncHandle{}.delegate),
}

View File

@ -32,6 +32,40 @@ import (
metrictest "go.opentelemetry.io/otel/internal/metric"
)
// Note: Maybe this should be factored into ../../../internal/metric?
type measured struct {
Name string
LibraryName string
Labels map[core.Key]core.Value
Number core.Number
}
func asStructs(batches []metrictest.Batch) []measured {
var r []measured
for _, batch := range batches {
for _, m := range batch.Measurements {
r = append(r, measured{
Name: m.Instrument.Descriptor().Name(),
LibraryName: m.Instrument.Descriptor().LibraryName(),
Labels: batch.LabelSet.Labels,
Number: m.Number,
})
}
}
return r
}
func asMap(kvs ...core.KeyValue) map[core.Key]core.Value {
m := map[core.Key]core.Value{}
for _, kv := range kvs {
m[kv.Key] = kv.Value
}
return m
}
var asInt = core.NewInt64Number
var asFloat = core.NewFloat64Number
func TestDirect(t *testing.T) {
internal.ResetForTest()
@ -67,89 +101,64 @@ func TestDirect(t *testing.T) {
second.Record(ctx, 1, labels3)
second.Record(ctx, 2, labels3)
sdk := metrictest.NewProvider()
global.SetMeterProvider(sdk)
mock, provider := metrictest.NewProvider()
global.SetMeterProvider(provider)
counter.Add(ctx, 1, labels1)
measure.Record(ctx, 3, labels1)
second.Record(ctx, 3, labels3)
mockImpl, _ := metric.UnwrapImpl(sdk.Meter("test1"))
mock := mockImpl.(*metrictest.Meter)
mock.RunAsyncInstruments()
require.Len(t, mock.MeasurementBatches, 6)
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[0].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[0].Measurements, 1)
require.Equal(t, int64(1),
mock.MeasurementBatches[0].Measurements[0].Number.AsInt64())
require.Equal(t, "test.counter",
mock.MeasurementBatches[0].Measurements[0].Instrument.Descriptor().Name())
measurements := asStructs(mock.MeasurementBatches)
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[1].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[1].Measurements, 1)
require.InDelta(t, float64(3),
mock.MeasurementBatches[1].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.measure",
mock.MeasurementBatches[1].Measurements[0].Instrument.Descriptor().Name())
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[2].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[2].Measurements, 1)
require.InDelta(t, float64(1),
mock.MeasurementBatches[2].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.observer.float",
mock.MeasurementBatches[2].Measurements[0].Instrument.Descriptor().Name())
require.Equal(t, map[core.Key]core.Value{
lvals2.Key: lvals2.Value,
}, mock.MeasurementBatches[3].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[3].Measurements, 1)
require.InDelta(t, float64(2),
mock.MeasurementBatches[3].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.observer.float",
mock.MeasurementBatches[3].Measurements[0].Instrument.Descriptor().Name())
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[4].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[4].Measurements, 1)
require.Equal(t, int64(1),
mock.MeasurementBatches[4].Measurements[0].Number.AsInt64())
require.Equal(t, "test.observer.int",
mock.MeasurementBatches[4].Measurements[0].Instrument.Descriptor().Name())
require.Equal(t, map[core.Key]core.Value{
lvals2.Key: lvals2.Value,
}, mock.MeasurementBatches[5].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[5].Measurements, 1)
require.Equal(t, int64(2),
mock.MeasurementBatches[5].Measurements[0].Number.AsInt64())
require.Equal(t, "test.observer.int",
mock.MeasurementBatches[5].Measurements[0].Instrument.Descriptor().Name())
// This tests the second Meter instance
mockImpl, _ = metric.UnwrapImpl(sdk.Meter("test2"))
mock = mockImpl.(*metrictest.Meter)
require.Len(t, mock.MeasurementBatches, 1)
require.Equal(t, map[core.Key]core.Value{
lvals3.Key: lvals3.Value,
}, mock.MeasurementBatches[0].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[0].Measurements, 1)
require.InDelta(t, float64(3),
mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.second",
mock.MeasurementBatches[0].Measurements[0].Instrument.Descriptor().Name())
require.EqualValues(t,
[]measured{
{
Name: "test.counter",
LibraryName: "test1",
Labels: asMap(lvals1),
Number: asInt(1),
},
{
Name: "test.measure",
LibraryName: "test1",
Labels: asMap(lvals1),
Number: asFloat(3),
},
{
Name: "test.second",
LibraryName: "test2",
Labels: asMap(lvals3),
Number: asFloat(3),
},
{
Name: "test.observer.float",
LibraryName: "test1",
Labels: asMap(lvals1),
Number: asFloat(1),
},
{
Name: "test.observer.float",
LibraryName: "test1",
Labels: asMap(lvals2),
Number: asFloat(2),
},
{
Name: "test.observer.int",
LibraryName: "test1",
Labels: asMap(lvals1),
Number: asInt(1),
},
{
Name: "test.observer.int",
LibraryName: "test1",
Labels: asMap(lvals2),
Number: asInt(2),
},
},
measurements,
)
}
func TestBound(t *testing.T) {
@ -172,34 +181,28 @@ func TestBound(t *testing.T) {
boundM.Record(ctx, 1)
boundM.Record(ctx, 2)
sdk := metrictest.NewProvider()
global.SetMeterProvider(sdk)
mock, provider := metrictest.NewProvider()
global.SetMeterProvider(provider)
boundC.Add(ctx, 1)
boundM.Record(ctx, 3)
mockImpl, _ := metric.UnwrapImpl(sdk.Meter("test"))
mock := mockImpl.(*metrictest.Meter)
require.Len(t, mock.MeasurementBatches, 2)
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[0].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[0].Measurements, 1)
require.InDelta(t, float64(1),
mock.MeasurementBatches[0].Measurements[0].Number.AsFloat64(),
0.01)
require.Equal(t, "test.counter",
mock.MeasurementBatches[0].Measurements[0].Instrument.Descriptor().Name())
require.Equal(t, map[core.Key]core.Value{
lvals1.Key: lvals1.Value,
}, mock.MeasurementBatches[1].LabelSet.Labels)
require.Len(t, mock.MeasurementBatches[1].Measurements, 1)
require.Equal(t, int64(3),
mock.MeasurementBatches[1].Measurements[0].Number.AsInt64())
require.Equal(t, "test.measure",
mock.MeasurementBatches[1].Measurements[0].Instrument.Descriptor().Name())
require.EqualValues(t,
[]measured{
{
Name: "test.counter",
LibraryName: "test",
Labels: asMap(lvals1),
Number: asFloat(1),
},
{
Name: "test.measure",
LibraryName: "test",
Labels: asMap(lvals1),
Number: asInt(3),
},
},
asStructs(mock.MeasurementBatches))
boundC.Unbind()
boundM.Unbind()
@ -263,19 +266,17 @@ func TestUnbindThenRecordOne(t *testing.T) {
internal.ResetForTest()
ctx := context.Background()
sdk := metrictest.NewProvider()
mock, provider := metrictest.NewProvider()
meter := global.Meter("test")
counter := Must(meter).NewInt64Counter("test.counter")
boundC := counter.Bind(meter.Labels())
global.SetMeterProvider(sdk)
global.SetMeterProvider(provider)
boundC.Unbind()
require.NotPanics(t, func() {
boundC.Add(ctx, 1)
})
mockImpl, _ := metric.UnwrapImpl(global.Meter("test"))
mock := mockImpl.(*metrictest.Meter)
require.Equal(t, 0, len(mock.MeasurementBatches))
}
@ -304,7 +305,8 @@ func TestErrorInDeferredConstructor(t *testing.T) {
c1 := Must(meter).NewInt64Counter("test")
c2 := Must(meter).NewInt64Counter("test")
sdk := &meterProviderWithConstructorError{metrictest.NewProvider()}
_, provider := metrictest.NewProvider()
sdk := &meterProviderWithConstructorError{provider}
require.Panics(t, func() {
global.SetMeterProvider(sdk)
@ -345,8 +347,8 @@ func TestImplementationIndirection(t *testing.T) {
require.False(t, ok)
// Register the SDK
sdk := metrictest.NewProvider()
global.SetMeterProvider(sdk)
_, provider := metrictest.NewProvider()
global.SetMeterProvider(provider)
// Repeat the above tests

View File

@ -0,0 +1,114 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
)
type (
newFunc func(name, libraryName string) (metric.InstrumentImpl, error)
)
var (
allNew = map[string]newFunc{
"counter.int64": func(name, libraryName string) (metric.InstrumentImpl, error) {
return unwrap(MeterProvider().Meter(libraryName).NewInt64Counter(name))
},
"counter.float64": func(name, libraryName string) (metric.InstrumentImpl, error) {
return unwrap(MeterProvider().Meter(libraryName).NewFloat64Counter(name))
},
"measure.int64": func(name, libraryName string) (metric.InstrumentImpl, error) {
return unwrap(MeterProvider().Meter(libraryName).NewInt64Measure(name))
},
"measure.float64": func(name, libraryName string) (metric.InstrumentImpl, error) {
return unwrap(MeterProvider().Meter(libraryName).NewFloat64Measure(name))
},
"observer.int64": func(name, libraryName string) (metric.InstrumentImpl, error) {
return unwrap(MeterProvider().Meter(libraryName).RegisterInt64Observer(name, func(metric.Int64ObserverResult) {}))
},
"observer.float64": func(name, libraryName string) (metric.InstrumentImpl, error) {
return unwrap(MeterProvider().Meter(libraryName).RegisterFloat64Observer(name, func(metric.Float64ObserverResult) {}))
},
}
)
func unwrap(impl interface{}, err error) (metric.InstrumentImpl, error) {
if impl == nil {
return nil, err
}
if s, ok := impl.(interface {
SyncImpl() metric.SyncImpl
}); ok {
return s.SyncImpl(), err
}
if a, ok := impl.(interface {
AsyncImpl() metric.AsyncImpl
}); ok {
return a.AsyncImpl(), err
}
return nil, err
}
func TestRegistrySameInstruments(t *testing.T) {
for _, nf := range allNew {
ResetForTest()
inst1, err1 := nf("this", "meter")
inst2, err2 := nf("this", "meter")
require.NoError(t, err1)
require.NoError(t, err2)
require.Equal(t, inst1, inst2)
}
}
func TestRegistryDifferentNamespace(t *testing.T) {
for _, nf := range allNew {
ResetForTest()
inst1, err1 := nf("this", "meter1")
inst2, err2 := nf("this", "meter2")
require.NoError(t, err1)
require.NoError(t, err2)
require.NotEqual(t, inst1, inst2)
}
}
func TestRegistryDiffInstruments(t *testing.T) {
for origName, origf := range allNew {
ResetForTest()
_, err := origf("this", "super")
require.NoError(t, err)
for newName, nf := range allNew {
if newName == origName {
continue
}
other, err := nf("this", "super")
require.Error(t, err)
require.NotNil(t, other)
require.True(t, errors.Is(err, registry.ErrMetricKindMismatch))
require.Contains(t, err.Error(), "super")
}
}
}

View File

@ -109,7 +109,7 @@ func defaultTracerValue() *atomic.Value {
func defaultMeterValue() *atomic.Value {
v := &atomic.Value{}
v.Store(meterProviderHolder{mp: &meterProvider{}})
v.Store(meterProviderHolder{mp: newMeterProvider()})
return v
}

View File

@ -48,6 +48,9 @@ type Config struct {
Keys []core.Key
// Resource describes the entity for which measurements are made.
Resource resource.Resource
// LibraryName is the name given to the Meter that created
// this instrument. See `Provider`.
LibraryName string
}
// Option is an interface for applying metric options.
@ -150,6 +153,12 @@ func (d Descriptor) Resource() resource.Resource {
return d.config.Resource
}
// LibraryName returns the metric instrument's library name, typically
// given via a call to Provider.Meter().
func (d Descriptor) LibraryName() string {
return d.config.LibraryName
}
// Meter is an interface to the metrics portion of the OpenTelemetry SDK.
type Meter interface {
// Labels returns a reference to a set of labels that cannot
@ -234,3 +243,21 @@ type resourceOption resource.Resource
func (r resourceOption) Apply(config *Config) {
config.Resource = resource.Resource(r)
}
// WithLibraryName applies provided library name. This is meant for
// use in `Provider` implementations that have not used
// `WrapMeterImpl`. Implementations built using `WrapMeterImpl` have
// instrument descriptors taken care of through this package.
//
// This option will have no effect when supplied by the user.
// Provider implementations are expected to append this option after
// the user-supplied options when building instrument descriptors.
func WithLibraryName(name string) Option {
return libraryNameOption(name)
}
type libraryNameOption string
func (r libraryNameOption) Apply(config *Config) {
config.LibraryName = string(r)
}

View File

@ -212,7 +212,7 @@ func TestObserver(t *testing.T) {
}
}
func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, mock *mockTest.Meter, kind core.NumberKind, instrument metric.InstrumentImpl) {
func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, mock *mockTest.MeterImpl, kind core.NumberKind, instrument metric.InstrumentImpl) {
t.Helper()
if len(mock.MeasurementBatches) != 3 {
t.Errorf("Expected 3 recorded measurement batches, got %d", len(mock.MeasurementBatches))
@ -261,7 +261,7 @@ func checkBatches(t *testing.T, ctx context.Context, labels metric.LabelSet, moc
}
}
func checkObserverBatch(t *testing.T, labels metric.LabelSet, mock *mockTest.Meter, kind core.NumberKind, observer metric.AsyncImpl) {
func checkObserverBatch(t *testing.T, labels metric.LabelSet, mock *mockTest.MeterImpl, kind core.NumberKind, observer metric.AsyncImpl) {
t.Helper()
assert.Len(t, mock.MeasurementBatches, 1)
if len(mock.MeasurementBatches) < 1 {
@ -318,7 +318,7 @@ func (testWrappedMeter) NewAsyncInstrument(_ metric.Descriptor, _ func(func(core
func TestWrappedInstrumentError(t *testing.T) {
impl := &testWrappedMeter{}
meter := metric.WrapMeterImpl(impl)
meter := metric.WrapMeterImpl(impl, "test")
measure, err := meter.NewInt64Measure("test.measure")

View File

@ -1,4 +1,4 @@
// Copyright 2020, OpenTelemetry Authors
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View File

@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry // import "go.opentelemetry.io/otel/api/metric/registry"
import (
"context"
"fmt"
"sync"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
)
// uniqueInstrumentMeterImpl implements the metric.MeterImpl interface, adding
// uniqueness checking for instrument descriptors. Use NewUniqueInstrumentMeter
// to wrap an implementation with uniqueness checking.
type uniqueInstrumentMeterImpl struct {
lock sync.Mutex
impl metric.MeterImpl
state map[key]metric.InstrumentImpl
}
type key struct {
name string
libraryName string
}
// ErrMetricKindMismatch is the standard error for mismatched metric
// instrument definitions.
var ErrMetricKindMismatch = fmt.Errorf(
"A metric was already registered by this name with another kind or number type")
var _ metric.MeterImpl = (*uniqueInstrumentMeterImpl)(nil)
// NewUniqueInstrumentMeterImpl returns a wrapped metric.MeterImpl with
// the addition of uniqueness checking.
func NewUniqueInstrumentMeterImpl(impl metric.MeterImpl) metric.MeterImpl {
return &uniqueInstrumentMeterImpl{
impl: impl,
state: map[key]metric.InstrumentImpl{},
}
}
// Labels implements metric.MeterImpl.
func (u *uniqueInstrumentMeterImpl) Labels(kvs ...core.KeyValue) metric.LabelSet {
return u.impl.Labels(kvs...)
}
// RecordBatch implements metric.MeterImpl.
func (u *uniqueInstrumentMeterImpl) RecordBatch(ctx context.Context, labels metric.LabelSet, ms ...metric.Measurement) {
u.impl.RecordBatch(ctx, labels, ms...)
}
func keyOf(descriptor metric.Descriptor) key {
return key{
descriptor.Name(),
descriptor.LibraryName(),
}
}
// NewMetricKindMismatchError formats an error that describes a
// mismatched metric instrument definition.
func NewMetricKindMismatchError(desc metric.Descriptor) error {
return fmt.Errorf("Metric was %s (%s) registered as a %s %s: %w",
desc.Name(),
desc.LibraryName(),
desc.NumberKind(),
desc.MetricKind(),
ErrMetricKindMismatch)
}
// Compatible determines whether two metric.Descriptors are considered
// the same for the purpose of uniqueness checking.
func Compatible(candidate, existing metric.Descriptor) bool {
return candidate.MetricKind() == existing.MetricKind() &&
candidate.NumberKind() == existing.NumberKind()
}
// checkUniqueness returns an ErrMetricKindMismatch error if there is
// a conflict between a descriptor that was already registered and the
// `descriptor` argument. If there is an existing compatible
// registration, this returns the already-registered instrument. If
// there is no conflict and no prior registration, returns (nil, nil).
func (u *uniqueInstrumentMeterImpl) checkUniqueness(descriptor metric.Descriptor) (metric.InstrumentImpl, error) {
impl, ok := u.state[keyOf(descriptor)]
if !ok {
return nil, nil
}
if !Compatible(descriptor, impl.Descriptor()) {
return nil, NewMetricKindMismatchError(impl.Descriptor())
}
return impl, nil
}
// NewSyncInstrument implements metric.MeterImpl.
func (u *uniqueInstrumentMeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (metric.SyncImpl, error) {
u.lock.Lock()
defer u.lock.Unlock()
impl, err := u.checkUniqueness(descriptor)
if err != nil {
return nil, err
} else if impl != nil {
return impl.(metric.SyncImpl), nil
}
syncInst, err := u.impl.NewSyncInstrument(descriptor)
if err != nil {
return nil, err
}
u.state[keyOf(descriptor)] = syncInst
return syncInst, nil
}
// NewAsyncInstrument implements metric.MeterImpl.
func (u *uniqueInstrumentMeterImpl) NewAsyncInstrument(
descriptor metric.Descriptor,
callback func(func(core.Number, metric.LabelSet)),
) (metric.AsyncImpl, error) {
u.lock.Lock()
defer u.lock.Unlock()
impl, err := u.checkUniqueness(descriptor)
if err != nil {
return nil, err
} else if impl != nil {
return impl.(metric.AsyncImpl), nil
}
asyncInst, err := u.impl.NewAsyncInstrument(descriptor, callback)
if err != nil {
return nil, err
}
u.state[keyOf(descriptor)] = asyncInst
return asyncInst, nil
}

View File

@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry_test
import (
"errors"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
mockTest "go.opentelemetry.io/otel/internal/metric"
)
type (
newFunc func(m metric.Meter, name string) (metric.InstrumentImpl, error)
)
var (
allNew = map[string]newFunc{
"counter.int64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) {
return unwrap(m.NewInt64Counter(name))
},
"counter.float64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) {
return unwrap(m.NewFloat64Counter(name))
},
"measure.int64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) {
return unwrap(m.NewInt64Measure(name))
},
"measure.float64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) {
return unwrap(m.NewFloat64Measure(name))
},
"observer.int64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) {
return unwrap(m.RegisterInt64Observer(name, func(metric.Int64ObserverResult) {}))
},
"observer.float64": func(m metric.Meter, name string) (metric.InstrumentImpl, error) {
return unwrap(m.RegisterFloat64Observer(name, func(metric.Float64ObserverResult) {}))
},
}
)
func unwrap(impl interface{}, err error) (metric.InstrumentImpl, error) {
if impl == nil {
return nil, err
}
if s, ok := impl.(interface {
SyncImpl() metric.SyncImpl
}); ok {
return s.SyncImpl(), err
}
if a, ok := impl.(interface {
AsyncImpl() metric.AsyncImpl
}); ok {
return a.AsyncImpl(), err
}
return nil, err
}
func TestRegistrySameInstruments(t *testing.T) {
for _, nf := range allNew {
_, provider := mockTest.NewProvider()
meter := provider.Meter("meter")
inst1, err1 := nf(meter, "this")
inst2, err2 := nf(meter, "this")
require.NoError(t, err1)
require.NoError(t, err2)
require.Equal(t, inst1, inst2)
}
}
func TestRegistryDifferentNamespace(t *testing.T) {
for _, nf := range allNew {
_, provider := mockTest.NewProvider()
meter1 := provider.Meter("meter1")
meter2 := provider.Meter("meter2")
inst1, err1 := nf(meter1, "this")
inst2, err2 := nf(meter2, "this")
require.NoError(t, err1)
require.NoError(t, err2)
require.NotEqual(t, inst1, inst2)
}
}
func TestRegistryDiffInstruments(t *testing.T) {
for origName, origf := range allNew {
_, provider := mockTest.NewProvider()
meter := provider.Meter("meter")
_, err := origf(meter, "this")
require.NoError(t, err)
for newName, nf := range allNew {
if newName == origName {
continue
}
other, err := nf(meter, "this")
require.Error(t, err)
require.NotNil(t, other)
require.True(t, errors.Is(err, registry.ErrMetricKindMismatch))
}
}
}

View File

@ -104,7 +104,8 @@ type AsyncImpl interface {
// wrappedMeterImpl implements the `Meter` interface given a
// `MeterImpl` implementation.
type wrappedMeterImpl struct {
impl MeterImpl
impl MeterImpl
libraryName string
}
// int64ObserverResult is an adapter for int64-valued asynchronous
@ -159,21 +160,13 @@ func insertResource(impl MeterImpl, opts []Option) []Option {
// WrapMeterImpl constructs a `Meter` implementation from a
// `MeterImpl` implementation.
func WrapMeterImpl(impl MeterImpl) Meter {
func WrapMeterImpl(impl MeterImpl, libraryName string) Meter {
return &wrappedMeterImpl{
impl: impl,
impl: impl,
libraryName: libraryName,
}
}
// UnwrapImpl returns a `MeterImpl` given a `Meter` that was
// constructed using `WrapMeterImpl`.
func UnwrapImpl(meter Meter) (MeterImpl, bool) {
if wrap, ok := meter.(*wrappedMeterImpl); ok {
return wrap.impl, true
}
return nil, false
}
func (m *wrappedMeterImpl) Labels(labels ...core.KeyValue) LabelSet {
return m.impl.Labels(labels...)
}
@ -184,7 +177,9 @@ func (m *wrappedMeterImpl) RecordBatch(ctx context.Context, ls LabelSet, ms ...M
func (m *wrappedMeterImpl) newSync(name string, metricKind Kind, numberKind core.NumberKind, opts []Option) (SyncImpl, error) {
opts = insertResource(m.impl, opts)
return m.impl.NewSyncInstrument(NewDescriptor(name, metricKind, numberKind, opts...))
desc := NewDescriptor(name, metricKind, numberKind, opts...)
desc.config.LibraryName = m.libraryName
return m.impl.NewSyncInstrument(desc)
}
func (m *wrappedMeterImpl) NewInt64Counter(name string, opts ...Option) (Int64Counter, error) {
@ -245,9 +240,9 @@ func WrapFloat64MeasureInstrument(syncInst SyncImpl, err error) (Float64Measure,
func (m *wrappedMeterImpl) newAsync(name string, mkind Kind, nkind core.NumberKind, opts []Option, callback func(func(core.Number, LabelSet))) (AsyncImpl, error) {
opts = insertResource(m.impl, opts)
return m.impl.NewAsyncInstrument(
NewDescriptor(name, mkind, nkind, opts...),
callback)
desc := NewDescriptor(name, mkind, nkind, opts...)
desc.config.LibraryName = m.libraryName
return m.impl.NewAsyncInstrument(desc, callback)
}
func (m *wrappedMeterImpl) RegisterInt64Observer(name string, callback Int64ObserverCallback, opts ...Option) (Int64Observer, error) {

View File

@ -21,6 +21,7 @@ import (
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/metric"
apimetric "go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
)
type (
@ -30,8 +31,8 @@ type (
}
LabelSet struct {
TheMeter *Meter
Labels map[core.Key]core.Value
Impl *MeterImpl
Labels map[core.Key]core.Value
}
Batch struct {
@ -39,14 +40,17 @@ type (
Measurements []Measurement
Ctx context.Context
LabelSet *LabelSet
LibraryName string
}
MeterProvider struct {
lock sync.Mutex
impl *MeterImpl
unique metric.MeterImpl
registered map[string]apimetric.Meter
}
Meter struct {
MeterImpl struct {
MeasurementBatches []Batch
AsyncInstruments []*Async
}
@ -58,7 +62,7 @@ type (
}
Instrument struct {
meter *Meter
meter *MeterImpl
descriptor apimetric.Descriptor
}
@ -77,7 +81,7 @@ var (
_ apimetric.SyncImpl = &Sync{}
_ apimetric.BoundSyncImpl = &Handle{}
_ apimetric.LabelSet = &LabelSet{}
_ apimetric.MeterImpl = &Meter{}
_ apimetric.MeterImpl = &MeterImpl{}
_ apimetric.AsyncImpl = &Async{}
)
@ -117,17 +121,21 @@ func (h *Handle) RecordOne(ctx context.Context, number core.Number) {
func (h *Handle) Unbind() {
}
func (m *Meter) doRecordSingle(ctx context.Context, labelSet *LabelSet, instrument apimetric.InstrumentImpl, number core.Number) {
func (m *MeterImpl) doRecordSingle(ctx context.Context, labelSet *LabelSet, instrument apimetric.InstrumentImpl, number core.Number) {
m.recordMockBatch(ctx, labelSet, Measurement{
Instrument: instrument,
Number: number,
})
}
func NewProvider() *MeterProvider {
return &MeterProvider{
func NewProvider() (*MeterImpl, apimetric.Provider) {
impl := &MeterImpl{}
p := &MeterProvider{
impl: impl,
unique: registry.NewUniqueInstrumentMeterImpl(impl),
registered: map[string]apimetric.Meter{},
}
return impl, p
}
func (p *MeterProvider) Meter(name string) apimetric.Meter {
@ -137,28 +145,28 @@ func (p *MeterProvider) Meter(name string) apimetric.Meter {
if lookup, ok := p.registered[name]; ok {
return lookup
}
_, m := NewMeter()
m := apimetric.WrapMeterImpl(p.unique, name)
p.registered[name] = m
return m
}
func NewMeter() (*Meter, apimetric.Meter) {
mock := &Meter{}
return mock, apimetric.WrapMeterImpl(mock)
func NewMeter() (*MeterImpl, apimetric.Meter) {
impl, p := NewProvider()
return impl, p.Meter("mock")
}
func (m *Meter) Labels(labels ...core.KeyValue) apimetric.LabelSet {
func (m *MeterImpl) Labels(labels ...core.KeyValue) apimetric.LabelSet {
ul := make(map[core.Key]core.Value)
for _, kv := range labels {
ul[kv.Key] = kv.Value
}
return &LabelSet{
TheMeter: m,
Labels: ul,
Impl: m,
Labels: ul,
}
}
func (m *Meter) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.SyncImpl, error) {
func (m *MeterImpl) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.SyncImpl, error) {
return &Sync{
Instrument{
descriptor: descriptor,
@ -167,7 +175,7 @@ func (m *Meter) NewSyncInstrument(descriptor metric.Descriptor) (apimetric.SyncI
}, nil
}
func (m *Meter) NewAsyncInstrument(descriptor metric.Descriptor, callback func(func(core.Number, apimetric.LabelSet))) (apimetric.AsyncImpl, error) {
func (m *MeterImpl) NewAsyncInstrument(descriptor metric.Descriptor, callback func(func(core.Number, apimetric.LabelSet))) (apimetric.AsyncImpl, error) {
a := &Async{
Instrument: Instrument{
descriptor: descriptor,
@ -179,7 +187,7 @@ func (m *Meter) NewAsyncInstrument(descriptor metric.Descriptor, callback func(f
return a, nil
}
func (m *Meter) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) {
func (m *MeterImpl) RecordBatch(ctx context.Context, labels apimetric.LabelSet, measurements ...apimetric.Measurement) {
ourLabelSet := labels.(*LabelSet)
mm := make([]Measurement, len(measurements))
for i := 0; i < len(measurements); i++ {
@ -192,7 +200,7 @@ func (m *Meter) RecordBatch(ctx context.Context, labels apimetric.LabelSet, meas
m.recordMockBatch(ctx, ourLabelSet, mm...)
}
func (m *Meter) recordMockBatch(ctx context.Context, labelSet *LabelSet, measurements ...Measurement) {
func (m *MeterImpl) recordMockBatch(ctx context.Context, labelSet *LabelSet, measurements ...Measurement) {
m.MeasurementBatches = append(m.MeasurementBatches, Batch{
Ctx: ctx,
LabelSet: labelSet,
@ -200,7 +208,7 @@ func (m *Meter) recordMockBatch(ctx context.Context, labelSet *LabelSet, measure
})
}
func (m *Meter) RunAsyncInstruments() {
func (m *MeterImpl) RunAsyncInstruments() {
for _, observer := range m.AsyncInstruments {
observer.callback(func(n core.Number, labels apimetric.LabelSet) {

View File

@ -46,8 +46,9 @@ func newFixture(b *testing.B) *benchFixture {
bf := &benchFixture{
B: b,
}
bf.sdk = sdk.New(bf)
bf.meter = metric.Must(metric.WrapMeterImpl(bf.sdk))
bf.meter = metric.Must(metric.WrapMeterImpl(bf.sdk, "benchmarks"))
return bf
}

View File

@ -20,6 +20,7 @@ import (
"time"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/metric/registry"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
)
@ -29,7 +30,8 @@ type Controller struct {
lock sync.Mutex
collectLock sync.Mutex
sdk *sdk.SDK
meter metric.Meter
uniq metric.MeterImpl
named map[string]metric.Meter
errorHandler sdk.ErrorHandler
batcher export.Batcher
exporter export.Exporter
@ -78,7 +80,8 @@ func New(batcher export.Batcher, exporter export.Exporter, period time.Duration,
impl := sdk.New(batcher, sdk.WithResource(c.Resource), sdk.WithErrorHandler(c.ErrorHandler))
return &Controller{
sdk: impl,
meter: metric.WrapMeterImpl(impl),
uniq: registry.NewUniqueInstrumentMeterImpl(impl),
named: map[string]metric.Meter{},
errorHandler: c.ErrorHandler,
batcher: batcher,
exporter: exporter,
@ -105,8 +108,17 @@ func (c *Controller) SetErrorHandler(errorHandler sdk.ErrorHandler) {
// Meter returns a named Meter, satisifying the metric.Provider
// interface.
func (c *Controller) Meter(_ string) metric.Meter {
return c.meter
func (c *Controller) Meter(name string) metric.Meter {
c.lock.Lock()
defer c.lock.Unlock()
if meter, ok := c.named[name]; ok {
return meter
}
meter := metric.WrapMeterImpl(c.uniq, name)
c.named[name] = meter
return meter
}
// Start begins a ticker that periodically collects and exports

View File

@ -73,7 +73,7 @@ func TestInputRangeTestCounter(t *testing.T) {
t: t,
}
sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk)
meter := metric.WrapMeterImpl(sdk, "test")
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@ -108,7 +108,7 @@ func TestInputRangeTestMeasure(t *testing.T) {
t: t,
}
sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk)
meter := metric.WrapMeterImpl(sdk, "test")
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@ -146,7 +146,8 @@ func TestDisabledInstrument(t *testing.T) {
t: t,
}
sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk)
meter := metric.WrapMeterImpl(sdk, "test")
measure := Must(meter).NewFloat64Measure("name.disabled")
measure.Record(ctx, -1, sdk.Labels())
@ -161,8 +162,9 @@ func TestRecordNaN(t *testing.T) {
batcher := &correctnessBatcher{
t: t,
}
sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk)
meter := metric.WrapMeterImpl(sdk, "test")
var sdkErr error
sdk.SetErrorHandler(func(handleErr error) {
@ -181,7 +183,7 @@ func TestSDKLabelsDeduplication(t *testing.T) {
t: t,
}
sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk)
meter := metric.WrapMeterImpl(sdk, "test")
counter := Must(meter).NewInt64Counter("counter")
@ -278,8 +280,9 @@ func TestObserverCollection(t *testing.T) {
batcher := &correctnessBatcher{
t: t,
}
sdk := metricsdk.New(batcher)
meter := metric.WrapMeterImpl(sdk)
meter := metric.WrapMeterImpl(sdk, "test")
_ = Must(meter).RegisterFloat64Observer("float.observer", func(result metric.Float64ObserverResult) {
// TODO: The spec says the last-value wins in observer

View File

@ -297,7 +297,7 @@ func stressTest(t *testing.T, impl testImpl) {
}
cc := concurrency()
sdk := New(fixture)
meter := metric.WrapMeterImpl(sdk)
meter := metric.WrapMeterImpl(sdk, "stress_test")
fixture.wg.Add(cc + 1)
for i := 0; i < cc; i++ {