1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-05-31 22:49:54 +02:00

Change the Reader.Collect Signature. (#3732)

* Changes the signature of Collect().

This DOES NOT make the SDK reuse memory, but it does enable it to be added.
This commit is contained in:
Aaron Clawson 2023-02-21 09:04:27 -06:00 committed by GitHub
parent ecf0838245
commit cc8bdaaad4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 82 additions and 44 deletions

View File

@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed ### Changed
- [bridge/ot] Fall-back to TextMap carrier when it's not ot.HttpHeaders. (#3679) - [bridge/ot] Fall-back to TextMap carrier when it's not ot.HttpHeaders. (#3679)
- The `Collect` method of the `"go.opentelemetry.io/otel/sdk/metric".Reader` interface is updated to accept the `metricdata.ResourceMetrics` value the collection will be made into. This change is made to enable memory reuse by SDK users. (#3732)
### Fixed ### Fixed

View File

@ -113,7 +113,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
// Collect implements prometheus.Collector. // Collect implements prometheus.Collector.
func (c *collector) Collect(ch chan<- prometheus.Metric) { func (c *collector) Collect(ch chan<- prometheus.Metric) {
metrics, err := c.reader.Collect(context.TODO()) // TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect.
metrics := metricdata.ResourceMetrics{}
err := c.reader.Collect(context.TODO(), &metrics)
if err != nil { if err != nil {
otel.Handle(err) otel.Handle(err)
if err == metric.ErrReaderNotRegistered { if err == metric.ErrReaderNotRegistered {

View File

@ -93,7 +93,7 @@ func BenchmarkCounterCollectOneAttr(b *testing.B) {
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
cntr.Add(ctx, 1, attribute.Int("K", 1)) cntr.Add(ctx, 1, attribute.Int("K", 1))
_, _ = rdr.Collect(ctx) _ = rdr.Collect(ctx, nil)
} }
} }
@ -104,7 +104,7 @@ func BenchmarkCounterCollectTenAttrs(b *testing.B) {
for j := 0; j < 10; j++ { for j := 0; j < 10; j++ {
cntr.Add(ctx, 1, attribute.Int("K", j)) cntr.Add(ctx, 1, attribute.Int("K", j))
} }
_, _ = rdr.Collect(ctx) _ = rdr.Collect(ctx, nil)
} }
} }
@ -140,7 +140,7 @@ func benchCollectHistograms(count int) func(*testing.B) {
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
collectedMetrics, _ = r.Collect(ctx) _ = r.Collect(ctx, &collectedMetrics)
if len(collectedMetrics.ScopeMetrics[0].Metrics) != count { if len(collectedMetrics.ScopeMetrics[0].Metrics) != count {
b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count) b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count)
} }

View File

@ -32,7 +32,7 @@ type reader struct {
externalProducers []Producer externalProducers []Producer
temporalityFunc TemporalitySelector temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error) collectFunc func(context.Context, *metricdata.ResourceMetrics) error
forceFlushFunc func(context.Context) error forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error shutdownFunc func(context.Context) error
} }
@ -48,8 +48,8 @@ func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.e
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind) return r.temporalityFunc(kind)
} }
func (r *reader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
return r.collectFunc(ctx) return r.collectFunc(ctx, rm)
} }
func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) } func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) }
func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) } func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) }

View File

@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -112,11 +113,17 @@ func (mr *manualReader) Shutdown(context.Context) error {
} }
// Collect gathers all metrics from the SDK and other Producers, calling any // Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown. // callbacks necessary and stores the result in rm.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { //
// Collect will return an error if called after shutdown.
// Collect will return an error if rm is a nil ResourceMetrics.
func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if rm == nil {
return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
}
p := mr.sdkProducer.Load() p := mr.sdkProducer.Load()
if p == nil { if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered return ErrReaderNotRegistered
} }
ph, ok := p.(produceHolder) ph, ok := p.(produceHolder)
@ -126,12 +133,13 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
// happen, return an error instead of panicking so a users code does // happen, return an error instead of panicking so a users code does
// not halt in the processes. // not halt in the processes.
err := fmt.Errorf("manual reader: invalid producer: %T", p) err := fmt.Errorf("manual reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err return err
} }
// TODO (#3047): When produce is updated to accept output as param, pass rm.
rm, err := ph.produce(ctx) rmTemp, err := ph.produce(ctx)
*rm = rmTemp
if err != nil { if err != nil {
return metricdata.ResourceMetrics{}, err return err
} }
var errs []error var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) { for _, producer := range mr.externalProducers.Load().([]Producer) {
@ -141,7 +149,7 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
} }
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
} }
return rm, unifyErrors(errs) return unifyErrors(errs)
} }
// manualReaderConfig contains configuration options for a ManualReader. // manualReaderConfig contains configuration options for a ManualReader.

