1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-12 02:28:07 +02:00

Adds metrics Global (#2660)

* WIP: add global API

* WIP

* Add a global meter.

* moved global access out of metric because of loop imports

* fix linting issues

* remove changes from other lint failures.

* Add changelog

* Fixes for comments.

Changed name of global API.
Added stop to all race tests go routine.
Added race tests for other instruments.

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Consolidated instrument tests

* fixed lint, and removed unneeded type checking

* change require's to asserts.

* Update misspelling

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Fix meter race test.

* Copy SetTracerProvider logic.

* Fix global test for panic.

* Fix linting error

* bump testify version

* moved changelog into unreleased

Co-authored-by: Aaron Clawson <MadVikingGod@users.noreply.github.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Aaron Clawson 2022-03-22 10:33:13 -05:00 committed by GitHub
parent 9a51174e63
commit 8a7dcd9650
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1428 additions and 1 deletions

View File

@ -17,6 +17,7 @@ Code instrumented with the `go.opentelemetry.io/otel/metric` will need to be mod
- Add go 1.18 to our compatibility tests. (#2679)
- Allow configuring the Sampler with the `OTEL_TRACES_SAMPLER` and `OTEL_TRACES_SAMPLER_ARG` environment variables. (#2305, #2517)
- Add the `metric/global` for obtaining and setting the global `MeterProvider` (#2660)
### Changed

31
metric/global/global.go Normal file
View File

@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global // import "go.opentelemetry.io/otel/metric/global"
import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/internal/global"
)
// MeterProvider returns the registered global trace provider.
// If none is registered then a No-op MeterProvider is returned.
func MeterProvider() metric.MeterProvider {
return global.MeterProvider()
}
// SetMeterProvider registers `mp` as the global meter provider.
func SetMeterProvider(mp metric.MeterProvider) {
global.SetMeterProvider(mp)
}

View File

@ -2,7 +2,10 @@ module go.opentelemetry.io/otel/metric
go 1.16
require go.opentelemetry.io/otel v1.5.0
require (
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel v1.5.0
)
replace go.opentelemetry.io/otel => ../

View File

@ -1,7 +1,9 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
@ -11,6 +13,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,341 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"context"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)
type afCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //asyncfloat64.Counter
instrument.Asynchronous
}
func (i *afCounter) setDelegate(m metric.Meter) {
ctr, err := m.AsyncFloat64().Counter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *afCounter) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(asyncfloat64.Counter).Observe(ctx, x, attrs...)
}
}
type afUpDownCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //asyncfloat64.UpDownCounter
instrument.Asynchronous
}
func (i *afUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.AsyncFloat64().UpDownCounter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *afUpDownCounter) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(asyncfloat64.UpDownCounter).Observe(ctx, x, attrs...)
}
}
type afGauge struct {
name string
opts []instrument.Option
delegate atomic.Value //asyncfloat64.Gauge
instrument.Asynchronous
}
func (i *afGauge) setDelegate(m metric.Meter) {
ctr, err := m.AsyncFloat64().Gauge(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *afGauge) Observe(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(asyncfloat64.Gauge).Observe(ctx, x, attrs...)
}
}
type aiCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //asyncint64.Counter
instrument.Asynchronous
}
func (i *aiCounter) setDelegate(m metric.Meter) {
ctr, err := m.AsyncInt64().Counter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *aiCounter) Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(asyncint64.Counter).Observe(ctx, x, attrs...)
}
}
type aiUpDownCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //asyncint64.UpDownCounter
instrument.Asynchronous
}
func (i *aiUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.AsyncInt64().UpDownCounter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *aiUpDownCounter) Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(asyncint64.UpDownCounter).Observe(ctx, x, attrs...)
}
}
type aiGauge struct {
name string
opts []instrument.Option
delegate atomic.Value //asyncint64.Gauge
instrument.Asynchronous
}
func (i *aiGauge) setDelegate(m metric.Meter) {
ctr, err := m.AsyncInt64().Gauge(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *aiGauge) Observe(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(asyncint64.Gauge).Observe(ctx, x, attrs...)
}
}
//Sync Instruments
type sfCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //syncfloat64.Counter
instrument.Synchronous
}
func (i *sfCounter) setDelegate(m metric.Meter) {
ctr, err := m.SyncFloat64().Counter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *sfCounter) Add(ctx context.Context, incr float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(syncfloat64.Counter).Add(ctx, incr, attrs...)
}
}
type sfUpDownCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //syncfloat64.UpDownCounter
instrument.Synchronous
}
func (i *sfUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.SyncFloat64().UpDownCounter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *sfUpDownCounter) Add(ctx context.Context, incr float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(syncfloat64.UpDownCounter).Add(ctx, incr, attrs...)
}
}
type sfHistogram struct {
name string
opts []instrument.Option
delegate atomic.Value //syncfloat64.Histogram
instrument.Synchronous
}
func (i *sfHistogram) setDelegate(m metric.Meter) {
ctr, err := m.SyncFloat64().Histogram(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *sfHistogram) Record(ctx context.Context, x float64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(syncfloat64.Histogram).Record(ctx, x, attrs...)
}
}
type siCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //syncint64.Counter
instrument.Synchronous
}
func (i *siCounter) setDelegate(m metric.Meter) {
ctr, err := m.SyncInt64().Counter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *siCounter) Add(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(syncint64.Counter).Add(ctx, x, attrs...)
}
}
type siUpDownCounter struct {
name string
opts []instrument.Option
delegate atomic.Value //syncint64.UpDownCounter
instrument.Synchronous
}
func (i *siUpDownCounter) setDelegate(m metric.Meter) {
ctr, err := m.SyncInt64().UpDownCounter(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *siUpDownCounter) Add(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(syncint64.UpDownCounter).Add(ctx, x, attrs...)
}
}
type siHistogram struct {
name string
opts []instrument.Option
delegate atomic.Value //syncint64.Histogram
instrument.Synchronous
}
func (i *siHistogram) setDelegate(m metric.Meter) {
ctr, err := m.SyncInt64().Histogram(i.name, i.opts...)
if err != nil {
otel.Handle(err)
return
}
i.delegate.Store(ctr)
}
func (i *siHistogram) Record(ctx context.Context, x int64, attrs ...attribute.KeyValue) {
if ctr := i.delegate.Load(); ctr != nil {
ctr.(syncint64.Histogram).Record(ctx, x, attrs...)
}
}

