1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Add Metric Producer as a new interface, which returns scope metrics (#3524)

* add RegisterProducer method and metric.Producer interface

* rename testProducer to testSDKProducer

* rename testMetrics to testResourceMetrics

* add testExternalProducer for testing bridges

* add test data for testing external producers

* clean up help text

* unit tests for external Producer

* changelog

* improve test coverage

* Update CHANGELOG.md

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* support partial errors

* fix lint

* add additional test

* unallocate producers on shutdown

* don't register Producers after shutdown

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
David Ashpole
2022-12-15 10:50:45 -05:00
committed by GitHub
parent 4e763472ee
commit 14a17b3ad6
8 changed files with 267 additions and 80 deletions
+4
View File
@@ -12,6 +12,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The deprecated `go.opentelemetry.io/otel/sdk/metric/view` package is removed. (#3520)
### Added
- Add `Producer` interface and `Reader.RegisterProducer(Producer)` to `go.opentelemetry.io/otel/sdk/metric` to enable external metric Producers. (#3524)
## [1.11.2/0.34.0] 2022-12-05
### Added
+13 -8
View File
@@ -54,14 +54,19 @@ func unify(funcs []func(context.Context) error) func(context.Context) error {
errs = append(errs, err)
}
}
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
return unifyErrors(errs)
}
}
// unifyErrors combines multiple errors into a single error.
func unifyErrors(errs []error) error {
switch len(errs) {
case 0:
return nil
case 1:
return errs[0]
default:
return fmt.Errorf("%v", errs)
}
}
+9 -7
View File
@@ -28,12 +28,13 @@ import (
)
type reader struct {
producer producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
producer sdkProducer
externalProducers []Producer
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
collectFunc func(context.Context) (metricdata.ResourceMetrics, error)
forceFlushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
var _ Reader = (*reader)(nil)
@@ -42,7 +43,8 @@ func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // n
return r.aggregationFunc(kind)
}
func (r *reader) register(p producer) { r.producer = p }
func (r *reader) register(p sdkProducer) { r.producer = p }
func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.externalProducers, p) }
func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.temporalityFunc(kind)
}
+49 -11
View File
@@ -28,9 +28,13 @@ import (
// manualReader is a simple Reader that allows an application to
// read metrics on demand.
type manualReader struct {
producer atomic.Value
sdkProducer atomic.Value
shutdownOnce sync.Once
mu sync.Mutex
isShutdown bool
externalProducers atomic.Value
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
}
@@ -41,22 +45,39 @@ var _ = map[Reader]struct{}{&manualReader{}: {}}
// NewManualReader returns a Reader which is directly called to collect metrics.
func NewManualReader(opts ...ManualReaderOption) Reader {
cfg := newManualReaderConfig(opts)
return &manualReader{
r := &manualReader{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
r.externalProducers.Store([]Producer{})
return r
}
// register stores the Producer which enables the caller to read
// metrics on demand.
func (mr *manualReader) register(p producer) {
// register stores the sdkProducer which enables the caller
// to read metrics from the SDK on demand.
func (mr *manualReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !mr.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register manual reader"
global.Error(errDuplicateRegister, msg)
}
}
// RegisterProducer stores the external Producer which enables the caller
// to read metrics on demand.
func (mr *manualReader) RegisterProducer(p Producer) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.isShutdown {
return
}
currentProducers := mr.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
mr.externalProducers.Store(newProducers)
}
// temporality reports the Temporality for the instrument kind provided.
func (mr *manualReader) temporality(kind InstrumentKind) metricdata.Temporality {
return mr.temporalitySelector(kind)
@@ -77,18 +98,23 @@ func (mr *manualReader) Shutdown(context.Context) error {
err := ErrReaderShutdown
mr.shutdownOnce.Do(func() {
// Any future call to Collect will now return ErrReaderShutdown.
mr.producer.Store(produceHolder{
mr.sdkProducer.Store(produceHolder{
produce: shutdownProducer{}.produce,
})
mr.mu.Lock()
defer mr.mu.Unlock()
mr.isShutdown = true
// release references to Producer(s)
mr.externalProducers.Store([]Producer{})
err = nil
})
return err
}
// Collect gathers all metrics from the SDK, calling any callbacks necessary.
// Collect will return an error if called after shutdown.
// Collect gathers all metrics from the SDK and other Producers, calling any
// callbacks necessary. Collect will return an error if called after shutdown.
func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
p := mr.producer.Load()
p := mr.sdkProducer.Load()
if p == nil {
return metricdata.ResourceMetrics{}, ErrReaderNotRegistered
}
@@ -103,7 +129,19 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics
return metricdata.ResourceMetrics{}, err
}
return ph.produce(ctx)
rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
var errs []error
for _, producer := range mr.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
}
// manualReaderConfig contains configuration options for a ManualReader.
+47 -8
View File
@@ -114,6 +114,7 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
cancel: cancel,
done: make(chan struct{}),
}
r.externalProducers.Store([]Producer{})
go func() {
defer func() { close(r.done) }()
@@ -126,7 +127,11 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
// periodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
producer atomic.Value
sdkProducer atomic.Value
mu sync.Mutex
isShutdown bool
externalProducers atomic.Value
timeout time.Duration
exporter Exporter
@@ -166,14 +171,28 @@ func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
}
// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
func (r *periodicReader) register(p sdkProducer) {
// Only register once. If producer is already set, do nothing.
if !r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
msg := "did not register periodic reader"
global.Error(errDuplicateRegister, msg)
}
}
// RegisterProducer registers p as an external Producer of this reader.
func (r *periodicReader) RegisterProducer(p Producer) {
r.mu.Lock()
defer r.mu.Unlock()
if r.isShutdown {
return
}
currentProducers := r.externalProducers.Load().([]Producer)
newProducers := []Producer{}
newProducers = append(newProducers, currentProducers...)
newProducers = append(newProducers, p)
r.externalProducers.Store(newProducers)
}
// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
return r.exporter.Temporality(kind)
@@ -195,12 +214,13 @@ func (r *periodicReader) collectAndExport(ctx context.Context) error {
}
// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
// the SDK and other Producers. The returned metric data is not exported
// to the configured exporter, it is left to the caller to handle that if
// desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) {
return r.collect(ctx, r.producer.Load())
return r.collect(ctx, r.sdkProducer.Load())
}
// collect unwraps p as a produceHolder and returns its produce results.
@@ -218,7 +238,20 @@ func (r *periodicReader) collect(ctx context.Context, p interface{}) (metricdata
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return metricdata.ResourceMetrics{}, err
}
return ph.produce(ctx)
rm, err := ph.produce(ctx)
if err != nil {
return metricdata.ResourceMetrics{}, err
}
var errs []error
for _, producer := range r.externalProducers.Load().([]Producer) {
externalMetrics, err := producer.Produce(ctx)
if err != nil {
errs = append(errs, err)
}
rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
}
return rm, unifyErrors(errs)
}
// export exports metric data m using r's exporter.
@@ -259,7 +292,7 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
<-r.done
// Any future call to Collect will now return ErrReaderShutdown.
ph := r.producer.Swap(produceHolder{
ph := r.sdkProducer.Swap(produceHolder{
produce: shutdownProducer{}.produce,
})
@@ -276,6 +309,12 @@ func (r *periodicReader) Shutdown(ctx context.Context) error {
if err == nil || err == ErrReaderShutdown {
err = sErr
}
r.mu.Lock()
defer r.mu.Unlock()
r.isShutdown = true
// release references to Producer(s)
r.externalProducers.Store([]Producer{})
})
return err
}
+12 -8
View File
@@ -114,7 +114,8 @@ func (ts *periodicReaderTestSuite) SetupTest() {
}
ts.ErrReader = NewPeriodicReader(e)
ts.ErrReader.register(testProducer{})
ts.ErrReader.register(testSDKProducer{})
ts.ErrReader.RegisterProducer(testExternalProducer{})
}
func (ts *periodicReaderTestSuite) TearDownTest() {
@@ -186,14 +187,15 @@ func TestPeriodicReaderRun(t *testing.T) {
exp := &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
// The testSDKProducer produces testResourceMetricsAB.
assert.Equal(t, testResourceMetricsAB, m)
return assert.AnError
},
}
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
trigger <- time.Now()
assert.Equal(t, assert.AnError, <-eh.Err)
@@ -210,8 +212,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
called = new(bool)
return &fnExporter{
exportFunc: func(_ context.Context, m metricdata.ResourceMetrics) error {
// The testProducer produces testMetrics.
assert.Equal(t, testMetrics, m)
// The testSDKProducer produces testResourceMetricsA.
assert.Equal(t, testResourceMetricsAB, m)
*called = true
return assert.AnError
},
@@ -221,7 +223,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("ForceFlush", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
@@ -232,7 +235,8 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
t.Run("Shutdown", func(t *testing.T) {
exp, called := expFunc(t)
r := NewPeriodicReader(exp)
r.register(testProducer{})
r.register(testSDKProducer{})
r.RegisterProducer(testExternalProducer{})
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})
+16 -3
View File
@@ -51,7 +51,12 @@ type Reader interface {
// register registers a Reader with a MeterProvider.
// The producer argument allows the Reader to signal the sdk to collect
// and send aggregated metric measurements.
register(producer)
register(sdkProducer)
// RegisterProducer registers a an external Producer with this Reader.
// The Producer is used as a source of aggregated metric data which is
// incorporated into metrics collected from the SDK.
RegisterProducer(Producer)
// temporality reports the Temporality for the instrument kind provided.
temporality(InstrumentKind) metricdata.Temporality
@@ -84,14 +89,22 @@ type Reader interface {
Shutdown(context.Context) error
}
// producer produces metrics for a Reader.
type producer interface {
// sdkProducer produces metrics for a Reader.
type sdkProducer interface {
// produce returns aggregated metrics from a single collection.
//
// This method is safe to call concurrently.
produce(context.Context) (metricdata.ResourceMetrics, error)
}
// Producer produces metrics for a Reader from an external source.
type Producer interface {
// Produce returns aggregated metrics from an external source.
//
// This method should be safe to call concurrently.
Produce(context.Context) ([]metricdata.ScopeMetrics, error)
}
// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
+117 -35
View File
@@ -57,16 +57,25 @@ func (ts *readerTestSuite) TestErrorForNotRegistered() {
ts.ErrorIs(err, ErrReaderNotRegistered)
}
func (ts *readerTestSuite) TestProducer() {
ts.Reader.register(testProducer{})
func (ts *readerTestSuite) TestSDKProducer() {
ts.Reader.register(testSDKProducer{})
m, err := ts.Reader.Collect(context.Background())
ts.NoError(err)
ts.Equal(testMetrics, m)
ts.Equal(testResourceMetricsA, m)
}
func (ts *readerTestSuite) TestExternalProducer() {
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(context.Background())
ts.NoError(err)
ts.Equal(testResourceMetricsAB, m)
}
func (ts *readerTestSuite) TestCollectAfterShutdown() {
ctx := context.Background()
ts.Reader.register(testProducer{})
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.Shutdown(ctx))
m, err := ts.Reader.Collect(ctx)
@@ -76,27 +85,29 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() {
func (ts *readerTestSuite) TestShutdownTwice() {
ctx := context.Background()
ts.Reader.register(testProducer{})
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.Shutdown(ctx))
ts.ErrorIs(ts.Reader.Shutdown(ctx), ErrReaderShutdown)
}
func (ts *readerTestSuite) TestMultipleForceFlush() {
ctx := context.Background()
ts.Reader.register(testProducer{})
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
ts.Require().NoError(ts.Reader.ForceFlush(ctx))
ts.NoError(ts.Reader.ForceFlush(ctx))
}
func (ts *readerTestSuite) TestMultipleRegister() {
p0 := testProducer{
p0 := testSDKProducer{
produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
// Differentiate this producer from the second by returning an
// error.
return testMetrics, assert.AnError
return testResourceMetricsA, assert.AnError
},
}
p1 := testProducer{}
p1 := testSDKProducer{}
ts.Reader.register(p0)
// This should be ignored.
@@ -106,11 +117,46 @@ func (ts *readerTestSuite) TestMultipleRegister() {
ts.Equal(assert.AnError, err)
}
func (ts *readerTestSuite) TestExternalProducerPartialSuccess() {
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(
testExternalProducer{
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
return []metricdata.ScopeMetrics{}, assert.AnError
},
},
)
ts.Reader.RegisterProducer(
testExternalProducer{
produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
return []metricdata.ScopeMetrics{testScopeMetricsB}, nil
},
},
)
m, err := ts.Reader.Collect(context.Background())
ts.Equal(assert.AnError, err)
ts.Equal(testResourceMetricsAB, m)
}
func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() {
ts.Reader.register(testSDKProducer{
produceFunc: func(ctx context.Context) (metricdata.ResourceMetrics, error) {
return metricdata.ResourceMetrics{}, assert.AnError
}})
ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(context.Background())
ts.Equal(assert.AnError, err)
ts.Equal(metricdata.ResourceMetrics{}, m)
}
func (ts *readerTestSuite) TestMethodConcurrency() {
// Requires the race-detector (a default test option for the project).
// All reader methods should be concurrent-safe.
ts.Reader.register(testProducer{})
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
ctx := context.Background()
var wg sync.WaitGroup
@@ -141,49 +187,85 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() {
ctx := context.Background()
ts.Require().NoError(ts.Reader.Shutdown(ctx))
// Registering after shutdown should not revert the shutdown.
ts.Reader.register(testProducer{})
ts.Reader.register(testSDKProducer{})
ts.Reader.RegisterProducer(testExternalProducer{})
m, err := ts.Reader.Collect(ctx)
ts.ErrorIs(err, ErrReaderShutdown)
ts.Equal(metricdata.ResourceMetrics{}, m)
}
var testMetrics = metricdata.ResourceMetrics{
Resource: resource.NewSchemaless(attribute.String("test", "Reader")),
ScopeMetrics: []metricdata.ScopeMetrics{{
Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"},
Metrics: []metricdata.Metrics{{
Name: "fake data",
Description: "Data used to test a reader",
Unit: unit.Dimensionless,
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{{
Attributes: attribute.NewSet(attribute.String("user", "alice")),
StartTime: time.Now(),
Time: time.Now().Add(time.Second),
Value: -1,
}},
},
}},
var testScopeMetricsA = metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"},
Metrics: []metricdata.Metrics{{
Name: "fake data",
Description: "Data used to test a reader",
Unit: unit.Dimensionless,
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{{
Attributes: attribute.NewSet(attribute.String("user", "alice")),
StartTime: time.Now(),
Time: time.Now().Add(time.Second),
Value: -1,
}},
},
}},
}
type testProducer struct {
var testScopeMetricsB = metricdata.ScopeMetrics{
Scope: instrumentation.Scope{Name: "sdk/metric/test/reader/external"},
Metrics: []metricdata.Metrics{{
Name: "fake scope data",
Description: "Data used to test a Producer reader",
Unit: unit.Milliseconds,
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{{
Attributes: attribute.NewSet(attribute.String("user", "ben")),
StartTime: time.Now(),
Time: time.Now().Add(time.Second),
Value: 10,
}},
},
}},
}
var testResourceMetricsA = metricdata.ResourceMetrics{
Resource: resource.NewSchemaless(attribute.String("test", "Reader")),
ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA},
}
var testResourceMetricsAB = metricdata.ResourceMetrics{
Resource: resource.NewSchemaless(attribute.String("test", "Reader")),
ScopeMetrics: []metricdata.ScopeMetrics{testScopeMetricsA, testScopeMetricsB},
}
type testSDKProducer struct {
produceFunc func(context.Context) (metricdata.ResourceMetrics, error)
}
func (p testProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
func (p testSDKProducer) produce(ctx context.Context) (metricdata.ResourceMetrics, error) {
if p.produceFunc != nil {
return p.produceFunc(ctx)
}
return testMetrics, nil
return testResourceMetricsA, nil
}
type testExternalProducer struct {
produceFunc func(context.Context) ([]metricdata.ScopeMetrics, error)
}
func (p testExternalProducer) Produce(ctx context.Context) ([]metricdata.ScopeMetrics, error) {
if p.produceFunc != nil {
return p.produceFunc(ctx)
}
return []metricdata.ScopeMetrics{testScopeMetricsB}, nil
}
func benchReaderCollectFunc(r Reader) func(*testing.B) {
ctx := context.Background()
r.register(testProducer{})
r.register(testSDKProducer{})
// Store bechmark results in a closure to prevent the compiler from
// inlining and skipping the function.
@@ -198,7 +280,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) {
for n := 0; n < b.N; n++ {
collectedMetrics, err = r.Collect(ctx)
assert.Equalf(b, testMetrics, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err)
assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err)
}
}
}