1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-05-19 22:13:08 +02:00

Reuse memory in metric pipelines (#3760)

* Have pipelines reuse memory

* truncate Metric slice

* Apply suggestions from code review

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Use rm pool on periodic shutdown.

* zero out RM on ctx error

* Update sdk/metric/pipeline.go

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Peter Liu <lpfvip2008@gmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Fix lint

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Peter Liu <lpfvip2008@gmail.com>
Co-authored-by: Tyler Yahn <codingalias@gmail.com>
This commit is contained in:
Aaron Clawson 2023-03-09 11:43:16 -06:00 committed by GitHub
parent 7dc7b30405
commit e463505da7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 99 additions and 57 deletions

View File

@ -0,0 +1,24 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal // import "go.opentelemetry.io/otel/sdk/metric/internal"
// ReuseSlice returns a zeroed view of slice if its capacity is greater than or
// equal to n. Otherwise, it returns a new []T with capacity equal to n.
func ReuseSlice[T any](slice []T, n int) []T {
if cap(slice) >= n {
return slice[:n]
}
return make([]T, n)
}

View File

@ -135,9 +135,8 @@ func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetr
err := fmt.Errorf("manual reader: invalid producer: %T", p)
return err
}
// TODO (#3047): When produce is updated to accept output as param, pass rm.
rmTemp, err := ph.produce(ctx)
*rm = rmTemp
err := ph.produce(ctx, rm)
if err != nil {
return err
}

View File

@ -120,6 +120,10 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
rmPool: sync.Pool{
New: func() interface{} {
return &metricdata.ResourceMetrics{}
}},
}
r.externalProducers.Store([]Producer{})
@ -147,6 +151,8 @@ type periodicReader struct {
done chan struct{}
cancel context.CancelFunc
shutdownOnce sync.Once
rmPool sync.Pool
}
// Compile time check the periodicReader implements Reader and is comparable.
@ -214,11 +220,12 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio
// the SDK and exports it with r's exporter.
func (r *periodicReader) collectAndExport(ctx context.Context) error {
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
rm := metricdata.ResourceMetrics{}
err := r.Collect(ctx, &rm)
rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
err := r.Collect(ctx, rm)
if err == nil {
err = r.export(ctx, rm)
err = r.export(ctx, *rm)
}
r.rmPool.Put(rm)
return err
}
@ -233,15 +240,13 @@ func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMet
return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
}
// TODO (#3047): When collect is updated to accept output as param, pass rm.
rmTemp, err := r.collect(ctx, r.sdkProducer.Load())
*rm = rmTemp
return err
return r.collect(ctx, r.sdkProducer.Load(), rm)
}
// collect unwraps p as a produceHolder and returns its produce results.
func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata.ResourceMetrics, error) {
func (r *periodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
return ErrReaderNotRegistered
}
ph, ok := p.(produceHolder)
@ -251,12 +256,12 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
return err
}
rm, err := ph.produce(ctx)
err := ph.produce(ctx, rm)
if err != nil {
return metricdata.ResourceMetrics{}, err
return err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
@ -266,7 +271,7 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
return unifyErrors(errs)
}
// export exports metric data m using r's exporter.
@ -313,11 +318,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
if ph != nil { // Reader was registered.
// Flush pending telemetry.
var m metricdata.ResourceMetrics
m, err = r.collect(ctx, ph)
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
err = r.collect(ctx, ph, m)
if err == nil {
err = r.export(ctx, m)
err = r.export(ctx, *m)
}
r.rmPool.Put(m)
}
sErr := r.exporter.Shutdown(ctx)

View File

@ -121,7 +121,7 @@ func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
p.Lock()
defer p.Unlock()
@ -132,7 +132,9 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
errs.append(err)
}
if err := ctx.Err(); err != nil {
return metricdata.ResourceMetrics{}, err
rm.Resource = nil
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
}
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
@ -143,36 +145,39 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
}
if err := ctx.Err(); err != nil {
// This means the context expired before we finished running callbacks.
return metricdata.ResourceMetrics{}, err
rm.Resource = nil
rm.ScopeMetrics = rm.ScopeMetrics[:0]
return err
}
}
sm := make([]metricdata.ScopeMetrics, 0, len(p.aggregations))
rm.Resource = p.resource
rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations))
i := 0
for scope, instruments := range p.aggregations {
metrics := make([]metricdata.Metrics, 0, len(instruments))
rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
j := 0
for _, inst := range instruments {
data := inst.aggregator.Aggregation()
if data != nil {
metrics = append(metrics, metricdata.Metrics{
Name: inst.name,
Description: inst.description,
Unit: inst.unit,
Data: data,
})
rm.ScopeMetrics[i].Metrics[j].Name = inst.name
rm.ScopeMetrics[i].Metrics[j].Description = inst.description
rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
rm.ScopeMetrics[i].Metrics[j].Data = data
j++
}
}
if len(metrics) > 0 {
sm = append(sm, metricdata.ScopeMetrics{
Scope: scope,
Metrics: metrics,
})
rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j]
if len(rm.ScopeMetrics[i].Metrics) > 0 {
rm.ScopeMetrics[i].Scope = scope
i++
}
}
return metricdata.ResourceMetrics{
Resource: p.resource,
ScopeMetrics: sm,
}, errs.errorOrNil()
rm.ScopeMetrics = rm.ScopeMetrics[:i]
return errs.errorOrNil()
}
// inserter facilitates inserting of new instruments from a single scope into a