View File

@ -0,0 +1,171 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global
import (
"context"
"testing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/nonrecording"
)
func testFloat64Race(interact func(context.Context, float64, ...attribute.KeyValue), setDelegate func(metric.Meter)) {
finish := make(chan struct{})
go func() {
for {
interact(context.Background(), 1)
select {
case <-finish:
return
default:
}
}
}()
setDelegate(nonrecording.NewNoopMeter())
close(finish)
}
func testInt64Race(interact func(context.Context, int64, ...attribute.KeyValue), setDelegate func(metric.Meter)) {
finish := make(chan struct{})
go func() {
for {
interact(context.Background(), 1)
select {
case <-finish:
return
default:
}
}
}()
setDelegate(nonrecording.NewNoopMeter())
close(finish)
}
func TestAsyncInstrumentSetDelegateRace(t *testing.T) {
// Float64 Instruments
t.Run("Float64", func(t *testing.T) {
t.Run("Counter", func(t *testing.T) {
delegate := &afCounter{}
testFloat64Race(delegate.Observe, delegate.setDelegate)
})
t.Run("UpDownCounter", func(t *testing.T) {
delegate := &afUpDownCounter{}
testFloat64Race(delegate.Observe, delegate.setDelegate)
})
t.Run("Gauge", func(t *testing.T) {
delegate := &afGauge{}
testFloat64Race(delegate.Observe, delegate.setDelegate)
})
})
// Int64 Instruments
t.Run("Int64", func(t *testing.T) {
t.Run("Counter", func(t *testing.T) {
delegate := &aiCounter{}
testInt64Race(delegate.Observe, delegate.setDelegate)
})
t.Run("UpDownCounter", func(t *testing.T) {
delegate := &aiUpDownCounter{}
testInt64Race(delegate.Observe, delegate.setDelegate)
})
t.Run("Gauge", func(t *testing.T) {
delegate := &aiGauge{}
testInt64Race(delegate.Observe, delegate.setDelegate)
})
})
}
func TestSyncInstrumentSetDelegateRace(t *testing.T) {
// Float64 Instruments
t.Run("Float64", func(t *testing.T) {
t.Run("Counter", func(t *testing.T) {
delegate := &sfCounter{}
testFloat64Race(delegate.Add, delegate.setDelegate)
})
t.Run("UpDownCounter", func(t *testing.T) {
delegate := &sfUpDownCounter{}
testFloat64Race(delegate.Add, delegate.setDelegate)
})
t.Run("Histogram", func(t *testing.T) {
delegate := &sfHistogram{}
testFloat64Race(delegate.Record, delegate.setDelegate)
})
})
// Int64 Instruments
t.Run("Int64", func(t *testing.T) {
t.Run("Counter", func(t *testing.T) {
delegate := &siCounter{}
testInt64Race(delegate.Add, delegate.setDelegate)
})
t.Run("UpDownCounter", func(t *testing.T) {
delegate := &siUpDownCounter{}
testInt64Race(delegate.Add, delegate.setDelegate)
})
t.Run("Histogram", func(t *testing.T) {
delegate := &siHistogram{}
testInt64Race(delegate.Record, delegate.setDelegate)
})
})
}
type testCountingFloatInstrument struct {
count int
instrument.Asynchronous
instrument.Synchronous
}
func (i *testCountingFloatInstrument) Observe(context.Context, float64, ...attribute.KeyValue) {
i.count++
}
func (i *testCountingFloatInstrument) Add(context.Context, float64, ...attribute.KeyValue) {
i.count++
}
func (i *testCountingFloatInstrument) Record(context.Context, float64, ...attribute.KeyValue) {
i.count++
}
type testCountingIntInstrument struct {
count int
instrument.Asynchronous
instrument.Synchronous
}
func (i *testCountingIntInstrument) Observe(context.Context, int64, ...attribute.KeyValue) {
i.count++
}
func (i *testCountingIntInstrument) Add(context.Context, int64, ...attribute.KeyValue) {
i.count++
}
func (i *testCountingIntInstrument) Record(context.Context, int64, ...attribute.KeyValue) {
i.count++
}

