1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-03 22:52:30 +02:00

Cache instruments so repeatedly creating identical instruments doesn't leak memory (#4820)

* cache instruments to avoid leaking memory

* add cacheWithErr to simplify error handling

* add wanring on repeated obserbable instrument creation with callbacks

* documentation for new behavior

* address feedback

---------

Co-authored-by: Damien Mathieu <damien.mathieu@elastic.co>
This commit is contained in:
David Ashpole 2024-01-24 10:42:43 -05:00 committed by GitHub
parent cef39a1e17
commit 1978044c55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 284 additions and 50 deletions

View File

@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Fixed
- Fix `ContainerID` resource detection on systemd when cgroup path has a colon. (#4449)
- Fix `go.opentelemetry.io/otel/sdk/metric` to cache instruments to avoid leaking memory when the same instrument is created multiple times. (#4820)
## [1.23.0-rc.1] 2024-01-18

View File

@ -52,3 +52,43 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}
// HasKey returns true if Lookup has previously been called with that key
//
// HasKey is safe to call concurrently.
func (c *cache[K, V]) HasKey(key K) bool {
c.Lock()
defer c.Unlock()
_, ok := c.data[key]
return ok
}
// cacheWithErr is a locking storage used to quickly return already computed values and an error.
//
// The zero value of a cacheWithErr is empty and ready to use.
//
// A cacheWithErr must not be copied after first use.
//
// All methods of a cacheWithErr are safe to call concurrently.
type cacheWithErr[K comparable, V any] struct {
cache[K, valAndErr[V]]
}
type valAndErr[V any] struct {
val V
err error
}
// Lookup returns the value stored in the cacheWithErr with the associated key
// if it exists. Otherwise, f is called and its returned value is set in the
// cacheWithErr for key and returned.
//
// Lookup is safe to call concurrently. It will hold the cacheWithErr lock, so f
// should not block excessively.
func (c *cacheWithErr[K, V]) Lookup(key K, f func() (V, error)) (V, error) {
combined := c.cache.Lookup(key, func() valAndErr[V] {
val, err := f()
return valAndErr[V]{val: val, err: err}
})
return combined.val, combined.err
}

View File

@ -41,6 +41,11 @@ type meter struct {
scope instrumentation.Scope
pipes pipelines
int64Insts *cacheWithErr[instID, *int64Inst]
float64Insts *cacheWithErr[instID, *float64Inst]
int64ObservableInsts *cacheWithErr[instID, int64Observable]
float64ObservableInsts *cacheWithErr[instID, float64Observable]
int64Resolver resolver[int64]
float64Resolver resolver[float64]
}
@ -50,11 +55,20 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, instID]
var int64Insts cacheWithErr[instID, *int64Inst]
var float64Insts cacheWithErr[instID, *float64Inst]
var int64ObservableInsts cacheWithErr[instID, int64Observable]
var float64ObservableInsts cacheWithErr[instID, float64Observable]
return &meter{
scope: s,
pipes: p,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
scope: s,
pipes: p,
int64Insts: &int64Insts,
float64Insts: &float64Insts,
int64ObservableInsts: &int64ObservableInsts,
float64ObservableInsts: &float64ObservableInsts,
int64Resolver: newResolver[int64](p, &viewCache),
float64Resolver: newResolver[float64](p, &viewCache),
}
}
@ -108,32 +122,48 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti
// int64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) int64ObservableInstrument(id Instrument, callbacks []metric.Int64Callback) (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
key := instID{
Name: id.Name,
Description: id.Description,
Unit: id.Unit,
Kind: id.Kind,
}
return inst, validateInstrumentName(id.Name)
if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
warnRepeatedObservableCallbacks(id)
}
return m.int64ObservableInsts.Lookup(key, func() (int64Observable, error) {
inst := newInt64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.int64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := int64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
})
}
// Int64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing int64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
//
// If Int64ObservableCounter is invoked repeatedly with the same Name,
// Description, and Unit, only the first set of callbacks provided are used.
// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
cfg := metric.NewInt64ObservableCounterConfig(options...)
id := Instrument{
@ -225,32 +255,48 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram
// float64ObservableInstrument returns a new observable identified by the Instrument.
// It registers callbacks for each reader's pipeline.
func (m *meter) float64ObservableInstrument(id Instrument, callbacks []metric.Float64Callback) (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
key := instID{
Name: id.Name,
Description: id.Description,
Unit: id.Unit,
Kind: id.Kind,
}
return inst, validateInstrumentName(id.Name)
if m.int64ObservableInsts.HasKey(key) && len(callbacks) > 0 {
warnRepeatedObservableCallbacks(id)
}
return m.float64ObservableInsts.Lookup(key, func() (float64Observable, error) {
inst := newFloat64Observable(m, id.Kind, id.Name, id.Description, id.Unit)
for _, insert := range m.float64Resolver.inserters {
// Connect the measure functions for instruments in this pipeline with the
// callbacks for this pipeline.
in, err := insert.Instrument(id, insert.readerDefaultAggregation(id.Kind))
if err != nil {
return inst, err
}
// Drop aggregation
if len(in) == 0 {
inst.dropAggregation = true
continue
}
inst.appendMeasures(in)
for _, cback := range callbacks {
inst := float64Observer{measures: in}
insert.addCallback(func(ctx context.Context) error { return cback(ctx, inst) })
}
}
return inst, validateInstrumentName(id.Name)
})
}
// Float64ObservableCounter returns a new instrument identified by name and
// configured with options. The instrument is used to asynchronously record
// increasing float64 measurements once per a measurement collection cycle.
// Only the measurements recorded during the collection cycle are exported.
//
// If Float64ObservableCounter is invoked repeatedly with the same Name,
// Description, and Unit, only the first set of callbacks provided are used.
// Use meter.RegisterCallback and Registration.Unregister to manage callbacks
// if instrumentation can be created multiple times with different callbacks.
func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
cfg := metric.NewFloat64ObservableCounterConfig(options...)
id := Instrument{
@ -324,6 +370,16 @@ func isAlphanumeric(c rune) bool {
return isAlpha(c) || ('0' <= c && c <= '9')
}
func warnRepeatedObservableCallbacks(id Instrument) {
inst := fmt.Sprintf(
"Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}",
id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit,
)
global.Warn("Repeated observable instrument creation with callbacks. Ignoring new callbacks. Use meter.RegisterCallback and Registration.Unregister to manage callbacks.",
"instrument", inst,
)
}
// RegisterCallback registers f to be called each collection cycle so it will
// make observations for insts during those cycles.
//
@ -529,14 +585,28 @@ func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramC
// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
return p.meter.int64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
})
}
// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
return p.meter.int64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
})
}
// float64InstProvider provides float64 OpenTelemetry instruments.
@ -573,14 +643,28 @@ func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64Histog
// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
return p.meter.float64Insts.Lookup(instID{
Name: name,
Description: desc,
Unit: u,
Kind: kind,
}, func() (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
})
}
// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
return p.meter.float64Insts.Lookup(instID{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
}, func() (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
})
}
type int64Observer struct {

View File

@ -2272,3 +2272,112 @@ func TestObservableDropAggregation(t *testing.T) {
})
}
}
func TestDuplicateInstrumentCreation(t *testing.T) {
for _, tt := range []struct {
desc string
createInstrument func(metric.Meter) error
}{
{
desc: "Int64ObservableCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64ObservableCounter("observable.int64.counter")
return err
},
},
{
desc: "Int64ObservableUpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter")
return err
},
},
{
desc: "Int64ObservableGauge",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64ObservableGauge("observable.int64.gauge")
return err
},
},
{
desc: "Float64ObservableCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64ObservableCounter("observable.float64.counter")
return err
},
},
{
desc: "Float64ObservableUpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter")
return err
},
},
{
desc: "Float64ObservableGauge",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64ObservableGauge("observable.float64.gauge")
return err
},
},
{
desc: "Int64Counter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64Counter("sync.int64.counter")
return err
},
},
{
desc: "Int64UpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64UpDownCounter("sync.int64.up.down.counter")
return err
},
},
{
desc: "Int64Histogram",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Int64Histogram("sync.int64.histogram")
return err
},
},
{
desc: "Float64Counter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64Counter("sync.float64.counter")
return err
},
},
{
desc: "Float64UpDownCounter",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64UpDownCounter("sync.float64.up.down.counter")
return err
},
},
{
desc: "Float64Histogram",
createInstrument: func(meter metric.Meter) error {
_, err := meter.Float64Histogram("sync.float64.histogram")
return err
},
},
} {
t.Run(tt.desc, func(t *testing.T) {
reader := NewManualReader()
defer func() {
require.NoError(t, reader.Shutdown(context.Background()))
}()
m := NewMeterProvider(WithReader(reader)).Meter("TestDuplicateInstrumentCreation")
for i := 0; i < 3; i++ {
require.NoError(t, tt.createInstrument(m))
}
internalMeter, ok := m.(*meter)
require.True(t, ok)
// check that multiple calls to create the same instrument only create 1 instrument
numInstruments := len(internalMeter.int64Insts.data) + len(internalMeter.float64Insts.data) + len(internalMeter.int64ObservableInsts.data) + len(internalMeter.float64ObservableInsts.data)
require.Equal(t, 1, numInstruments)
})
}
}