You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-29 23:07:45 +02:00
Add tests for malformed selectors in readers (#4350)
This commit is contained in:
@@ -44,6 +44,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
|
||||
- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
|
||||
- Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)
|
||||
- The `ManualReader` will not panic if `AggregationSelector` returns `nil`. (#4350)
|
||||
- If a Reader's AggregationSelector return nil or DefaultAggregation the pipeline will use the default aggregation. (#4350)
|
||||
- Log a suggested view that fixes instrument conflicts in `go.opentelemetry.io/otel/sdk/metric`. (#4349)
|
||||
- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)
|
||||
|
||||
|
||||
@@ -227,6 +227,9 @@ func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {
|
||||
// Deep copy and validate before using.
|
||||
wrapped := func(ik InstrumentKind) aggregation.Aggregation {
|
||||
a := selector(ik)
|
||||
if a == nil {
|
||||
return nil
|
||||
}
|
||||
cpA := a.Copy()
|
||||
if err := cpA.Err(); err != nil {
|
||||
cpA = DefaultAggregationSelector(ik)
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
||||
@@ -1811,3 +1812,148 @@ func BenchmarkInstrumentCreation(b *testing.B) {
|
||||
sfHistogram, _ = meter.Float64Histogram("sync.float64.histogram")
|
||||
}
|
||||
}
|
||||
|
||||
func testNilAggregationSelector(InstrumentKind) aggregation.Aggregation {
|
||||
return nil
|
||||
}
|
||||
func testDefaultAggregationSelector(InstrumentKind) aggregation.Aggregation {
|
||||
return aggregation.Default{}
|
||||
}
|
||||
func testUndefinedTemporalitySelector(InstrumentKind) metricdata.Temporality {
|
||||
return metricdata.Temporality(0)
|
||||
}
|
||||
func testInvalidTemporalitySelector(InstrumentKind) metricdata.Temporality {
|
||||
return metricdata.Temporality(255)
|
||||
}
|
||||
|
||||
type noErrorHandler struct {
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (h noErrorHandler) Handle(err error) {
|
||||
assert.NoError(h.t, err)
|
||||
}
|
||||
|
||||
func TestMalformedSelectors(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
reader Reader
|
||||
}
|
||||
testCases := []testCase{
|
||||
{
|
||||
name: "nil aggregation selector",
|
||||
reader: NewManualReader(WithAggregationSelector(testNilAggregationSelector)),
|
||||
},
|
||||
{
|
||||
name: "nil aggregation selector periodic",
|
||||
reader: NewPeriodicReader(&fnExporter{aggregationFunc: testNilAggregationSelector}),
|
||||
},
|
||||
{
|
||||
name: "default aggregation selector",
|
||||
reader: NewManualReader(WithAggregationSelector(testDefaultAggregationSelector)),
|
||||
},
|
||||
{
|
||||
name: "default aggregation selector periodic",
|
||||
reader: NewPeriodicReader(&fnExporter{aggregationFunc: testDefaultAggregationSelector}),
|
||||
},
|
||||
{
|
||||
name: "undefined temporality selector",
|
||||
reader: NewManualReader(WithTemporalitySelector(testUndefinedTemporalitySelector)),
|
||||
},
|
||||
{
|
||||
name: "undefined temporality selector periodic",
|
||||
reader: NewPeriodicReader(&fnExporter{temporalityFunc: testUndefinedTemporalitySelector}),
|
||||
},
|
||||
{
|
||||
name: "invalid temporality selector",
|
||||
reader: NewManualReader(WithTemporalitySelector(testInvalidTemporalitySelector)),
|
||||
},
|
||||
{
|
||||
name: "invalid temporality selector periodic",
|
||||
reader: NewPeriodicReader(&fnExporter{temporalityFunc: testInvalidTemporalitySelector}),
|
||||
},
|
||||
{
|
||||
name: "both aggregation and temporality selector",
|
||||
reader: NewManualReader(
|
||||
WithAggregationSelector(testNilAggregationSelector),
|
||||
WithTemporalitySelector(testUndefinedTemporalitySelector),
|
||||
),
|
||||
},
|
||||
{
|
||||
name: "both aggregation and temporality selector periodic",
|
||||
reader: NewPeriodicReader(&fnExporter{
|
||||
aggregationFunc: testNilAggregationSelector,
|
||||
temporalityFunc: testUndefinedTemporalitySelector,
|
||||
}),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
origErrorHandler := global.GetErrorHandler()
|
||||
defer global.SetErrorHandler(origErrorHandler)
|
||||
global.SetErrorHandler(noErrorHandler{t})
|
||||
|
||||
defer func() {
|
||||
_ = tt.reader.Shutdown(context.Background())
|
||||
}()
|
||||
|
||||
meter := NewMeterProvider(WithReader(tt.reader)).Meter("TestNilAggregationSelector")
|
||||
|
||||
// Create All instruments, they should not error
|
||||
aiCounter, err := meter.Int64ObservableCounter("observable.int64.counter")
|
||||
require.NoError(t, err)
|
||||
aiUpDownCounter, err := meter.Int64ObservableUpDownCounter("observable.int64.up.down.counter")
|
||||
require.NoError(t, err)
|
||||
aiGauge, err := meter.Int64ObservableGauge("observable.int64.gauge")
|
||||
require.NoError(t, err)
|
||||
|
||||
afCounter, err := meter.Float64ObservableCounter("observable.float64.counter")
|
||||
require.NoError(t, err)
|
||||
afUpDownCounter, err := meter.Float64ObservableUpDownCounter("observable.float64.up.down.counter")
|
||||
require.NoError(t, err)
|
||||
afGauge, err := meter.Float64ObservableGauge("observable.float64.gauge")
|
||||
require.NoError(t, err)
|
||||
|
||||
siCounter, err := meter.Int64Counter("sync.int64.counter")
|
||||
require.NoError(t, err)
|
||||
siUpDownCounter, err := meter.Int64UpDownCounter("sync.int64.up.down.counter")
|
||||
require.NoError(t, err)
|
||||
siHistogram, err := meter.Int64Histogram("sync.int64.histogram")
|
||||
require.NoError(t, err)
|
||||
|
||||
sfCounter, err := meter.Float64Counter("sync.float64.counter")
|
||||
require.NoError(t, err)
|
||||
sfUpDownCounter, err := meter.Float64UpDownCounter("sync.float64.up.down.counter")
|
||||
require.NoError(t, err)
|
||||
sfHistogram, err := meter.Float64Histogram("sync.float64.histogram")
|
||||
require.NoError(t, err)
|
||||
|
||||
callback := func(ctx context.Context, obs metric.Observer) error {
|
||||
obs.ObserveInt64(aiCounter, 1)
|
||||
obs.ObserveInt64(aiUpDownCounter, 1)
|
||||
obs.ObserveInt64(aiGauge, 1)
|
||||
obs.ObserveFloat64(afCounter, 1)
|
||||
obs.ObserveFloat64(afUpDownCounter, 1)
|
||||
obs.ObserveFloat64(afGauge, 1)
|
||||
return nil
|
||||
}
|
||||
_, err = meter.RegisterCallback(callback, aiCounter, aiUpDownCounter, aiGauge, afCounter, afUpDownCounter, afGauge)
|
||||
require.NoError(t, err)
|
||||
|
||||
siCounter.Add(context.Background(), 1)
|
||||
siUpDownCounter.Add(context.Background(), 1)
|
||||
siHistogram.Record(context.Background(), 1)
|
||||
sfCounter.Add(context.Background(), 1)
|
||||
sfUpDownCounter.Add(context.Background(), 1)
|
||||
sfHistogram.Record(context.Background(), 1)
|
||||
|
||||
var rm metricdata.ResourceMetrics
|
||||
err = tt.reader.Collect(context.Background(), &rm)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, rm.ScopeMetrics, 1)
|
||||
require.Len(t, rm.ScopeMetrics[0].Metrics, 12)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -311,6 +311,11 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
|
||||
case nil, aggregation.Default:
|
||||
// Undefined, nil, means to use the default from the reader.
|
||||
stream.Aggregation = i.pipeline.reader.aggregation(kind)
|
||||
switch stream.Aggregation.(type) {
|
||||
case nil, aggregation.Default:
|
||||
// If the reader returns default or nil use the default selector.
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
}
|
||||
}
|
||||
|
||||
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
|
||||
|
||||
@@ -145,6 +145,9 @@ func DefaultTemporalitySelector(InstrumentKind) metricdata.Temporality {
|
||||
|
||||
// AggregationSelector selects the aggregation and the parameters to use for
|
||||
// that aggregation based on the InstrumentKind.
|
||||
//
|
||||
// If the Aggregation returned is nil or DefaultAggregation, the selection from
|
||||
// DefaultAggregationSelector will be used.
|
||||
type AggregationSelector func(InstrumentKind) aggregation.Aggregation
|
||||
|
||||
// DefaultAggregationSelector returns the default aggregation and parameters
|
||||
|
||||
Reference in New Issue
Block a user