View File

@ -0,0 +1,327 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"context"
"sync"
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)
// meterProvider is a placeholder for a configured SDK MeterProvider.
//
// All MeterProvider functionality is forwarded to a delegate once
// configured.
type meterProvider struct {
mtx sync.Mutex
meters map[il]*meter
delegate metric.MeterProvider
}
type il struct {
name string
version string
}
// setDelegate configures p to delegate all MeterProvider functionality to
// provider.
//
// All Meters provided prior to this function call are switched out to be
// Meters provided by provider. All instruments and callbacks are recreated and
// delegated.
//
// It is guaranteed by the caller that this happens only once.
func (p *meterProvider) setDelegate(provider metric.MeterProvider) {
p.mtx.Lock()
defer p.mtx.Unlock()
p.delegate = provider
if len(p.meters) == 0 {
return
}
for _, meter := range p.meters {
meter.setDelegate(provider)
}
p.meters = nil
}
// Meter implements MeterProvider.
func (p *meterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
p.mtx.Lock()
defer p.mtx.Unlock()
if p.delegate != nil {
return p.delegate.Meter(name, opts...)
}
// At this moment it is guaranteed that no sdk is installed, save the meter in the meters map.
c := metric.NewMeterConfig(opts...)
key := il{
name: name,
version: c.InstrumentationVersion(),
}
if p.meters == nil {
p.meters = make(map[il]*meter)
}
if val, ok := p.meters[key]; ok {
return val
}
t := &meter{name: name, opts: opts}
p.meters[key] = t
return t
}
// meter is a placeholder for a metric.Meter.
//
// All Meter functionality is forwarded to a delegate once configured.
// Otherwise, all functionality is forwarded to a NoopMeter.
type meter struct {
name string
opts []metric.MeterOption
mtx sync.Mutex
instruments []delegatedInstrument
callbacks []delegatedCallback
delegate atomic.Value // metric.Meter
}
type delegatedInstrument interface {
setDelegate(metric.Meter)
}
// setDelegate configures m to delegate all Meter functionality to Meters
// created by provider.
//
// All subsequent calls to the Meter methods will be passed to the delegate.
//
// It is guaranteed by the caller that this happens only once.
func (m *meter) setDelegate(provider metric.MeterProvider) {
meter := provider.Meter(m.name, m.opts...)
m.delegate.Store(meter)
m.mtx.Lock()
defer m.mtx.Unlock()
for _, inst := range m.instruments {
inst.setDelegate(meter)
}
for _, callback := range m.callbacks {
callback.setDelegate(meter)
}
m.instruments = nil
m.callbacks = nil
}
// AsyncInt64 is the namespace for the Asynchronous Integer instruments.
//
// To Observe data with instruments it must be registered in a callback.
func (m *meter) AsyncInt64() asyncint64.InstrumentProvider {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.AsyncInt64()
}
return (*aiInstProvider)(m)
}
// AsyncFloat64 is the namespace for the Asynchronous Float instruments.
//
// To Observe data with instruments it must be registered in a callback.
func (m *meter) AsyncFloat64() asyncfloat64.InstrumentProvider {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.AsyncFloat64()
}
return (*afInstProvider)(m)
}
// RegisterCallback captures the function that will be called during Collect.
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *meter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.RegisterCallback(insts, function)
}
m.mtx.Lock()
defer m.mtx.Unlock()
m.callbacks = append(m.callbacks, delegatedCallback{
instruments: insts,
function: function,
})
return nil
}
// SyncInt64 is the namespace for the Synchronous Integer instruments
func (m *meter) SyncInt64() syncint64.InstrumentProvider {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.SyncInt64()
}
return (*siInstProvider)(m)
}
// SyncFloat64 is the namespace for the Synchronous Float instruments
func (m *meter) SyncFloat64() syncfloat64.InstrumentProvider {
if del, ok := m.delegate.Load().(metric.Meter); ok {
return del.SyncFloat64()
}
return (*sfInstProvider)(m)
}
type delegatedCallback struct {
instruments []instrument.Asynchronous
function func(context.Context)
}
func (c *delegatedCallback) setDelegate(m metric.Meter) {
err := m.RegisterCallback(c.instruments, c.function)
if err != nil {
otel.Handle(err)
}
}
type afInstProvider meter
// Counter creates an instrument for recording increasing values.
func (ip *afInstProvider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &afCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip *afInstProvider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &afUpDownCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// Gauge creates an instrument for recording the current value.
func (ip *afInstProvider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &afGauge{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
type aiInstProvider meter
// Counter creates an instrument for recording increasing values.
func (ip *aiInstProvider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &aiCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip *aiInstProvider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &aiUpDownCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// Gauge creates an instrument for recording the current value.
func (ip *aiInstProvider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &aiGauge{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
type sfInstProvider meter
// Counter creates an instrument for recording increasing values.
func (ip *sfInstProvider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &sfCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip *sfInstProvider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &sfUpDownCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// Histogram creates an instrument for recording a distribution of values.
func (ip *sfInstProvider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &sfHistogram{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
type siInstProvider meter
// Counter creates an instrument for recording increasing values.
func (ip *siInstProvider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &siCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip *siInstProvider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &siUpDownCounter{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}
// Histogram creates an instrument for recording a distribution of values.
func (ip *siInstProvider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) {
ip.mtx.Lock()
defer ip.mtx.Unlock()
ctr := &siHistogram{name: name, opts: opts}
ip.instruments = append(ip.instruments, ctr)
return ctr, nil
}

View File

@ -0,0 +1,265 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"context"
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/nonrecording"
)
func TestMeterProviderRace(t *testing.T) {
mp := &meterProvider{}
finish := make(chan struct{})
go func() {
for i := 0; ; i++ {
mp.Meter(fmt.Sprintf("a%d", i))
select {
case <-finish:
return
default:
}
}
}()
mp.setDelegate(nonrecording.NewNoopMeterProvider())
close(finish)
}
func TestMeterRace(t *testing.T) {
mtr := &meter{}
wg := &sync.WaitGroup{}
wg.Add(1)
finish := make(chan struct{})
go func() {
for i, once := 0, false; ; i++ {
name := fmt.Sprintf("a%d", i)
_, _ = mtr.AsyncFloat64().Counter(name)
_, _ = mtr.AsyncFloat64().UpDownCounter(name)
_, _ = mtr.AsyncFloat64().Gauge(name)
_, _ = mtr.AsyncInt64().Counter(name)
_, _ = mtr.AsyncInt64().UpDownCounter(name)
_, _ = mtr.AsyncInt64().Gauge(name)
_, _ = mtr.SyncFloat64().Counter(name)
_, _ = mtr.SyncFloat64().UpDownCounter(name)
_, _ = mtr.SyncFloat64().Histogram(name)
_, _ = mtr.SyncInt64().Counter(name)
_, _ = mtr.SyncInt64().UpDownCounter(name)
_, _ = mtr.SyncInt64().Histogram(name)
_ = mtr.RegisterCallback(nil, func(ctx context.Context) {})
if !once {
wg.Done()
once = true
}
select {
case <-finish:
return
default:
}
}
}()
wg.Wait()
mtr.setDelegate(nonrecording.NewNoopMeterProvider())
close(finish)
}
func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (syncfloat64.Counter, asyncfloat64.Counter) {
afcounter, err := m.AsyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
_, err = m.AsyncFloat64().UpDownCounter("test_Async_UpDownCounter")
assert.NoError(t, err)
_, err = m.AsyncFloat64().Gauge("test_Async_Gauge")
assert.NoError(t, err)
_, err = m.AsyncInt64().Counter("test_Async_Counter")
assert.NoError(t, err)
_, err = m.AsyncInt64().UpDownCounter("test_Async_UpDownCounter")
assert.NoError(t, err)
_, err = m.AsyncInt64().Gauge("test_Async_Gauge")
assert.NoError(t, err)
require.NoError(t, m.RegisterCallback([]instrument.Asynchronous{afcounter}, func(ctx context.Context) {
afcounter.Observe(ctx, 3)
}))
sfcounter, err := m.SyncFloat64().Counter("test_Async_Counter")
require.NoError(t, err)
_, err = m.SyncFloat64().UpDownCounter("test_Async_UpDownCounter")
assert.NoError(t, err)
_, err = m.SyncFloat64().Histogram("test_Async_Histogram")
assert.NoError(t, err)
_, err = m.SyncInt64().Counter("test_Async_Counter")
assert.NoError(t, err)
_, err = m.SyncInt64().UpDownCounter("test_Async_UpDownCounter")
assert.NoError(t, err)
_, err = m.SyncInt64().Histogram("test_Async_Histogram")
assert.NoError(t, err)
return sfcounter, afcounter
}
// This is to emulate a read from an exporter.
func testCollect(t *testing.T, m metric.Meter) {
if tMeter, ok := m.(*meter); ok {
m, ok = tMeter.delegate.Load().(metric.Meter)
if !ok {
t.Error("meter was not delegated")
return
}
}
tMeter, ok := m.(*testMeter)
if !ok {
t.Error("collect called on non-test Meter")
return
}
tMeter.collect()
}
func TestMeterProviderDelegatesCalls(t *testing.T) {
// The global MeterProvider should directly call the underlying MeterProvider
// if it is set prior to Meter() being called.
// globalMeterProvider := otel.GetMeterProvider
globalMeterProvider := &meterProvider{}
mp := &testMeterProvider{}
// otel.SetMeterProvider(mp)
globalMeterProvider.setDelegate(mp)
assert.Equal(t, 0, mp.count)
meter := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
ctr, actr := testSetupAllInstrumentTypes(t, meter)
ctr.Add(context.Background(), 5)
testCollect(t, meter) // This is a hacky way to emulate a read from an exporter
// Calls to Meter() after setDelegate() should be executed by the delegate
require.IsType(t, &testMeter{}, meter)
tMeter := meter.(*testMeter)
assert.Equal(t, 3, tMeter.afCount)
assert.Equal(t, 3, tMeter.aiCount)
assert.Equal(t, 3, tMeter.sfCount)
assert.Equal(t, 3, tMeter.siCount)
assert.Equal(t, 1, len(tMeter.callbacks))
// Because the Meter was provided by testmeterProvider it should also return our test instrument
require.IsType(t, &testCountingFloatInstrument{}, ctr, "the meter did not delegate calls to the meter")
assert.Equal(t, 1, ctr.(*testCountingFloatInstrument).count)
require.IsType(t, &testCountingFloatInstrument{}, actr, "the meter did not delegate calls to the meter")
assert.Equal(t, 1, actr.(*testCountingFloatInstrument).count)
assert.Equal(t, 1, mp.count)
}
func TestMeterDelegatesCalls(t *testing.T) {
// The global MeterProvider should directly provide a Meter instance that
// can be updated. If the SetMeterProvider is called after a Meter was
// obtained, but before instruments only the instrument should be generated
// by the delegated type.
globalMeterProvider := &meterProvider{}
mp := &testMeterProvider{}
assert.Equal(t, 0, mp.count)
m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
globalMeterProvider.setDelegate(mp)
ctr, actr := testSetupAllInstrumentTypes(t, m)
ctr.Add(context.Background(), 5)
testCollect(t, m) // This is a hacky way to emulate a read from an exporter
// Calls to Meter methods after setDelegate() should be executed by the delegate
require.IsType(t, &meter{}, m)
tMeter := m.(*meter).delegate.Load().(*testMeter)
require.NotNil(t, tMeter)
assert.Equal(t, 3, tMeter.afCount)
assert.Equal(t, 3, tMeter.aiCount)
assert.Equal(t, 3, tMeter.sfCount)
assert.Equal(t, 3, tMeter.siCount)
// Because the Meter was provided by testmeterProvider it should also return our test instrument
require.IsType(t, &testCountingFloatInstrument{}, ctr, "the meter did not delegate calls to the meter")
assert.Equal(t, 1, ctr.(*testCountingFloatInstrument).count)
// Because the Meter was provided by testmeterProvider it should also return our test instrument
require.IsType(t, &testCountingFloatInstrument{}, actr, "the meter did not delegate calls to the meter")
assert.Equal(t, 1, actr.(*testCountingFloatInstrument).count)
assert.Equal(t, 1, mp.count)
}
func TestMeterDefersDelegations(t *testing.T) {
// If SetMeterProvider is called after instruments are registered, the
// instruments should be recreated with the new meter.
// globalMeterProvider := otel.GetMeterProvider
globalMeterProvider := &meterProvider{}
m := globalMeterProvider.Meter("go.opentelemetry.io/otel/metric/internal/global/meter_test")
ctr, actr := testSetupAllInstrumentTypes(t, m)
ctr.Add(context.Background(), 5)
mp := &testMeterProvider{}
// otel.SetMeterProvider(mp)
globalMeterProvider.setDelegate(mp)
testCollect(t, m) // This is a hacky way to emulate a read from an exporter
// Calls to Meter() before setDelegate() should be the delegated type
require.IsType(t, &meter{}, m)
tMeter := m.(*meter).delegate.Load().(*testMeter)
require.NotNil(t, tMeter)
assert.Equal(t, 3, tMeter.afCount)
assert.Equal(t, 3, tMeter.aiCount)
assert.Equal(t, 3, tMeter.sfCount)
assert.Equal(t, 3, tMeter.siCount)
// Because the Meter was a delegate it should return a delegated instrument
assert.IsType(t, &sfCounter{}, ctr)
assert.IsType(t, &afCounter{}, actr)
assert.Equal(t, 1, mp.count)
}

View File

@ -0,0 +1,158 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"context"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)
type testMeterProvider struct {
count int
}
func (p *testMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
p.count++
return &testMeter{}
}
type testMeter struct {
afCount int
aiCount int
sfCount int
siCount int
callbacks []func(context.Context)
}
// AsyncInt64 is the namespace for the Asynchronous Integer instruments.
//
// To Observe data with instruments it must be registered in a callback.
func (m *testMeter) AsyncInt64() asyncint64.InstrumentProvider {
m.aiCount++
return &testAIInstrumentProvider{}
}
// AsyncFloat64 is the namespace for the Asynchronous Float instruments
//
// To Observe data with instruments it must be registered in a callback.
func (m *testMeter) AsyncFloat64() asyncfloat64.InstrumentProvider {
m.afCount++
return &testAFInstrumentProvider{}
}
// RegisterCallback captures the function that will be called during Collect.
//
// It is only valid to call Observe within the scope of the passed function,
// and only on the instruments that were registered with this call.
func (m *testMeter) RegisterCallback(insts []instrument.Asynchronous, function func(context.Context)) error {
m.callbacks = append(m.callbacks, function)
return nil
}
// SyncInt64 is the namespace for the Synchronous Integer instruments
func (m *testMeter) SyncInt64() syncint64.InstrumentProvider {
m.siCount++
return &testSIInstrumentProvider{}
}
// SyncFloat64 is the namespace for the Synchronous Float instruments
func (m *testMeter) SyncFloat64() syncfloat64.InstrumentProvider {
m.sfCount++
return &testSFInstrumentProvider{}
}
// This enables async collection
func (m *testMeter) collect() {
ctx := context.Background()
for _, f := range m.callbacks {
f(ctx)
}
}
type testAFInstrumentProvider struct{}
// Counter creates an instrument for recording increasing values.
func (ip testAFInstrumentProvider) Counter(name string, opts ...instrument.Option) (asyncfloat64.Counter, error) {
return &testCountingFloatInstrument{}, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip testAFInstrumentProvider) UpDownCounter(name string, opts ...instrument.Option) (asyncfloat64.UpDownCounter, error) {
return &testCountingFloatInstrument{}, nil
}
// Gauge creates an instrument for recording the current value.
func (ip testAFInstrumentProvider) Gauge(name string, opts ...instrument.Option) (asyncfloat64.Gauge, error) {
return &testCountingFloatInstrument{}, nil
}
type testAIInstrumentProvider struct{}
// Counter creates an instrument for recording increasing values.
func (ip testAIInstrumentProvider) Counter(name string, opts ...instrument.Option) (asyncint64.Counter, error) {
return &testCountingIntInstrument{}, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip testAIInstrumentProvider) UpDownCounter(name string, opts ...instrument.Option) (asyncint64.UpDownCounter, error) {
return &testCountingIntInstrument{}, nil
}
// Gauge creates an instrument for recording the current value.
func (ip testAIInstrumentProvider) Gauge(name string, opts ...instrument.Option) (asyncint64.Gauge, error) {
return &testCountingIntInstrument{}, nil
}
type testSFInstrumentProvider struct{}
// Counter creates an instrument for recording increasing values.
func (ip testSFInstrumentProvider) Counter(name string, opts ...instrument.Option) (syncfloat64.Counter, error) {
return &testCountingFloatInstrument{}, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip testSFInstrumentProvider) UpDownCounter(name string, opts ...instrument.Option) (syncfloat64.UpDownCounter, error) {
return &testCountingFloatInstrument{}, nil
}
// Histogram creates an instrument for recording a distribution of values.
func (ip testSFInstrumentProvider) Histogram(name string, opts ...instrument.Option) (syncfloat64.Histogram, error) {
return &testCountingFloatInstrument{}, nil
}
type testSIInstrumentProvider struct{}
// Counter creates an instrument for recording increasing values.
func (ip testSIInstrumentProvider) Counter(name string, opts ...instrument.Option) (syncint64.Counter, error) {
return &testCountingIntInstrument{}, nil
}
// UpDownCounter creates an instrument for recording changes of a value.
func (ip testSIInstrumentProvider) UpDownCounter(name string, opts ...instrument.Option) (syncint64.UpDownCounter, error) {
return &testCountingIntInstrument{}, nil
}
// Histogram creates an instrument for recording a distribution of values.
func (ip testSIInstrumentProvider) Histogram(name string, opts ...instrument.Option) (syncint64.Histogram, error) {
return &testCountingIntInstrument{}, nil
}

View File

@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// htmp://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/metric"
)
var (
globalMeterProvider = defaultMeterProvider()
delegateMeterOnce sync.Once
)
type meterProviderHolder struct {
mp metric.MeterProvider
}
// MeterProvider is the internal implementation for global.MeterProvider.
func MeterProvider() metric.MeterProvider {
return globalMeterProvider.Load().(meterProviderHolder).mp
}
// SetMeterProvider is the internal implementation for global.SetMeterProvider.
func SetMeterProvider(mp metric.MeterProvider) {
delegateMeterOnce.Do(func() {
current := MeterProvider()
if current == mp {
// Setting the provider to the prior default is nonsense, panic.
// Panic is acceptable because we are likely still early in the
// process lifetime.
panic("invalid MeterProvider, the global instance cannot be reinstalled")
} else if def, ok := current.(*meterProvider); ok {
def.setDelegate(mp)
}
})
globalMeterProvider.Store(meterProviderHolder{mp: mp})
}
func defaultMeterProvider() *atomic.Value {
v := &atomic.Value{}
v.Store(meterProviderHolder{mp: &meterProvider{}})
return v
}

View File

@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// htmp://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package global // import "go.opentelemetry.io/otel/metric/internal/global"
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/metric/nonrecording"
)
func resetGlobalMeterProvider() {
globalMeterProvider = defaultMeterProvider()
delegateMeterOnce = sync.Once{}
}
func TestSetMeterProvider(t *testing.T) {
t.Cleanup(resetGlobalMeterProvider)
t.Run("Set With default panics", func(t *testing.T) {
resetGlobalMeterProvider()
assert.Panics(t, func() {
SetMeterProvider(MeterProvider())
})
})
t.Run("First Set() should replace the delegate", func(t *testing.T) {
resetGlobalMeterProvider()
SetMeterProvider(nonrecording.NewNoopMeterProvider())
_, ok := MeterProvider().(*meterProvider)
if ok {
t.Error("Global Meter Provider was not changed")
return
}
})
t.Run("Set() should delegate existing Meter Providers", func(t *testing.T) {
resetGlobalMeterProvider()
mp := MeterProvider()
SetMeterProvider(nonrecording.NewNoopMeterProvider())
dmp := mp.(*meterProvider)
if dmp.delegate == nil {
t.Error("The delegated meter providers should have a delegate")
}
})
}