View File

@ -472,7 +472,8 @@ func TestMeterCreatesInstruments(t *testing.T) {
tt.fn(t, m) tt.fn(t, m)
rm, err := rdr.Collect(context.Background()) rm := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &rm)
assert.NoError(t, err) assert.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1) require.Len(t, rm.ScopeMetrics, 1)
@ -566,7 +567,7 @@ func TestCallbackObserverNonRegistered(t *testing.T) {
var got metricdata.ResourceMetrics var got metricdata.ResourceMetrics
assert.NotPanics(t, func() { assert.NotPanics(t, func() {
got, err = rdr.Collect(context.Background()) err = rdr.Collect(context.Background(), &got)
}) })
assert.NoError(t, err) assert.NoError(t, err)
@ -660,7 +661,8 @@ func TestGlobalInstRegisterCallback(t *testing.T) {
_, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr) _, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr)
assert.NoError(t, err) assert.NoError(t, err)
got, err := rdr.Collect(context.Background()) got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err) assert.NoError(t, err)
assert.Lenf(t, l.messages, 0, "Warnings and errors logged:\n%s", l) assert.Lenf(t, l.messages, 0, "Warnings and errors logged:\n%s", l)
metricdatatest.AssertEqual(t, metricdata.ResourceMetrics{ metricdatatest.AssertEqual(t, metricdata.ResourceMetrics{
@ -772,7 +774,8 @@ func TestMetersProvideScope(t *testing.T) {
}, },
} }
got, err := rdr.Collect(context.Background()) got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err) assert.NoError(t, err)
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
} }
@ -816,14 +819,14 @@ func TestUnregisterUnregisters(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
ctx := context.Background() ctx := context.Background()
_, err = r.Collect(ctx) err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err) require.NoError(t, err)
assert.True(t, called, "callback not called for registered callback") assert.True(t, called, "callback not called for registered callback")
called = false called = false
require.NoError(t, reg.Unregister(), "unregister") require.NoError(t, reg.Unregister(), "unregister")
_, err = r.Collect(ctx) err = r.Collect(ctx, &metricdata.ResourceMetrics{})
require.NoError(t, err) require.NoError(t, err)
assert.False(t, called, "callback called for unregistered callback") assert.False(t, called, "callback called for unregistered callback")
} }
@ -869,7 +872,8 @@ func TestRegisterCallbackDropAggregations(t *testing.T) {
) )
require.NoError(t, err) require.NoError(t, err)
data, err := r.Collect(context.Background()) data := metricdata.ResourceMetrics{}
err = r.Collect(context.Background(), &data)
require.NoError(t, err) require.NoError(t, err)
assert.False(t, called, "callback called for all drop instruments") assert.False(t, called, "callback called for all drop instruments")
@ -1238,7 +1242,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
).Meter("TestAttributeFilter") ).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr)) require.NoError(t, tt.register(t, mtr))
m, err := rdr.Collect(context.Background()) m := metricdata.ResourceMetrics{}
err := rdr.Collect(context.Background(), &m)
assert.NoError(t, err) assert.NoError(t, err)
require.Len(t, m.ScopeMetrics, 1) require.Len(t, m.ScopeMetrics, 1)
@ -1331,7 +1336,8 @@ func TestAsynchronousExample(t *testing.T) {
collect := func(t *testing.T) { collect := func(t *testing.T) {
t.Helper() t.Helper()
got, err := reader.Collect(context.Background()) got := metricdata.ResourceMetrics{}
err := reader.Collect(context.Background(), &got)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1) require.Len(t, got.ScopeMetrics, 1)
metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp())

View File

@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -206,21 +207,29 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// collectAndExport gather all metric data related to the periodicReader r from // collectAndExport gather all metric data related to the periodicReader r from
// the SDK and exports it with r's exporter. // the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error { func (r *periodicReader) collectAndExport(ctx context.Context) error {
m, err := r.Collect(ctx) // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := metricdata.ResourceMetrics{}
err := r.Collect(ctx, &rm)
if err == nil { if err == nil {
err = r.export(ctx, m) err = r.export(ctx, rm)
} }
return err return err
} }
// Collect gathers and returns all metric data related to the Reader from // Collect gathers and returns all metric data related to the Reader from
// the SDK and other Producers. The returned metric data is not exported // the SDK and other Producers and stores the result in rm. The returned metric
// to the configured exporter, it is left to the caller to handle that if // data is not exported to the configured exporter, it is left to the caller to
// desired. // handle that if desired.
// //
// An error is returned if this is called after Shutdown. // An error is returned if this is called after Shutdown. An error is return if rm is nil.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
return r.collect(ctx, r.sdkProducer.Load()) if rm == nil {
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
// TODO (#3047): When collect is updated to accept output as param, pass rm.
rmTemp, err := r.collect(ctx, r.sdkProducer.Load())
*rm = rmTemp
return err
} }
// collect unwraps p as a produceHolder and returns its produce results. // collect unwraps p as a produceHolder and returns its produce results.

