You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-27 22:49:15 +02:00
Allow configuring the exemplar filter on the metrics SDK (#5850)
Part of https://github.com/open-telemetry/opentelemetry-go/issues/5249 ### Spec https://opentelemetry.io/docs/specs/otel/metrics/sdk/#exemplarfilter > The ExemplarFilter configuration MUST allow users to select between one of the built-in ExemplarFilters. While ExemplarFilter determines which measurements are eligible for becoming an Exemplar, the ExemplarReservoir makes the final decision if a measurement becomes an exemplar and is stored. > The ExemplarFilter SHOULD be a configuration parameter of a MeterProvider for an SDK. The default value SHOULD be TraceBased. The filter configuration SHOULD follow the [environment variable specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#exemplar). > An OpenTelemetry SDK MUST support the following filters: > * [AlwaysOn](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwayson) > * [AlwaysOff](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwaysoff) > * [TraceBased](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#tracebased) ### Changes * adds exemplar.AlwaysOffFilter, which is one of the required filters from the SDK: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwaysoff * adds `metric.WithExemplarFilter` as an option for the metrics SDK. * moves handling of `OTEL_METRICS_EXEMPLAR_FILTER` to the same location as config handling to make code easier to navigate. dropReservoir can actually be removed, but I plan to do that in a follow-up refactor, since it will be a large diff. --------- Co-authored-by: Damien Mathieu <42@dmathieu.com> Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
This commit is contained in:
@@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
|
||||||
|
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)
|
||||||
|
|
||||||
<!-- Released section -->
|
<!-- Released section -->
|
||||||
<!-- Don't change this section unless doing release -->
|
<!-- Don't change this section unless doing release -->
|
||||||
|
|
||||||
|
|||||||
@@ -6,17 +6,21 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
)
|
)
|
||||||
|
|
||||||
// config contains configuration options for a MeterProvider.
|
// config contains configuration options for a MeterProvider.
|
||||||
type config struct {
|
type config struct {
|
||||||
res *resource.Resource
|
res *resource.Resource
|
||||||
readers []Reader
|
readers []Reader
|
||||||
views []View
|
views []View
|
||||||
|
exemplarFilter exemplar.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
// readerSignals returns a force-flush and shutdown function for a
|
// readerSignals returns a force-flush and shutdown function for a
|
||||||
@@ -76,7 +80,13 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er
|
|||||||
|
|
||||||
// newConfig returns a config configured with options.
|
// newConfig returns a config configured with options.
|
||||||
func newConfig(options []Option) config {
|
func newConfig(options []Option) config {
|
||||||
conf := config{res: resource.Default()}
|
conf := config{
|
||||||
|
res: resource.Default(),
|
||||||
|
exemplarFilter: exemplar.TraceBasedFilter,
|
||||||
|
}
|
||||||
|
for _, o := range meterProviderOptionsFromEnv() {
|
||||||
|
conf = o.apply(conf)
|
||||||
|
}
|
||||||
for _, o := range options {
|
for _, o := range options {
|
||||||
conf = o.apply(conf)
|
conf = o.apply(conf)
|
||||||
}
|
}
|
||||||
@@ -140,3 +150,35 @@ func WithView(views ...View) Option {
|
|||||||
return cfg
|
return cfg
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithExemplarFilter configures the exemplar filter.
|
||||||
|
//
|
||||||
|
// The exemplar filter determines which measurements are offered to the
|
||||||
|
// exemplar reservoir, but the exemplar reservoir makes the final decision of
|
||||||
|
// whether to store an exemplar.
|
||||||
|
//
|
||||||
|
// By default, the [exemplar.SampledFilter]
|
||||||
|
// is used. Exemplars can be entirely disabled by providing the
|
||||||
|
// [exemplar.AlwaysOffFilter].
|
||||||
|
func WithExemplarFilter(filter exemplar.Filter) Option {
|
||||||
|
return optionFunc(func(cfg config) config {
|
||||||
|
cfg.exemplarFilter = filter
|
||||||
|
return cfg
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func meterProviderOptionsFromEnv() []Option {
|
||||||
|
var opts []Option
|
||||||
|
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
|
||||||
|
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
|
||||||
|
|
||||||
|
switch strings.ToLower(strings.TrimSpace(os.Getenv(filterEnvKey))) {
|
||||||
|
case "always_on":
|
||||||
|
opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter))
|
||||||
|
case "always_off":
|
||||||
|
opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter))
|
||||||
|
case "trace_based":
|
||||||
|
opts = append(opts, WithExemplarFilter(exemplar.TraceBasedFilter))
|
||||||
|
}
|
||||||
|
return opts
|
||||||
|
}
|
||||||
|
|||||||
@@ -14,8 +14,10 @@ import (
|
|||||||
|
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
ottest "go.opentelemetry.io/otel/sdk/internal/internaltest"
|
ottest "go.opentelemetry.io/otel/sdk/internal/internaltest"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
type reader struct {
|
type reader struct {
|
||||||
@@ -192,3 +194,102 @@ func TestWithView(t *testing.T) {
|
|||||||
)})
|
)})
|
||||||
assert.Len(t, c.views, 2)
|
assert.Len(t, c.views, 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWithExemplarFilterOff(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
desc string
|
||||||
|
opts []Option
|
||||||
|
env string
|
||||||
|
expectFilterSampled bool
|
||||||
|
expectFilterNotSampled bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "default",
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "always on option",
|
||||||
|
opts: []Option{WithExemplarFilter(exemplar.AlwaysOnFilter)},
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "always off option",
|
||||||
|
opts: []Option{WithExemplarFilter(exemplar.AlwaysOffFilter)},
|
||||||
|
expectFilterSampled: false,
|
||||||
|
expectFilterNotSampled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "trace based option",
|
||||||
|
opts: []Option{WithExemplarFilter(exemplar.TraceBasedFilter)},
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "last option takes precedence",
|
||||||
|
opts: []Option{
|
||||||
|
WithExemplarFilter(exemplar.AlwaysOffFilter),
|
||||||
|
WithExemplarFilter(exemplar.AlwaysOnFilter),
|
||||||
|
},
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "always_off env",
|
||||||
|
env: "always_off",
|
||||||
|
expectFilterSampled: false,
|
||||||
|
expectFilterNotSampled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "always_on env",
|
||||||
|
env: "always_on",
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "always_on case insensitiveenv",
|
||||||
|
env: "ALWAYS_ON",
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "trace_based env",
|
||||||
|
env: "trace_based",
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "wrong env",
|
||||||
|
env: "foo_bar",
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "option takes precedence over env var",
|
||||||
|
env: "always_off",
|
||||||
|
opts: []Option{WithExemplarFilter(exemplar.AlwaysOnFilter)},
|
||||||
|
expectFilterSampled: true,
|
||||||
|
expectFilterNotSampled: true,
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.desc, func(t *testing.T) {
|
||||||
|
if tc.env != "" {
|
||||||
|
t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", tc.env)
|
||||||
|
}
|
||||||
|
c := newConfig(tc.opts)
|
||||||
|
assert.NotNil(t, c.exemplarFilter)
|
||||||
|
assert.Equal(t, tc.expectFilterNotSampled, c.exemplarFilter(context.Background()))
|
||||||
|
assert.Equal(t, tc.expectFilterSampled, c.exemplarFilter(sample(context.Background())))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func sample(parent context.Context) context.Context {
|
||||||
|
sc := trace.NewSpanContext(trace.SpanContextConfig{
|
||||||
|
TraceID: trace.TraceID{0x01},
|
||||||
|
SpanID: trace.SpanID{0x01},
|
||||||
|
TraceFlags: trace.FlagsSampled,
|
||||||
|
})
|
||||||
|
return trace.ContextWithSpanContext(parent, sc)
|
||||||
|
}
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||||
"go.opentelemetry.io/otel/sdk/metric"
|
"go.opentelemetry.io/otel/sdk/metric"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
||||||
)
|
)
|
||||||
@@ -240,3 +241,21 @@ func ExampleNewView_exponentialHistogram() {
|
|||||||
metric.WithView(view),
|
metric.WithView(view),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ExampleWithExemplarFilter_disabled() {
|
||||||
|
// Use exemplar.AlwaysOffFilter to disable exemplar collection.
|
||||||
|
_ = metric.NewMeterProvider(
|
||||||
|
metric.WithExemplarFilter(exemplar.AlwaysOffFilter),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ExampleWithExemplarFilter_custom() {
|
||||||
|
// Create a custom filter function that only offers measurements if the
|
||||||
|
// context has an error.
|
||||||
|
customFilter := func(ctx context.Context) bool {
|
||||||
|
return ctx.Err() != nil
|
||||||
|
}
|
||||||
|
_ = metric.NewMeterProvider(
|
||||||
|
metric.WithExemplarFilter(customFilter),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@
|
|||||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
|
||||||
"runtime"
|
"runtime"
|
||||||
"slices"
|
"slices"
|
||||||
|
|
||||||
@@ -13,29 +12,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// reservoirFunc returns the appropriately configured exemplar reservoir
|
// reservoirFunc returns the appropriately configured exemplar reservoir
|
||||||
// creation func based on the passed InstrumentKind and user defined
|
// creation func based on the passed InstrumentKind and filter configuration.
|
||||||
// environment variables.
|
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
|
||||||
//
|
|
||||||
// Note: This will only return non-nil values when the experimental exemplar
|
|
||||||
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
|
|
||||||
// is not set to always_off.
|
|
||||||
func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] {
|
|
||||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
|
|
||||||
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
|
|
||||||
|
|
||||||
var filter exemplar.Filter
|
|
||||||
|
|
||||||
switch os.Getenv(filterEnvKey) {
|
|
||||||
case "always_on":
|
|
||||||
filter = exemplar.AlwaysOnFilter
|
|
||||||
case "always_off":
|
|
||||||
return aggregate.DropReservoir
|
|
||||||
case "trace_based":
|
|
||||||
fallthrough
|
|
||||||
default:
|
|
||||||
filter = exemplar.TraceBasedFilter
|
|
||||||
}
|
|
||||||
|
|
||||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
|
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
|
||||||
// Explicit bucket histogram aggregation with more than 1 bucket will
|
// Explicit bucket histogram aggregation with more than 1 bucket will
|
||||||
// use AlignedHistogramBucketExemplarReservoir.
|
// use AlignedHistogramBucketExemplarReservoir.
|
||||||
|
|||||||
@@ -27,3 +27,8 @@ func TraceBasedFilter(ctx context.Context) bool {
|
|||||||
func AlwaysOnFilter(ctx context.Context) bool {
|
func AlwaysOnFilter(ctx context.Context) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AlwaysOffFilter is a [Filter] that never offers measurements.
|
||||||
|
func AlwaysOffFilter(ctx context.Context) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ type Builder[N int64 | float64] struct {
|
|||||||
// create new exemplar reservoirs for a new seen attribute set.
|
// create new exemplar reservoirs for a new seen attribute set.
|
||||||
//
|
//
|
||||||
// If this is not provided a default factory function that returns an
|
// If this is not provided a default factory function that returns an
|
||||||
// DropReservoir reservoir will be used.
|
// dropReservoir reservoir will be used.
|
||||||
ReservoirFunc func() FilteredExemplarReservoir[N]
|
ReservoirFunc func() FilteredExemplarReservoir[N]
|
||||||
// AggregationLimit is the cardinality limit of measurement attributes. Any
|
// AggregationLimit is the cardinality limit of measurement attributes. Any
|
||||||
// measurement for new attributes once the limit has been reached will be
|
// measurement for new attributes once the limit has been reached will be
|
||||||
@@ -54,7 +54,7 @@ func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
|
|||||||
return b.ReservoirFunc
|
return b.ReservoirFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
return DropReservoir
|
return dropReservoir
|
||||||
}
|
}
|
||||||
|
|
||||||
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
|
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
|
||||||
|
|||||||
@@ -73,7 +73,7 @@ func (c *clock) Register() (unregister func()) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
|
func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
|
||||||
return DropReservoir[N]()
|
return dropReservoir[N]()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestBuilderFilter(t *testing.T) {
|
func TestBuilderFilter(t *testing.T) {
|
||||||
|
|||||||
@@ -10,8 +10,8 @@ import (
|
|||||||
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
|
// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
|
||||||
func DropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
|
func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
|
||||||
|
|
||||||
type dropRes[N int64 | float64] struct{}
|
type dropRes[N int64 | float64] struct{}
|
||||||
|
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ func TestDrop(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testDropFiltered[N int64 | float64](t *testing.T) {
|
func testDropFiltered[N int64 | float64](t *testing.T) {
|
||||||
r := DropReservoir[N]()
|
r := dropReservoir[N]()
|
||||||
|
|
||||||
var dest []exemplar.Exemplar
|
var dest []exemplar.Exemplar
|
||||||
r.Collect(&dest)
|
r.Collect(&dest)
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import (
|
|||||||
"go.opentelemetry.io/otel/internal/global"
|
"go.opentelemetry.io/otel/internal/global"
|
||||||
"go.opentelemetry.io/otel/metric"
|
"go.opentelemetry.io/otel/metric"
|
||||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
@@ -2462,3 +2463,53 @@ func TestMeterProviderDelegation(t *testing.T) {
|
|||||||
otel.SetMeterProvider(provider)
|
otel.SetMeterProvider(provider)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExemplarFilter(t *testing.T) {
|
||||||
|
rdr := NewManualReader()
|
||||||
|
mp := NewMeterProvider(
|
||||||
|
WithReader(rdr),
|
||||||
|
// Passing AlwaysOnFilter causes collection of the exemplar for the
|
||||||
|
// counter increment below.
|
||||||
|
WithExemplarFilter(exemplar.AlwaysOnFilter),
|
||||||
|
)
|
||||||
|
|
||||||
|
m1 := mp.Meter("scope")
|
||||||
|
ctr1, err := m1.Float64Counter("ctr")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
ctr1.Add(context.Background(), 1.0)
|
||||||
|
|
||||||
|
want := metricdata.ResourceMetrics{
|
||||||
|
Resource: resource.Default(),
|
||||||
|
ScopeMetrics: []metricdata.ScopeMetrics{
|
||||||
|
{
|
||||||
|
Scope: instrumentation.Scope{
|
||||||
|
Name: "scope",
|
||||||
|
},
|
||||||
|
Metrics: []metricdata.Metrics{
|
||||||
|
{
|
||||||
|
Name: "ctr",
|
||||||
|
Data: metricdata.Sum[float64]{
|
||||||
|
Temporality: metricdata.CumulativeTemporality,
|
||||||
|
IsMonotonic: true,
|
||||||
|
DataPoints: []metricdata.DataPoint[float64]{
|
||||||
|
{
|
||||||
|
Value: 1.0,
|
||||||
|
Exemplars: []metricdata.Exemplar[float64]{
|
||||||
|
{
|
||||||
|
Value: 1.0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
got := metricdata.ResourceMetrics{}
|
||||||
|
err = rdr.Collect(context.Background(), &got)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
|
||||||
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
"go.opentelemetry.io/otel/metric"
|
"go.opentelemetry.io/otel/metric"
|
||||||
"go.opentelemetry.io/otel/metric/embedded"
|
"go.opentelemetry.io/otel/metric/embedded"
|
||||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/internal"
|
"go.opentelemetry.io/otel/sdk/metric/internal"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/internal/x"
|
"go.opentelemetry.io/otel/sdk/metric/internal/x"
|
||||||
@@ -38,14 +39,15 @@ type instrumentSync struct {
|
|||||||
compAgg aggregate.ComputeAggregation
|
compAgg aggregate.ComputeAggregation
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
|
func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline {
|
||||||
if res == nil {
|
if res == nil {
|
||||||
res = resource.Empty()
|
res = resource.Empty()
|
||||||
}
|
}
|
||||||
return &pipeline{
|
return &pipeline{
|
||||||
resource: res,
|
resource: res,
|
||||||
reader: reader,
|
reader: reader,
|
||||||
views: views,
|
views: views,
|
||||||
|
exemplarFilter: exemplarFilter,
|
||||||
// aggregations is lazy allocated when needed.
|
// aggregations is lazy allocated when needed.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -66,6 +68,7 @@ type pipeline struct {
|
|||||||
aggregations map[instrumentation.Scope][]instrumentSync
|
aggregations map[instrumentation.Scope][]instrumentSync
|
||||||
callbacks []func(context.Context) error
|
callbacks []func(context.Context) error
|
||||||
multiCallbacks list.List
|
multiCallbacks list.List
|
||||||
|
exemplarFilter exemplar.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
// addSync adds the instrumentSync to pipeline p with scope. This method is not
|
// addSync adds the instrumentSync to pipeline p with scope. This method is not
|
||||||
@@ -349,7 +352,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
|
|||||||
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
|
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
|
||||||
b := aggregate.Builder[N]{
|
b := aggregate.Builder[N]{
|
||||||
Temporality: i.pipeline.reader.temporality(kind),
|
Temporality: i.pipeline.reader.temporality(kind),
|
||||||
ReservoirFunc: reservoirFunc[N](stream.Aggregation),
|
ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter),
|
||||||
}
|
}
|
||||||
b.Filter = stream.AttributeFilter
|
b.Filter = stream.AttributeFilter
|
||||||
// A value less than or equal to zero will disable the aggregation
|
// A value less than or equal to zero will disable the aggregation
|
||||||
@@ -552,10 +555,10 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
|
|||||||
// measurement.
|
// measurement.
|
||||||
type pipelines []*pipeline
|
type pipelines []*pipeline
|
||||||
|
|
||||||
func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
|
func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines {
|
||||||
pipes := make([]*pipeline, 0, len(readers))
|
pipes := make([]*pipeline, 0, len(readers))
|
||||||
for _, r := range readers {
|
for _, r := range readers {
|
||||||
p := newPipeline(res, r, views)
|
p := newPipeline(res, r, views, exemplarFilter)
|
||||||
r.register(p)
|
r.register(p)
|
||||||
pipes = append(pipes, p)
|
pipes = append(pipes, p)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||||
@@ -357,7 +358,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
|
|||||||
for _, tt := range testcases {
|
for _, tt := range testcases {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
var c cache[string, instID]
|
var c cache[string, instID]
|
||||||
p := newPipeline(nil, tt.reader, tt.views)
|
p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter)
|
||||||
i := newInserter[N](p, &c)
|
i := newInserter[N](p, &c)
|
||||||
readerAggregation := i.readerDefaultAggregation(tt.inst.Kind)
|
readerAggregation := i.readerDefaultAggregation(tt.inst.Kind)
|
||||||
input, err := i.Instrument(tt.inst, readerAggregation)
|
input, err := i.Instrument(tt.inst, readerAggregation)
|
||||||
@@ -379,7 +380,7 @@ func TestCreateAggregators(t *testing.T) {
|
|||||||
|
|
||||||
func testInvalidInstrumentShouldPanic[N int64 | float64]() {
|
func testInvalidInstrumentShouldPanic[N int64 | float64]() {
|
||||||
var c cache[string, instID]
|
var c cache[string, instID]
|
||||||
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
|
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter), &c)
|
||||||
inst := Instrument{
|
inst := Instrument{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
Kind: InstrumentKind(255),
|
Kind: InstrumentKind(255),
|
||||||
@@ -395,7 +396,7 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) {
|
|||||||
|
|
||||||
func TestPipelinesAggregatorForEachReader(t *testing.T) {
|
func TestPipelinesAggregatorForEachReader(t *testing.T) {
|
||||||
r0, r1 := NewManualReader(), NewManualReader()
|
r0, r1 := NewManualReader(), NewManualReader()
|
||||||
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil)
|
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter)
|
||||||
require.Len(t, pipes, 2, "created pipelines")
|
require.Len(t, pipes, 2, "created pipelines")
|
||||||
|
|
||||||
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
|
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
|
||||||
@@ -467,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
|
|||||||
|
|
||||||
for _, tt := range testCases {
|
for _, tt := range testCases {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
p := newPipelines(resource.Empty(), tt.readers, tt.views)
|
p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter)
|
||||||
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
|
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
|
||||||
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
|
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
|
||||||
testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount)
|
testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount)
|
||||||
@@ -521,7 +522,7 @@ func TestPipelineRegistryResource(t *testing.T) {
|
|||||||
readers := []Reader{NewManualReader()}
|
readers := []Reader{NewManualReader()}
|
||||||
views := []View{defaultView, v}
|
views := []View{defaultView, v}
|
||||||
res := resource.NewSchemaless(attribute.String("key", "val"))
|
res := resource.NewSchemaless(attribute.String("key", "val"))
|
||||||
pipes := newPipelines(res, readers, views)
|
pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter)
|
||||||
for _, p := range pipes {
|
for _, p := range pipes {
|
||||||
assert.True(t, res.Equal(p.resource), "resource not set")
|
assert.True(t, res.Equal(p.resource), "resource not set")
|
||||||
}
|
}
|
||||||
@@ -532,7 +533,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
|
|||||||
|
|
||||||
readers := []Reader{testRdrHistogram}
|
readers := []Reader{testRdrHistogram}
|
||||||
views := []View{defaultView}
|
views := []View{defaultView}
|
||||||
p := newPipelines(resource.Empty(), readers, views)
|
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)
|
||||||
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}
|
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}
|
||||||
|
|
||||||
var vc cache[string, instID]
|
var vc cache[string, instID]
|
||||||
@@ -592,7 +593,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {
|
|||||||
fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
|
fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
|
||||||
barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter}
|
barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter}
|
||||||
|
|
||||||
p := newPipelines(resource.Empty(), readers, views)
|
p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter)
|
||||||
|
|
||||||
var vc cache[string, instID]
|
var vc cache[string, instID]
|
||||||
ri := newResolver[int64](p, &vc)
|
ri := newResolver[int64](p, &vc)
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import (
|
|||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
"go.opentelemetry.io/otel/metric"
|
"go.opentelemetry.io/otel/metric"
|
||||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||||
|
"go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||||
"go.opentelemetry.io/otel/sdk/resource"
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
@@ -39,7 +40,7 @@ func testSumAggregateOutput(dest *metricdata.Aggregation) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestNewPipeline(t *testing.T) {
|
func TestNewPipeline(t *testing.T) {
|
||||||
pipe := newPipeline(nil, nil, nil)
|
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
|
||||||
|
|
||||||
output := metricdata.ResourceMetrics{}
|
output := metricdata.ResourceMetrics{}
|
||||||
err := pipe.produce(context.Background(), &output)
|
err := pipe.produce(context.Background(), &output)
|
||||||
@@ -65,7 +66,7 @@ func TestNewPipeline(t *testing.T) {
|
|||||||
|
|
||||||
func TestPipelineUsesResource(t *testing.T) {
|
func TestPipelineUsesResource(t *testing.T) {
|
||||||
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
|
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
|
||||||
pipe := newPipeline(res, nil, nil)
|
pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter)
|
||||||
|
|
||||||
output := metricdata.ResourceMetrics{}
|
output := metricdata.ResourceMetrics{}
|
||||||
err := pipe.produce(context.Background(), &output)
|
err := pipe.produce(context.Background(), &output)
|
||||||
@@ -74,7 +75,7 @@ func TestPipelineUsesResource(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPipelineConcurrentSafe(t *testing.T) {
|
func TestPipelineConcurrentSafe(t *testing.T) {
|
||||||
pipe := newPipeline(nil, nil, nil)
|
pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
var output metricdata.ResourceMetrics
|
var output metricdata.ResourceMetrics
|
||||||
|
|
||||||
@@ -124,13 +125,13 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "NoView",
|
name: "NoView",
|
||||||
pipe: newPipeline(nil, reader, nil),
|
pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "NoMatchingView",
|
name: "NoMatchingView",
|
||||||
pipe: newPipeline(nil, reader, []View{
|
pipe: newPipeline(nil, reader, []View{
|
||||||
NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
|
NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
|
||||||
}),
|
}, exemplar.AlwaysOffFilter),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -215,7 +216,7 @@ func TestLogConflictName(t *testing.T) {
|
|||||||
return instID{Name: tc.existing}
|
return instID{Name: tc.existing}
|
||||||
})
|
})
|
||||||
|
|
||||||
i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
|
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
|
||||||
i.logConflict(instID{Name: tc.name})
|
i.logConflict(instID{Name: tc.name})
|
||||||
|
|
||||||
if tc.conflict {
|
if tc.conflict {
|
||||||
@@ -257,7 +258,7 @@ func TestLogConflictSuggestView(t *testing.T) {
|
|||||||
var vc cache[string, instID]
|
var vc cache[string, instID]
|
||||||
name := strings.ToLower(orig.Name)
|
name := strings.ToLower(orig.Name)
|
||||||
_ = vc.Lookup(name, func() instID { return orig })
|
_ = vc.Lookup(name, func() instID { return orig })
|
||||||
i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
|
i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
|
||||||
|
|
||||||
viewSuggestion := func(inst instID, stream string) string {
|
viewSuggestion := func(inst instID, stream string) string {
|
||||||
return `"NewView(Instrument{` +
|
return `"NewView(Instrument{` +
|
||||||
@@ -362,7 +363,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var vc cache[string, instID]
|
var vc cache[string, instID]
|
||||||
pipe := newPipeline(nil, NewManualReader(), nil)
|
pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter)
|
||||||
i := newInserter[int64](pipe, &vc)
|
i := newInserter[int64](pipe, &vc)
|
||||||
|
|
||||||
readerAggregation := i.readerDefaultAggregation(kind)
|
readerAggregation := i.readerDefaultAggregation(kind)
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider {
|
|||||||
flush, sdown := conf.readerSignals()
|
flush, sdown := conf.readerSignals()
|
||||||
|
|
||||||
mp := &MeterProvider{
|
mp := &MeterProvider{
|
||||||
pipes: newPipelines(conf.res, conf.readers, conf.views),
|
pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter),
|
||||||
forceFlush: flush,
|
forceFlush: flush,
|
||||||
shutdown: sdown,
|
shutdown: sdown,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user