View File

@ -42,7 +42,8 @@ func (testSumAggregator) Aggregation() metricdata.Aggregation {
func TestEmptyPipeline(t *testing.T) {
pipe := &pipeline{}
output, err := pipe.produce(context.Background())
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Nil(t, output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
@ -56,7 +57,7 @@ func TestEmptyPipeline(t *testing.T) {
pipe.addMultiCallback(func(context.Context) error { return nil })
})
output, err = pipe.produce(context.Background())
err = pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Nil(t, output.Resource)
require.Len(t, output.ScopeMetrics, 1)
@ -66,7 +67,8 @@ func TestEmptyPipeline(t *testing.T) {
func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
output, err := pipe.produce(context.Background())
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
@ -80,7 +82,7 @@ func TestNewPipeline(t *testing.T) {
pipe.addMultiCallback(func(context.Context) error { return nil })
})
output, err = pipe.produce(context.Background())
err = pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
require.Len(t, output.ScopeMetrics, 1)
@ -91,7 +93,8 @@ func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil)
output, err := pipe.produce(context.Background())
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
assert.NoError(t, err)
assert.Equal(t, res, output.Resource)
}
@ -99,6 +102,7 @@ func TestPipelineUsesResource(t *testing.T) {
func TestPipelineConcurrency(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
ctx := context.Background()
var output metricdata.ResourceMetrics
var wg sync.WaitGroup
const threads = 2
@ -106,7 +110,7 @@ func TestPipelineConcurrency(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = pipe.produce(ctx)
_ = pipe.produce(ctx, &output)
}()
wg.Add(1)
@ -167,7 +171,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
a.Aggregate(1, *attribute.EmptySet())
}
out, err := test.pipe.produce(context.Background())
out := metricdata.ResourceMetrics{}
err = test.pipe.produce(context.Background(), &out)
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]

View File

@ -99,7 +99,7 @@ type sdkProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (metricdata.ResourceMetrics, error)
produce(context.Context, *metricdata.ResourceMetrics) error
}
// Producer produces metrics for a Reader from an external source.
@ -113,15 +113,15 @@ type Producer interface {
// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
produce func(context.Context) (metricdata.ResourceMetrics, error)
produce func(context.Context, *metricdata.ResourceMetrics) error
}
// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}
// produce returns an ErrReaderShutdown error.
func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics, error) {
return metricdata.ResourceMetrics{}, ErrReaderShutdown
func (p shutdownProducer) produce(context.Context, *metricdata.ResourceMetrics) error {
return ErrReaderShutdown
}
// TemporalitySelector selects the temporality to use based on the InstrumentKind.

View File

@ -103,10 +103,11 @@ func (ts *readerTestSuite) TestMultipleForceFlush() {
func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testSDKProducer{
produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
// Differentiate this producer from the second by returning an
// error.
return testResourceMetricsA, assert.AnError
*rm = testResourceMetricsA
return assert.AnError
},
}
p1 := testSDKProducer{}
@ -144,8 +145,9 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {
func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() {
ts.Reader.register(testSDKProducer{
produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
return metricdata.ResourceMetrics{}, assert.AnError
produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error {
*rm = metricdata.ResourceMetrics{}
return assert.AnError
}})
ts.Reader.RegisterProducer(testExternalProducer{})
@ -252,14 +254,15 @@ var testResourceMetricsAB = metricdata.ResourceMetrics{
}
type testSDKProducer struct {
produceFunc func(context.Context) (metricdata.ResourceMetrics, error)
produceFunc func(context.Context, *metricdata.ResourceMetrics) error
}
func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
func (p testSDKProducer) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if p.produceFunc != nil {
return p.produceFunc(ctx)
return p.produceFunc(ctx, rm)
}
return testResourceMetricsA, nil
*rm = testResourceMetricsA
return nil
}
type testExternalProducer struct {