1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Move Aggregation/Temporality selection to the Exporter interface (#3260)

* Add Aggregation/Temporality to Exporter iface

* Use Exporter selectors in periodic reader

* Move selector opts to just manual reader

* Simplify periodic reader ref to Exporter selectors

* Fix the periodic reader tests

* Add Aggregation/Temporality method to stdoutmetric

* Add Temporality/Aggregation to otlpmetric exp

* Add Temporality/Aggregation to http/grpc otlp clients

* Add oconf tests for selector opts

* Add tests to stdoutmetric for opts

* Correct comment subject

* Add changes to changelog

* Fix otest test client
This commit is contained in:
Tyler Yahn
2022-11-01 07:56:18 -07:00
committed by GitHub
parent 8a390acc34
commit 48a05478e2
19 changed files with 436 additions and 106 deletions
+4
View File
@@ -18,6 +18,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `"go.opentelemetry.io/otel/sdk/metric".WithReader` option no longer accepts views to associate with the `Reader`.
Instead, views are now registered directly with the `MeterProvider` via the new `WithView` option.
The views registered with the `MeterProvider` apply to all `Reader`s. (#3387)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/sdk/metric".Exporter` interface. (#3260)
- The `Temporality(view.InstrumentKind) metricdata.Temporality` and `Aggregation(view.InstrumentKind) aggregation.Aggregation` methods are added to the `"go.opentelemetry.io/otel/exporters/otlp/otlpmetric".Client` interface. (#3260)
- The `WithTemporalitySelector` and `WithAggregationSelector` `ReaderOption`s have been changed to `ManualReaderOption`s in the `go.opentelemetry.io/otel/sdk/metric` package. (#3260)
- The periodic reader in the `go.opentelemetry.io/otel/sdk/metric` package now uses the temporality and aggregation selectors from its configured exporter instead of accepting them as options. (#3260)
### Fixed
+9
View File
@@ -17,11 +17,20 @@ package otlpmetric // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric
import (
"context"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
// Client handles the transmission of OTLP data to an OTLP receiving endpoint.
type Client interface {
// Temporality returns the Temporality to use for an instrument kind.
Temporality(view.InstrumentKind) metricdata.Temporality
// Aggregation returns the Aggregation to use for an instrument kind.
Aggregation(view.InstrumentKind) aggregation.Aggregation
// UploadMetrics transmits metric data to an OTLP receiver.
//
// All retry logic must be handled by UploadMetrics alone, the Exporter
+32 -2
View File
@@ -21,7 +21,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/transform"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
@@ -34,6 +36,20 @@ type exporter struct {
shutdownOnce sync.Once
}
// Temporality returns the Temporality to use for an instrument kind.
func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Temporality(k)
}
// Aggregation returns the Aggregation to use for an instrument kind.
func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
e.clientMu.Lock()
defer e.clientMu.Unlock()
return e.client.Aggregation(k)
}
// Export transforms and transmits metric data to an OTLP receiver.
func (e *exporter) Export(ctx context.Context, rm metricdata.ResourceMetrics) error {
otlpRm, err := transform.ResourceMetrics(rm)
@@ -68,7 +84,10 @@ func (e *exporter) Shutdown(ctx context.Context) error {
e.shutdownOnce.Do(func() {
e.clientMu.Lock()
client := e.client
e.client = shutdownClient{}
e.client = shutdownClient{
temporalitySelector: client.Temporality,
aggregationSelector: client.Aggregation,
}
e.clientMu.Unlock()
err = client.Shutdown(ctx)
})
@@ -82,7 +101,10 @@ func New(client Client) metric.Exporter {
return &exporter{client: client}
}
type shutdownClient struct{}
type shutdownClient struct {
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}
func (c shutdownClient) err(ctx context.Context) error {
if err := ctx.Err(); err != nil {
@@ -91,6 +113,14 @@ func (c shutdownClient) err(ctx context.Context) error {
return errShutdown
}
func (c shutdownClient) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}
func (c shutdownClient) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}
func (c shutdownClient) UploadMetrics(ctx context.Context, _ *mpb.ResourceMetrics) error {
return c.err(ctx)
}
@@ -21,7 +21,10 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
@@ -31,6 +34,14 @@ type client struct {
n int
}
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}
func (c *client) UploadMetrics(context.Context, *mpb.ResourceMetrics) error {
c.n++
return nil
@@ -27,6 +27,10 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/view"
)
const (
@@ -57,6 +61,9 @@ type (
// gRPC configurations
GRPCCredentials credentials.TransportCredentials
TemporalitySelector metric.TemporalitySelector
AggregationSelector metric.AggregationSelector
}
Config struct {
@@ -82,6 +89,9 @@ func NewHTTPConfig(opts ...HTTPOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,
TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
}
@@ -102,6 +112,9 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
URLPath: DefaultMetricsPath,
Compression: NoCompression,
Timeout: DefaultTimeout,
TemporalitySelector: metric.DefaultTemporalitySelector,
AggregationSelector: metric.DefaultAggregationSelector,
},
RetryConfig: retry.DefaultConfig,
DialOptions: []grpc.DialOption{grpc.WithUserAgent(internal.GetUserAgentHeader())},
@@ -313,3 +326,32 @@ func WithTimeout(duration time.Duration) GenericOption {
return cfg
})
}
func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.TemporalitySelector = selector
return cfg
})
}
func WithAggregationSelector(selector metric.AggregationSelector) GenericOption {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = metric.DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.AggregationSelector = wrapped
return cfg
})
}
@@ -23,6 +23,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal/envconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)
const (
@@ -383,6 +386,38 @@ func TestConfigs(t *testing.T) {
assert.Equal(t, c.Metrics.Timeout, 5*time.Second)
},
},
// Temporality Selector Tests
{
name: "WithTemporalitySelector",
opts: []oconf.GenericOption{
oconf.WithTemporalitySelector(deltaSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a TemporalitySelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.TemporalitySelector
assert.Equal(t, metricdata.DeltaTemporality, got(undefinedKind))
},
},
// Aggregation Selector Tests
{
name: "WithAggregationSelector",
opts: []oconf.GenericOption{
oconf.WithAggregationSelector(dropSelector),
},
asserts: func(t *testing.T, c *oconf.Config, grpcOption bool) {
// Function value comparisons are disallowed, test non-default
// behavior of a AggregationSelector here to ensure our "catch
// all" was set.
var undefinedKind view.InstrumentKind
got := c.Metrics.AggregationSelector
assert.Equal(t, aggregation.Drop{}, got(undefinedKind))
},
},
}
for _, tt := range tests {
@@ -406,6 +441,14 @@ func TestConfigs(t *testing.T) {
}
}
func dropSelector(view.InstrumentKind) aggregation.Aggregation {
return aggregation.Drop{}
}
func deltaSelector(view.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}
func asHTTPOptions(opts []oconf.GenericOption) []oconf.HTTPOption {
converted := make([]oconf.HTTPOption, len(opts))
for i, o := range opts {
@@ -19,6 +19,10 @@ import (
"testing"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
cpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
@@ -27,6 +31,14 @@ type client struct {
storage *Storage
}
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return metric.DefaultTemporalitySelector(k)
}
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return metric.DefaultAggregationSelector(k)
}
func (c *client) Collect() *Storage {
return c.storage
}
@@ -28,6 +28,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
@@ -53,6 +56,9 @@ type client struct {
exportTimeout time.Duration
requestFunc retry.RequestFunc
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as the conn should only be closed if we created it. Otherwise,
@@ -70,6 +76,9 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
exportTimeout: cfg.Metrics.Timeout,
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
conn: cfg.GRPCConn,
temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}
if len(cfg.Metrics.Headers) > 0 {
@@ -94,6 +103,16 @@ func newClient(ctx context.Context, options ...Option) (otlpmetric.Client, error
return c, nil
}
// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}
// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}
// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }
@@ -24,6 +24,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
)
// Option applies a configuration option to the Exporter.
@@ -236,3 +237,20 @@ func WithTimeout(duration time.Duration) Option {
func WithRetry(settings RetryConfig) Option {
return wrappedOption{oconf.WithRetry(retry.Config(settings))}
}
// WithTemporalitySelector sets the TemporalitySelector the client will use to
// determine the Temporality of an instrument based on its kind. If this option
// is not used, the client will use the DefaultTemporalitySelector from the
// go.opentelemetry.io/otel/sdk/metric package.
func WithTemporalitySelector(selector metric.TemporalitySelector) Option {
return wrappedOption{oconf.WithTemporalitySelector(selector)}
}
// WithAggregationSelector sets the AggregationSelector the client will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation
// explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector metric.AggregationSelector) Option {
return wrappedOption{oconf.WithAggregationSelector(selector)}
}
@@ -34,6 +34,9 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
@@ -55,6 +58,9 @@ type client struct {
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}
// Keep it in sync with golang's DefaultTransport from net/http! We
@@ -116,9 +122,22 @@ func newClient(opts ...Option) (otlpmetric.Client, error) {
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
temporalitySelector: cfg.Metrics.TemporalitySelector,
aggregationSelector: cfg.Metrics.AggregationSelector,
}, nil
}
// Temporality returns the Temporality to use for an instrument kind.
func (c *client) Temporality(k view.InstrumentKind) metricdata.Temporality {
return c.temporalitySelector(k)
}
// Aggregation returns the Aggregation to use for an instrument kind.
func (c *client) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return c.aggregationSelector(k)
}
// ForceFlush does nothing, the client holds no state.
func (c *client) ForceFlush(ctx context.Context) error { return ctx.Err() }
@@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/oconf"
"go.opentelemetry.io/otel/sdk/metric"
)
// Compression describes the compression used for payloads sent to the
@@ -179,3 +180,20 @@ func WithTimeout(duration time.Duration) Option {
func WithRetry(rc RetryConfig) Option {
return wrappedOption{oconf.WithRetry(retry.Config(rc))}
}
// WithTemporalitySelector sets the TemporalitySelector the client will use to
// determine the Temporality of an instrument based on its kind. If this option
// is not used, the client will use the DefaultTemporalitySelector from the
// go.opentelemetry.io/otel/sdk/metric package.
func WithTemporalitySelector(selector metric.TemporalitySelector) Option {
return wrappedOption{oconf.WithTemporalitySelector(selector)}
}
// WithAggregationSelector sets the AggregationSelector the client will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// from the go.opentelemetry.io/otel/sdk/metric package, or the aggregation
// explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector metric.AggregationSelector) Option {
return wrappedOption{oconf.WithAggregationSelector(selector)}
}
+67 -1
View File
@@ -16,11 +16,18 @@ package stdoutmetric // import "go.opentelemetry.io/otel/exporters/stdout/stdout
import (
"encoding/json"
"os"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/view"
)
// config contains options for the exporter.
type config struct {
encoder *encoderHolder
encoder *encoderHolder
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}
// newConfig creates a validated config configured with options.
@@ -36,6 +43,14 @@ func newConfig(options ...Option) config {
cfg.encoder = &encoderHolder{encoder: enc}
}
if cfg.temporalitySelector == nil {
cfg.temporalitySelector = metric.DefaultTemporalitySelector
}
if cfg.aggregationSelector == nil {
cfg.aggregationSelector = metric.DefaultAggregationSelector
}
return cfg
}
@@ -60,3 +75,54 @@ func WithEncoder(encoder Encoder) Option {
return c
})
}
// WithTemporalitySelector sets the TemporalitySelector the exporter will use
// to determine the Temporality of an instrument based on its kind. If this
// option is not used, the exporter will use the DefaultTemporalitySelector
// from the go.opentelemetry.io/otel/sdk/metric package.
func WithTemporalitySelector(selector metric.TemporalitySelector) Option {
return temporalitySelectorOption{selector: selector}
}
type temporalitySelectorOption struct {
selector metric.TemporalitySelector
}
func (t temporalitySelectorOption) apply(c config) config {
c.temporalitySelector = t.selector
return c
}
// WithAggregationSelector sets the AggregationSelector the exporter will use
// to determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the exporter will use the
// DefaultAggregationSelector from the go.opentelemetry.io/otel/sdk/metric
// package or the aggregation explicitly passed for a view matching an
// instrument.
func WithAggregationSelector(selector metric.AggregationSelector) Option {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = metric.DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}
return aggregationSelectorOption{selector: wrapped}
}
type aggregationSelectorOption struct {
selector metric.AggregationSelector
}
func (t aggregationSelectorOption) apply(c config) config {
c.aggregationSelector = t.selector
return c
}
+17 -1
View File
@@ -20,7 +20,9 @@ import (
"sync/atomic"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)
// exporter is an OpenTelemetry metric exporter.
@@ -28,6 +30,9 @@ type exporter struct {
encVal atomic.Value // encoderHolder
shutdownOnce sync.Once
temporalitySelector metric.TemporalitySelector
aggregationSelector metric.AggregationSelector
}
// New returns a configured metric exporter.
@@ -36,11 +41,22 @@ type exporter struct {
// encoder with tab indentations that output to STDOUT.
func New(options ...Option) (metric.Exporter, error) {
cfg := newConfig(options...)
exp := &exporter{}
exp := &exporter{
temporalitySelector: cfg.temporalitySelector,
aggregationSelector: cfg.aggregationSelector,
}
exp.encVal.Store(*cfg.encoder)
return exp, nil
}
func (e *exporter) Temporality(k view.InstrumentKind) metricdata.Temporality {
return e.temporalitySelector(k)
}
func (e *exporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
return e.aggregationSelector(k)
}
func (e *exporter) Export(ctx context.Context, data metricdata.ResourceMetrics) error {
select {
case <-ctx.Done():
@@ -25,7 +25,9 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)
func testEncoderOption() stdoutmetric.Option {
@@ -97,3 +99,33 @@ func TestShutdownExporterReturnsShutdownErrorOnExport(t *testing.T) {
require.NoError(t, exp.Shutdown(ctx))
assert.EqualError(t, exp.Export(ctx, data), "exporter shutdown")
}
func deltaSelector(view.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}
func TestTemporalitySelector(t *testing.T) {
exp, err := stdoutmetric.New(
testEncoderOption(),
stdoutmetric.WithTemporalitySelector(deltaSelector),
)
require.NoError(t, err)
var unknownKind view.InstrumentKind
assert.Equal(t, metricdata.DeltaTemporality, exp.Temporality(unknownKind))
}
func dropSelector(view.InstrumentKind) aggregation.Aggregation {
return aggregation.Drop{}
}
func TestAggregationSelector(t *testing.T) {
exp, err := stdoutmetric.New(
testEncoderOption(),
stdoutmetric.WithAggregationSelector(dropSelector),
)
require.NoError(t, err)
var unknownKind view.InstrumentKind
assert.Equal(t, aggregation.Drop{}, exp.Aggregation(unknownKind))
}
+8
View File
@@ -18,7 +18,9 @@ import (
"context"
"fmt"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)
// ErrExporterShutdown is returned if Export or Shutdown are called after an
@@ -28,6 +30,12 @@ var ErrExporterShutdown = fmt.Errorf("exporter is shutdown")
// Exporter handles the delivery of metric data to external receivers. This is
// the final component in the metric push pipeline.
type Exporter interface {
// Temporality returns the Temporality to use for an instrument kind.
Temporality(view.InstrumentKind) metricdata.Temporality
// Aggregation returns the Aggregation to use for an instrument kind.
Aggregation(view.InstrumentKind) aggregation.Aggregation
// Export serializes and transmits metric data to a receiver.
//
// This is called synchronously, there is no concurrency safety
+50
View File
@@ -129,3 +129,53 @@ func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig {
type ManualReaderOption interface {
applyManual(manualReaderConfig) manualReaderConfig
}
// WithTemporalitySelector sets the TemporalitySelector a reader will use to
// determine the Temporality of an instrument based on its kind. If this
// option is not used, the reader will use the DefaultTemporalitySelector.
func WithTemporalitySelector(selector TemporalitySelector) ManualReaderOption {
return temporalitySelectorOption{selector: selector}
}
type temporalitySelectorOption struct {
selector func(instrument view.InstrumentKind) metricdata.Temporality
}
// applyManual returns a manualReaderConfig with option applied.
func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig {
mrc.temporalitySelector = t.selector
return mrc
}
// WithAggregationSelector sets the AggregationSelector a reader will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// or the aggregation explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}
return aggregationSelectorOption{selector: wrapped}
}
type aggregationSelectorOption struct {
selector AggregationSelector
}
// applyManual returns a manualReaderConfig with option applied.
func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig {
c.aggregationSelector = t.selector
return c
}
+6 -16
View File
@@ -36,20 +36,16 @@ const (
// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
interval time.Duration
timeout time.Duration
}
// newPeriodicReaderConfig returns a periodicReaderConfig configured with
// options.
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
c := periodicReaderConfig{
interval: defaultInterval,
timeout: defaultTimeout,
temporalitySelector: DefaultTemporalitySelector,
aggregationSelector: DefaultAggregationSelector,
interval: defaultInterval,
timeout: defaultTimeout,
}
for _, o := range options {
c = o.applyPeriodic(c)
@@ -118,9 +114,6 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reade
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
temporalitySelector: conf.temporalitySelector,
aggregationSelector: conf.aggregationSelector,
}
go func() {
@@ -140,9 +133,6 @@ type periodicReader struct {
exporter Exporter
flushCh chan chan error
temporalitySelector TemporalitySelector
aggregationSelector AggregationSelector
done chan struct{}
cancel context.CancelFunc
shutdownOnce sync.Once
@@ -187,12 +177,12 @@ func (r *periodicReader) register(p producer) {
// temporality reports the Temporality for the instrument kind provided.
func (r *periodicReader) temporality(kind view.InstrumentKind) metricdata.Temporality {
return r.temporalitySelector(kind)
return r.exporter.Temporality(kind)
}
// aggregation returns what Aggregation to use for kind.
func (r *periodicReader) aggregation(kind view.InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type.
return r.aggregationSelector(kind)
return r.exporter.Aggregation(kind)
}
// collectAndExport gather all metric data related to the periodicReader r from
+29 -16
View File
@@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
)
@@ -54,13 +55,29 @@ func TestWithInterval(t *testing.T) {
}
type fnExporter struct {
exportFunc func(context.Context, metricdata.ResourceMetrics) error
flushFunc func(context.Context) error
shutdownFunc func(context.Context) error
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
exportFunc func(context.Context, metricdata.ResourceMetrics) error
flushFunc func(context.Context) error
shutdownFunc func(context.Context) error
}
var _ Exporter = (*fnExporter)(nil)
func (e *fnExporter) Temporality(k view.InstrumentKind) metricdata.Temporality {
if e.temporalityFunc != nil {
return e.temporalityFunc(k)
}
return DefaultTemporalitySelector(k)
}
func (e *fnExporter) Aggregation(k view.InstrumentKind) aggregation.Aggregation {
if e.aggregationFunc != nil {
return e.aggregationFunc(k)
}
return DefaultAggregationSelector(k)
}
func (e *fnExporter) Export(ctx context.Context, m metricdata.ResourceMetrics) error {
if e.exportFunc != nil {
return e.exportFunc(ctx, m)
@@ -230,29 +247,25 @@ func BenchmarkPeriodicReader(b *testing.B) {
func TestPeriodiclReaderTemporality(t *testing.T) {
tests := []struct {
name string
options []PeriodicReaderOption
name string
exporter *fnExporter
// Currently only testing constant temporality. This should be expanded
// if we put more advanced selection in the SDK
wantTemporality metricdata.Temporality
}{
{
name: "default",
exporter: new(fnExporter),
wantTemporality: metricdata.CumulativeTemporality,
},
{
name: "delta",
options: []PeriodicReaderOption{
WithTemporalitySelector(deltaTemporalitySelector),
},
name: "delta",
exporter: &fnExporter{temporalityFunc: deltaTemporalitySelector},
wantTemporality: metricdata.DeltaTemporality,
},
{
name: "repeats overwrite",
options: []PeriodicReaderOption{
WithTemporalitySelector(deltaTemporalitySelector),
WithTemporalitySelector(cumulativeTemporalitySelector),
},
name: "cumulative",
exporter: &fnExporter{temporalityFunc: cumulativeTemporalitySelector},
wantTemporality: metricdata.CumulativeTemporality,
},
}
@@ -260,8 +273,8 @@ func TestPeriodiclReaderTemporality(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var undefinedInstrument view.InstrumentKind
rdr := NewPeriodicReader(new(fnExporter), tt.options...)
assert.Equal(t, tt.wantTemporality, rdr.temporality(undefinedInstrument))
rdr := NewPeriodicReader(tt.exporter)
assert.Equal(t, tt.wantTemporality.String(), rdr.temporality(undefinedInstrument).String())
})
}
}
-70
View File
@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/view"
@@ -108,13 +107,6 @@ func (p shutdownProducer) produce(context.Context) (metricdata.ResourceMetrics,
return metricdata.ResourceMetrics{}, ErrReaderShutdown
}
// ReaderOption applies a configuration option value to either a ManualReader or
// a PeriodicReader.
type ReaderOption interface {
ManualReaderOption
PeriodicReaderOption
}
// TemporalitySelector selects the temporality to use based on the InstrumentKind.
type TemporalitySelector func(view.InstrumentKind) metricdata.Temporality
@@ -125,29 +117,6 @@ func DefaultTemporalitySelector(view.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}
// WithTemporalitySelector sets the TemporalitySelector a reader will use to
// determine the Temporality of an instrument based on its kind. If this
// option is not used, the reader will use the DefaultTemporalitySelector.
func WithTemporalitySelector(selector TemporalitySelector) ReaderOption {
return temporalitySelectorOption{selector: selector}
}
type temporalitySelectorOption struct {
selector func(instrument view.InstrumentKind) metricdata.Temporality
}
// applyManual returns a manualReaderConfig with option applied.
func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig {
mrc.temporalitySelector = t.selector
return mrc
}
// applyPeriodic returns a periodicReaderConfig with option applied.
func (t temporalitySelectorOption) applyPeriodic(prc periodicReaderConfig) periodicReaderConfig {
prc.temporalitySelector = t.selector
return prc
}
// AggregationSelector selects the aggregation and the parameters to use for
// that aggregation based on the InstrumentKind.
type AggregationSelector func(view.InstrumentKind) aggregation.Aggregation
@@ -172,42 +141,3 @@ func DefaultAggregationSelector(ik view.InstrumentKind) aggregation.Aggregation
}
panic("unknown instrument kind")
}
// WithAggregationSelector sets the AggregationSelector a reader will use to
// determine the aggregation to use for an instrument based on its kind. If
// this option is not used, the reader will use the DefaultAggregationSelector
// or the aggregation explicitly passed for a view matching an instrument.
func WithAggregationSelector(selector AggregationSelector) ReaderOption {
// Deep copy and validate before using.
wrapped := func(ik view.InstrumentKind) aggregation.Aggregation {
a := selector(ik)
cpA := a.Copy()
if err := cpA.Err(); err != nil {
cpA = DefaultAggregationSelector(ik)
global.Error(
err, "using default aggregation instead",
"aggregation", a,
"replacement", cpA,
)
}
return cpA
}
return aggregationSelectorOption{selector: wrapped}
}
type aggregationSelectorOption struct {
selector AggregationSelector
}
// applyManual returns a manualReaderConfig with option applied.
func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig {
c.aggregationSelector = t.selector
return c
}
// applyPeriodic returns a periodicReaderConfig with option applied.
func (t aggregationSelectorOption) applyPeriodic(c periodicReaderConfig) periodicReaderConfig {
c.aggregationSelector = t.selector
return c
}