View File

@ -65,8 +65,9 @@ type Reader interface {
aggregation(InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type. aggregation(InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type.
// Collect gathers and returns all metric data related to the Reader from // Collect gathers and returns all metric data related to the Reader from
// the SDK. An error is returned if this is called after Shutdown. // the SDK and stores it in out. An error is returned if this is called
Collect(context.Context) (metricdata.ResourceMetrics, error) // after Shutdown or if out is nil.
Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error
// ForceFlush flushes all metric measurements held in an export pipeline. // ForceFlush flushes all metric measurements held in an export pipeline.
// //

View File

@ -53,13 +53,14 @@ func (ts *readerTestSuite) TearDownTest() {
} }
func (ts *readerTestSuite) TestErrorForNotRegistered() { func (ts *readerTestSuite) TestErrorForNotRegistered() {
_, err := ts.Reader.Collect(context.Background()) err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{})
ts.ErrorIs(err, ErrReaderNotRegistered) ts.ErrorIs(err, ErrReaderNotRegistered)
} }
func (ts *readerTestSuite) TestSDKProducer() { func (ts *readerTestSuite) TestSDKProducer() {
ts.Reader.register(testSDKProducer{}) ts.Reader.register(testSDKProducer{})
m, err := ts.Reader.Collect(context.Background()) m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.NoError(err) ts.NoError(err)
ts.Equal(testResourceMetricsA, m) ts.Equal(testResourceMetricsA, m)
} }
@ -67,7 +68,8 @@ func (ts *readerTestSuite) TestSDKProducer() {
func (ts *readerTestSuite) TestExternalProducer() { func (ts *readerTestSuite) TestExternalProducer() {
ts.Reader.register(testSDKProducer{}) ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{}) ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(context.Background()) m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.NoError(err) ts.NoError(err)
ts.Equal(testResourceMetricsAB, m) ts.Equal(testResourceMetricsAB, m)
} }
@ -78,7 +80,8 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() {
ts.Reader.RegisterProducer(testExternalProducer{}) ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.Shutdown(ctx)) ts.Require().NoError(ts.Reader.Shutdown(ctx))
m, err := ts.Reader.Collect(ctx) m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(ctx, &m)
ts.ErrorIs(err, ErrReaderShutdown) ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(metricdata.ResourceMetrics{}, m) ts.Equal(metricdata.ResourceMetrics{}, m)
} }
@ -113,7 +116,7 @@ func (ts *readerTestSuite) TestMultipleRegister() {
// This should be ignored. // This should be ignored.
ts.Reader.register(p1) ts.Reader.register(p1)
_, err := ts.Reader.Collect(context.Background()) err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{})
ts.Equal(assert.AnError, err) ts.Equal(assert.AnError, err)
} }
@ -134,7 +137,8 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {
}, },
) )
m, err := ts.Reader.Collect(context.Background()) m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err) ts.Equal(assert.AnError, err)
ts.Equal(testResourceMetricsAB, m) ts.Equal(testResourceMetricsAB, m)
} }
@ -146,7 +150,8 @@ func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() {
}}) }})
ts.Reader.RegisterProducer(testExternalProducer{}) ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(context.Background()) m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(context.Background(), &m)
ts.Equal(assert.AnError, err) ts.Equal(assert.AnError, err)
ts.Equal(metricdata.ResourceMetrics{}, m) ts.Equal(metricdata.ResourceMetrics{}, m)
} }
@ -165,7 +170,7 @@ func (ts *readerTestSuite) TestMethodConcurrency() {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
_, _ = ts.Reader.Collect(ctx) _ = ts.Reader.Collect(ctx, nil)
}() }()
wg.Add(1) wg.Add(1)
@ -190,11 +195,17 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() {
ts.Reader.register(testSDKProducer{}) ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{}) ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(ctx) m := metricdata.ResourceMetrics{}
err := ts.Reader.Collect(ctx, &m)
ts.ErrorIs(err, ErrReaderShutdown) ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(metricdata.ResourceMetrics{}, m) ts.Equal(metricdata.ResourceMetrics{}, m)
} }
func (ts *readerTestSuite) TestCollectNilResourceMetricError() {
ctx := context.Background()
ts.Assert().Error(ts.Reader.Collect(ctx, nil))
}
var testScopeMetricsA = metricdata.ScopeMetrics{ var testScopeMetricsA = metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"},
Metrics: []metricdata.Metrics{{ Metrics: []metricdata.Metrics{{
@ -279,7 +290,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
collectedMetrics, err = r.Collect(ctx) err = r.Collect(ctx, &collectedMetrics)
assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err)
} }
} }