mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-03-17 20:57:51 +02:00
SetMeterProvider
might miss the delegation for instruments and registries (#5780)
Closes #5757 This PR fixes an issue where `SetMeterProvider` might miss the delegation for instruments and registries. This bug brings a concurrent issue that could possibly make instruments and registries unable to operate correctly, such as recording, after using the `SetMeterProvider` method. The data put on these instruments and registries might be lost.
This commit is contained in:
parent
9e1b015159
commit
b37e8a9860
@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
- Fix memory leak in the global `MeterProvider` when identical instruments are repeatedly created. (#5754)
|
||||
- Fix panic instruments creation when setting meter provider. (#5758)
|
||||
- Fix an issue where `SetMeterProvider` in `go.opentelemetry.io/otel` might miss the delegation for instruments and registries. (#5780)
|
||||
|
||||
### Removed
|
||||
|
||||
|
@ -7,7 +7,6 @@ import (
|
||||
"container/list"
|
||||
"reflect"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/embedded"
|
||||
@ -97,7 +96,7 @@ type meter struct {
|
||||
|
||||
registry list.List
|
||||
|
||||
delegate atomic.Value // metric.Meter
|
||||
delegate metric.Meter
|
||||
}
|
||||
|
||||
type delegatedInstrument interface {
|
||||
@ -123,12 +122,12 @@ type instID struct {
|
||||
//
|
||||
// It is guaranteed by the caller that this happens only once.
|
||||
func (m *meter) setDelegate(provider metric.MeterProvider) {
|
||||
meter := provider.Meter(m.name, m.opts...)
|
||||
m.delegate.Store(meter)
|
||||
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
meter := provider.Meter(m.name, m.opts...)
|
||||
m.delegate = meter
|
||||
|
||||
for _, inst := range m.instruments {
|
||||
inst.setDelegate(meter)
|
||||
}
|
||||
@ -141,16 +140,18 @@ func (m *meter) setDelegate(provider metric.MeterProvider) {
|
||||
m.registry.Remove(e)
|
||||
}
|
||||
|
||||
clear(m.instruments)
|
||||
m.instruments = nil
|
||||
m.registry.Init()
|
||||
}
|
||||
|
||||
func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64Counter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Int64Counter(name, options...)
|
||||
}
|
||||
|
||||
i := &siCounter{name: name, opts: options}
|
||||
cfg := metric.NewInt64CounterConfig(options...)
|
||||
id := instID{
|
||||
@ -164,11 +165,13 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption)
|
||||
}
|
||||
|
||||
func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64UpDownCounter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Int64UpDownCounter(name, options...)
|
||||
}
|
||||
|
||||
i := &siUpDownCounter{name: name, opts: options}
|
||||
cfg := metric.NewInt64UpDownCounterConfig(options...)
|
||||
id := instID{
|
||||
@ -182,11 +185,13 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
|
||||
}
|
||||
|
||||
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64Histogram(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Int64Histogram(name, options...)
|
||||
}
|
||||
|
||||
i := &siHistogram{name: name, opts: options}
|
||||
cfg := metric.NewInt64HistogramConfig(options...)
|
||||
id := instID{
|
||||
@ -200,11 +205,13 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
|
||||
}
|
||||
|
||||
func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64Gauge(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Int64Gauge(name, options...)
|
||||
}
|
||||
|
||||
i := &siGauge{name: name, opts: options}
|
||||
cfg := metric.NewInt64GaugeConfig(options...)
|
||||
id := instID{
|
||||
@ -218,11 +225,13 @@ func (m *meter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (met
|
||||
}
|
||||
|
||||
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64ObservableCounter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Int64ObservableCounter(name, options...)
|
||||
}
|
||||
|
||||
i := &aiCounter{name: name, opts: options}
|
||||
cfg := metric.NewInt64ObservableCounterConfig(options...)
|
||||
id := instID{
|
||||
@ -236,11 +245,13 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser
|
||||
}
|
||||
|
||||
func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64ObservableUpDownCounter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Int64ObservableUpDownCounter(name, options...)
|
||||
}
|
||||
|
||||
i := &aiUpDownCounter{name: name, opts: options}
|
||||
cfg := metric.NewInt64ObservableUpDownCounterConfig(options...)
|
||||
id := instID{
|
||||
@ -254,11 +265,13 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6
|
||||
}
|
||||
|
||||
func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Int64ObservableGauge(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Int64ObservableGauge(name, options...)
|
||||
}
|
||||
|
||||
i := &aiGauge{name: name, opts: options}
|
||||
cfg := metric.NewInt64ObservableGaugeConfig(options...)
|
||||
id := instID{
|
||||
@ -272,11 +285,13 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa
|
||||
}
|
||||
|
||||
func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64Counter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Float64Counter(name, options...)
|
||||
}
|
||||
|
||||
i := &sfCounter{name: name, opts: options}
|
||||
cfg := metric.NewFloat64CounterConfig(options...)
|
||||
id := instID{
|
||||
@ -290,11 +305,13 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti
|
||||
}
|
||||
|
||||
func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64UpDownCounter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Float64UpDownCounter(name, options...)
|
||||
}
|
||||
|
||||
i := &sfUpDownCounter{name: name, opts: options}
|
||||
cfg := metric.NewFloat64UpDownCounterConfig(options...)
|
||||
id := instID{
|
||||
@ -308,11 +325,13 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
|
||||
}
|
||||
|
||||
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64Histogram(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Float64Histogram(name, options...)
|
||||
}
|
||||
|
||||
i := &sfHistogram{name: name, opts: options}
|
||||
cfg := metric.NewFloat64HistogramConfig(options...)
|
||||
id := instID{
|
||||
@ -326,11 +345,13 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
|
||||
}
|
||||
|
||||
func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64Gauge(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Float64Gauge(name, options...)
|
||||
}
|
||||
|
||||
i := &sfGauge{name: name, opts: options}
|
||||
cfg := metric.NewFloat64GaugeConfig(options...)
|
||||
id := instID{
|
||||
@ -344,11 +365,13 @@ func (m *meter) Float64Gauge(name string, options ...metric.Float64GaugeOption)
|
||||
}
|
||||
|
||||
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64ObservableCounter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Float64ObservableCounter(name, options...)
|
||||
}
|
||||
|
||||
i := &afCounter{name: name, opts: options}
|
||||
cfg := metric.NewFloat64ObservableCounterConfig(options...)
|
||||
id := instID{
|
||||
@ -362,11 +385,13 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O
|
||||
}
|
||||
|
||||
func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64ObservableUpDownCounter(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Float64ObservableUpDownCounter(name, options...)
|
||||
}
|
||||
|
||||
i := &afUpDownCounter{name: name, opts: options}
|
||||
cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...)
|
||||
id := instID{
|
||||
@ -380,11 +405,13 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl
|
||||
}
|
||||
|
||||
func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
return del.Float64ObservableGauge(name, options...)
|
||||
}
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
return m.delegate.Float64ObservableGauge(name, options...)
|
||||
}
|
||||
|
||||
i := &afGauge{name: name, opts: options}
|
||||
cfg := metric.NewFloat64ObservableGaugeConfig(options...)
|
||||
id := instID{
|
||||
@ -399,14 +426,14 @@ func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64Obs
|
||||
|
||||
// RegisterCallback captures the function that will be called during Collect.
|
||||
func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) (metric.Registration, error) {
|
||||
if del, ok := m.delegate.Load().(metric.Meter); ok {
|
||||
insts = unwrapInstruments(insts)
|
||||
return del.RegisterCallback(f, insts...)
|
||||
}
|
||||
|
||||
m.mtx.Lock()
|
||||
defer m.mtx.Unlock()
|
||||
|
||||
if m.delegate != nil {
|
||||
insts = unwrapInstruments(insts)
|
||||
return m.delegate.RegisterCallback(f, insts...)
|
||||
}
|
||||
|
||||
reg := ®istration{instruments: insts, function: f}
|
||||
e := m.registry.PushBack(reg)
|
||||
reg.unreg = func() error {
|
||||
|
@ -83,6 +83,12 @@ func TestMeterConcurrentSafe(t *testing.T) {
|
||||
mtr.setDelegate(noop.NewMeterProvider())
|
||||
close(finish)
|
||||
<-done
|
||||
|
||||
// No instruments should be left after the meter is replaced.
|
||||
assert.Empty(t, mtr.instruments)
|
||||
|
||||
// No callbacks should be left after the meter is replaced.
|
||||
assert.Zero(t, mtr.registry.Len())
|
||||
}
|
||||
|
||||
func TestUnregisterConcurrentSafe(t *testing.T) {
|
||||
@ -162,8 +168,9 @@ func testSetupAllInstrumentTypes(t *testing.T, m metric.Meter) (metric.Float64Co
|
||||
// This is to emulate a read from an exporter.
|
||||
func testCollect(t *testing.T, m metric.Meter) {
|
||||
if tMeter, ok := m.(*meter); ok {
|
||||
m, ok = tMeter.delegate.Load().(metric.Meter)
|
||||
if !ok {
|
||||
// This changes the input m to the delegate.
|
||||
m = tMeter.delegate
|
||||
if m == nil {
|
||||
t.Error("meter was not delegated")
|
||||
return
|
||||
}
|
||||
@ -261,7 +268,7 @@ func TestMeterDelegatesCalls(t *testing.T) {
|
||||
|
||||
// Calls to Meter methods after setDelegate() should be executed by the delegate
|
||||
require.IsType(t, &meter{}, m)
|
||||
tMeter := m.(*meter).delegate.Load().(*testMeter)
|
||||
tMeter := m.(*meter).delegate.(*testMeter)
|
||||
require.NotNil(t, tMeter)
|
||||
assert.Equal(t, 1, tMeter.afCount)
|
||||
assert.Equal(t, 1, tMeter.afUDCount)
|
||||
@ -309,7 +316,7 @@ func TestMeterDefersDelegations(t *testing.T) {
|
||||
|
||||
// Calls to Meter() before setDelegate() should be the delegated type
|
||||
require.IsType(t, &meter{}, m)
|
||||
tMeter := m.(*meter).delegate.Load().(*testMeter)
|
||||
tMeter := m.(*meter).delegate.(*testMeter)
|
||||
require.NotNil(t, tMeter)
|
||||
assert.Equal(t, 1, tMeter.afCount)
|
||||
assert.Equal(t, 1, tMeter.afUDCount)
|
||||
|
Loading…
x
Reference in New Issue
Block a user