1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-07 23:02:15 +02:00
opentelemetry-go/sdk/metric/pipeline_test.go
David Ashpole 81b2a33e1b
Add selector of exemplar reservoir providers to metric.Stream configuration (#5861)
Resolve https://github.com/open-telemetry/opentelemetry-go/issues/5249

### Spec

> exemplar_reservoir: A functional type that generates an exemplar
reservoir a MeterProvider will use when storing exemplars. This
functional type needs to be a factory or callback similar to aggregation
selection functionality which allows different reservoirs to be chosen
by the aggregation.

> Users can provide an exemplar_reservoir, but it is up to their
discretion. Therefore, the stream configuration parameter needs to be
structured to accept an exemplar_reservoir, but MUST NOT obligate a user
to provide one. If the user does not provide an exemplar_reservoir
value, the MeterProvider MUST apply a [default exemplar
reservoir](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exemplar-defaults).

Also,

> the reservoir MUST be given the Attributes associated with its
timeseries point either at construction so that additional sampling
performed by the reservoir has access to all attributes from a
measurement in the "offer" method.

### Changes

In sdk/metric/exemplar, add:
* `exemplar.ReservoirProvider`
* `exemplar.HistogramReservoirProvider`
* `exemplar.FixedSizeReservoirProvider`

In sdk/metric, add:
* `metric.ExemplarReservoirProviderSelector` (func Aggregation ->
ReservoirProvider)
* `metric.DefaultExemplarReservoirProviderSelector` (our default
implementation)
* `ExemplarReservoirProviderSelector` added to `metric.Stream`

Note: the only required public types are
`metric.ExemplarReservoirProviderSelector` and
`ExemplarReservoirProviderSelector` in `metric.Stream`. Others are for
convenience and readability.

### Alternatives considered

#### Add ExemplarReservoirProvider directly to metric.Stream, instead of
ExemplarReservoirProviderSelector

This would mean users can configure a `func() exemplar.Reservoir`
instead of a `func(Aggregation) func() exemplar.Reservoir`.

I don't think this complies with the statement: `This functional type
needs to be a factory or callback similar to aggregation selection
functionality which allows different reservoirs to be chosen by the
aggregation.`. I'm interpreting "allows different reservoirs to be
chosen by the aggregation" as meaning "allows different reservoirs to be
chosen **based on the** aggregation", rather than meaning that the
aggregation is somehow choosing the reservoir.

### Future work

There is some refactoring I plan to do after this to simplify the
interaction between the internal/aggregate and exemplar package. I've
omitted that from this PR to keep the diff smaller.

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Robert Pająk <pellared@hotmail.com>
2024-10-18 09:05:10 -04:00

521 lines
14 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"fmt"
"log"
"os"
"runtime"
"strings"
"sync"
"testing"
"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/go-logr/stdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
)
// testSumAggregateOutput is a stub aggregate-output function used to build
// instrumentSync values in tests. It overwrites dest with a fixed
// non-monotonic cumulative sum holding one data point of value 1 and
// reports that a single data point was produced.
func testSumAggregateOutput(dest *metricdata.Aggregation) int {
	sum := metricdata.Sum[int64]{
		Temporality: metricdata.CumulativeTemporality,
		IsMonotonic: false,
		DataPoints:  []metricdata.DataPoint[int64]{{Value: 1}},
	}
	*dest = sum
	return 1
}
// TestNewPipeline verifies that a freshly constructed pipeline produces an
// empty result, and that synchronous instruments and multi-callbacks
// registered afterwards are reflected in subsequent produce calls.
func TestNewPipeline(t *testing.T) {
	pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)

	var got metricdata.ResourceMetrics
	require.NoError(t, pipe.produce(context.Background(), &got))
	assert.Equal(t, resource.Empty(), got.Resource)
	assert.Empty(t, got.ScopeMetrics)

	// Registering instruments and callbacks on a live pipeline must not panic.
	sync := instrumentSync{"name", "desc", "1", testSumAggregateOutput}
	assert.NotPanics(t, func() {
		pipe.addSync(instrumentation.Scope{}, sync)
	})
	require.NotPanics(t, func() {
		pipe.addMultiCallback(func(context.Context) error { return nil })
	})

	// The registered instrument now contributes one metric in one scope.
	require.NoError(t, pipe.produce(context.Background(), &got))
	assert.Equal(t, resource.Empty(), got.Resource)
	require.Len(t, got.ScopeMetrics, 1)
	require.Len(t, got.ScopeMetrics[0].Metrics, 1)
}
// TestPipelineUsesResource verifies that the resource passed to newPipeline
// is propagated unchanged into the produced ResourceMetrics.
func TestPipelineUsesResource(t *testing.T) {
	want := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
	pipe := newPipeline(want, nil, nil, exemplar.AlwaysOffFilter)

	var got metricdata.ResourceMetrics
	err := pipe.produce(context.Background(), &got)
	assert.NoError(t, err)
	assert.Equal(t, want, got.Resource)
}
// TestPipelineConcurrentSafe exercises produce, addSync, and
// addMultiCallback concurrently. It is intended to run under the race
// detector to catch unsynchronized access to pipeline state.
func TestPipelineConcurrentSafe(t *testing.T) {
	pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter)
	ctx := context.Background()
	var out metricdata.ResourceMetrics

	var wg sync.WaitGroup
	const goroutines = 2
	for n := 0; n < goroutines; n++ {
		// Three goroutines per iteration: one producer, one instrument
		// registration, one callback registration.
		wg.Add(3)
		go func() {
			defer wg.Done()
			_ = pipe.produce(ctx, &out)
		}()
		go func(id int) {
			defer wg.Done()
			is := instrumentSync{fmt.Sprintf("name %d", id), "desc", "1", testSumAggregateOutput}
			pipe.addSync(instrumentation.Scope{}, is)
		}(n)
		go func() {
			defer wg.Done()
			pipe.addMultiCallback(func(context.Context) error { return nil })
		}()
	}
	wg.Wait()
}
// TestDefaultViewImplicit runs the implicit-default-view test for both
// measurement number types supported by the SDK.
func TestDefaultViewImplicit(t *testing.T) {
	t.Run("Int64", testDefaultViewImplicit[int64]())
	t.Run("Float64", testDefaultViewImplicit[float64]())
}
// testDefaultViewImplicit returns a test function asserting that an
// instrument with no matching user-configured view still receives the
// reader's default aggregation (the implicit default view), for number
// type N.
func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
	inst := Instrument{
		Name:        "requests",
		Description: "count of requests received",
		Kind:        InstrumentKindCounter,
		Unit:        "1",
	}
	return func(t *testing.T) {
		reader := NewManualReader()
		tests := []struct {
			name string
			pipe *pipeline
		}{
			{
				// No views registered at all.
				name: "NoView",
				pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter),
			},
			{
				// A view exists, but it does not match the instrument.
				name: "NoMatchingView",
				pipe: newPipeline(nil, reader, []View{
					NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
				}, exemplar.AlwaysOffFilter),
			},
		}
		for _, test := range tests {
			t.Run(test.name, func(t *testing.T) {
				var c cache[string, instID]
				i := newInserter[N](test.pipe, &c)
				readerAggregation := i.readerDefaultAggregation(inst.Kind)
				got, err := i.Instrument(inst, readerAggregation)
				require.NoError(t, err)
				assert.Len(t, got, 1, "default view not applied")
				// Record one measurement through every returned aggregate
				// function so produce has data to report.
				for _, in := range got {
					in(context.Background(), 1, *attribute.EmptySet())
				}

				out := metricdata.ResourceMetrics{}
				err = test.pipe.produce(context.Background(), &out)
				require.NoError(t, err)
				require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
				sm := out.ScopeMetrics[0]
				require.Len(t, sm.Metrics, 1, "metrics not produced from default view")
				// A counter defaults to a monotonic cumulative sum.
				metricdatatest.AssertEqual(t, metricdata.Metrics{
					Name:        inst.Name,
					Description: inst.Description,
					Unit:        "1",
					Data: metricdata.Sum[N]{
						Temporality: metricdata.CumulativeTemporality,
						IsMonotonic: true,
						DataPoints:  []metricdata.DataPoint[N]{{Value: N(1)}},
					},
				}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
			})
		}
	}
}
// TestLogConflictName verifies that registering an instrument whose name
// differs from an existing one only by case logs a duplicate metric stream
// warning, while identical or genuinely distinct names do not.
func TestLogConflictName(t *testing.T) {
	testcases := []struct {
		existing, name string
		conflict       bool
	}{
		{
			existing: "requestCount",
			name:     "requestCount",
			conflict: false,
		},
		{
			existing: "requestCount",
			name:     "requestDuration",
			conflict: false,
		},
		{
			existing: "requestCount",
			name:     "requestcount",
			conflict: true,
		},
		{
			existing: "requestCount",
			name:     "REQUESTCOUNT",
			conflict: true,
		},
		{
			existing: "requestCount",
			name:     "rEqUeStCoUnT",
			conflict: true,
		},
	}

	// Capture global logger output into msg and restore the original logger
	// when the test completes.
	var msg string
	t.Cleanup(func(orig logr.Logger) func() {
		otel.SetLogger(funcr.New(func(_, args string) {
			msg = args
		}, funcr.Options{Verbosity: 20}))
		return func() { otel.SetLogger(orig) }
	}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))

	for _, tc := range testcases {
		var vc cache[string, instID]

		// Seed the cache as the inserter would: keyed by lowercased name.
		name := strings.ToLower(tc.existing)
		_ = vc.Lookup(name, func() instID {
			return instID{Name: tc.existing}
		})

		i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)
		i.logConflict(instID{Name: tc.name})
		if tc.conflict {
			assert.Containsf(
				t, msg, "duplicate metric stream definitions",
				"warning not logged for conflicting names: %s, %s",
				tc.existing, tc.name,
			)
		} else {
			// Emptyf is the idiomatic testify check for "nothing was
			// logged" (equivalent to Equalf(t, "", msg, ...), which
			// testifylint flags).
			assert.Emptyf(
				t, msg,
				"warning logged for non-conflicting names: %s, %s",
				tc.existing, tc.name,
			)
		}

		// Reset captured output for the next case.
		msg = ""
	}
}
// TestLogConflictSuggestView verifies that the duplicate-stream warning
// includes an actionable NewView suggestion when a view can resolve the
// conflict (name or description differences), and omits the suggestion when
// it cannot (unit or number-type differences).
func TestLogConflictSuggestView(t *testing.T) {
	// Capture global logger output into msg and restore the original logger
	// when the test completes.
	var msg string
	t.Cleanup(func(orig logr.Logger) func() {
		otel.SetLogger(funcr.New(func(_, args string) {
			msg = args
		}, funcr.Options{Verbosity: 20}))
		return func() { otel.SetLogger(orig) }
	}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))

	orig := instID{
		Name:        "requestCount",
		Description: "number of requests",
		Kind:        InstrumentKindCounter,
		Unit:        "1",
		Number:      "int64",
	}

	// Seed the cache as the inserter would: keyed by lowercased name.
	var vc cache[string, instID]
	name := strings.ToLower(orig.Name)
	_ = vc.Lookup(name, func() instID { return orig })
	i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc)

	// viewSuggestion builds the JSON-escaped NewView snippet expected in the
	// logged message (hence the \" escapes).
	viewSuggestion := func(inst instID, stream string) string {
		return `"NewView(Instrument{` +
			`Name: \"` + inst.Name +
			`\", Description: \"` + inst.Description +
			`\", Kind: \"InstrumentKind` + inst.Kind.String() +
			`\", Unit: \"` + inst.Unit +
			`\"}, ` +
			stream +
			`)"`
	}
	t.Run("Name", func(t *testing.T) {
		// Case-insensitive name clash: suggest renaming the stream.
		inst := instID{
			Name:        "requestcount",
			Description: orig.Description,
			Kind:        orig.Kind,
			Unit:        orig.Unit,
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.Containsf(t, msg, viewSuggestion(
			inst, `Stream{Name: \"{{NEW_NAME}}\"}`,
		), "no suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})
	t.Run("Description", func(t *testing.T) {
		// Description mismatch: suggest overriding to the existing one.
		inst := instID{
			Name:        orig.Name,
			Description: "alt",
			Kind:        orig.Kind,
			Unit:        orig.Unit,
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.Containsf(t, msg, viewSuggestion(
			inst, `Stream{Description: \"`+orig.Description+`\"}`,
		), "no suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})
	t.Run("Kind", func(t *testing.T) {
		// Kind mismatch: a rename is the only viable view-based fix.
		inst := instID{
			Name:        orig.Name,
			Description: orig.Description,
			Kind:        InstrumentKindHistogram,
			Unit:        orig.Unit,
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.Containsf(t, msg, viewSuggestion(
			inst, `Stream{Name: \"{{NEW_NAME}}\"}`,
		), "no suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})
	t.Run("Unit", func(t *testing.T) {
		// Units cannot be changed by a view: no suggestion expected.
		inst := instID{
			Name:        orig.Name,
			Description: orig.Description,
			Kind:        orig.Kind,
			Unit:        "ms",
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.NotContains(t, msg, "NewView", "suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})
	t.Run("Number", func(t *testing.T) {
		// Number types cannot be changed by a view: no suggestion expected.
		inst := instID{
			Name:        orig.Name,
			Description: orig.Description,
			Kind:        orig.Kind,
			Unit:        orig.Unit,
			Number:      "float64",
		}
		i.logConflict(inst)
		assert.NotContains(t, msg, "NewView", "suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})
}
// TestInserterCachedAggregatorNameConflict verifies that requesting an
// aggregator whose stream name differs from a cached one only by case
// returns the cached aggregator instead of registering a second, duplicate
// instrumentSync with the pipeline.
func TestInserterCachedAggregatorNameConflict(t *testing.T) {
	const name = "requestCount"
	scope := instrumentation.Scope{Name: "pipeline_test"}
	kind := InstrumentKindCounter
	stream := Stream{
		Name:        name,
		Aggregation: AggregationSum{},
	}

	var vc cache[string, instID]
	pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter)
	i := newInserter[int64](pipe, &vc)
	readerAggregation := i.readerDefaultAggregation(kind)
	_, origID, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
	require.NoError(t, err)

	// First request registers exactly one instrumentSync under the scope.
	require.Len(t, pipe.aggregations, 1)
	require.Contains(t, pipe.aggregations, scope)
	iSync := pipe.aggregations[scope]
	require.Len(t, iSync, 1)
	require.Equal(t, name, iSync[0].name)

	// Same stream, name differing only by case: must hit the cache.
	stream.Name = "RequestCount"
	_, id, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
	require.NoError(t, err)
	assert.Equal(t, origID, id, "multiple aggregators for equivalent name")

	// Pipeline state is unchanged: same scope, same single sync, same name.
	assert.Len(t, pipe.aggregations, 1, "additional scope added")
	require.Contains(t, pipe.aggregations, scope, "original scope removed")
	iSync = pipe.aggregations[scope]
	require.Len(t, iSync, 1, "registered instrumentSync changed")
	assert.Equal(t, name, iSync[0].name, "stream name changed")
}
// TestExemplars verifies exemplar collection across the supported
// OTEL_METRICS_EXEMPLAR_FILTER settings (default, invalid, always_on,
// always_off, trace_based) and with a custom reservoir provider configured
// through Stream.ExemplarReservoirProviderSelector.
func TestExemplars(t *testing.T) {
	nCPU := runtime.NumCPU()
	// setup builds a meter whose exponential histogram view forces the
	// default reservoir sizing (MaxSize > 20, so the reservoir defaults
	// to 20).
	setup := func(name string) (metric.Meter, Reader) {
		r := NewManualReader()
		v := NewView(Instrument{Name: "int64-expo-histogram"}, Stream{
			Aggregation: AggregationBase2ExponentialHistogram{
				MaxSize:  160, // > 20, reservoir size should default to 20.
				MaxScale: 20,
			},
		})
		return NewMeterProvider(WithReader(r), WithView(v)).Meter(name), r
	}
	// measure records 20*nCPU measurements on a counter, a histogram, and
	// an exponential histogram so every reservoir can fill.
	measure := func(ctx context.Context, m metric.Meter) {
		i, err := m.Int64Counter("int64-counter")
		require.NoError(t, err)

		h, err := m.Int64Histogram("int64-histogram")
		require.NoError(t, err)

		e, err := m.Int64Histogram("int64-expo-histogram")
		require.NoError(t, err)

		for j := 0; j < 20*nCPU; j++ { // will be >= 20 and > nCPU
			i.Add(ctx, 1)
			h.Record(ctx, 1)
			e.Record(ctx, 1)
		}
	}
	// check collects and asserts the exemplar count on the first data point
	// of each of the three metrics.
	check := func(t *testing.T, r Reader, nSum, nHist, nExpo int) {
		t.Helper()

		rm := new(metricdata.ResourceMetrics)
		require.NoError(t, r.Collect(context.Background(), rm))

		require.Len(t, rm.ScopeMetrics, 1, "ScopeMetrics")
		sm := rm.ScopeMetrics[0]

		require.Len(t, sm.Metrics, 3, "Metrics")

		require.IsType(t, metricdata.Sum[int64]{}, sm.Metrics[0].Data, sm.Metrics[0].Name)
		sum := sm.Metrics[0].Data.(metricdata.Sum[int64])
		assert.Len(t, sum.DataPoints[0].Exemplars, nSum)

		require.IsType(t, metricdata.Histogram[int64]{}, sm.Metrics[1].Data, sm.Metrics[1].Name)
		hist := sm.Metrics[1].Data.(metricdata.Histogram[int64])
		assert.Len(t, hist.DataPoints[0].Exemplars, nHist)

		require.IsType(t, metricdata.ExponentialHistogram[int64]{}, sm.Metrics[2].Data, sm.Metrics[2].Name)
		expo := sm.Metrics[2].Data.(metricdata.ExponentialHistogram[int64])
		assert.Len(t, expo.DataPoints[0].Exemplars, nExpo)
	}

	ctx := context.Background()
	// sampled carries a sampled span context so trace-based filtering
	// records exemplars.
	sc := trace.NewSpanContext(trace.SpanContextConfig{
		SpanID:     trace.SpanID{0o1},
		TraceID:    trace.TraceID{0o1},
		TraceFlags: trace.FlagsSampled,
	})
	sampled := trace.ContextWithSpanContext(context.Background(), sc)

	t.Run("Default", func(t *testing.T) {
		// Default filter is trace-based: exemplars only for sampled spans.
		m, r := setup("default")
		measure(ctx, m)
		check(t, r, 0, 0, 0)

		measure(sampled, m)
		check(t, r, nCPU, 1, 20)
	})

	t.Run("Invalid", func(t *testing.T) {
		// An unrecognized filter value falls back to the default behavior.
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "unrecognized")
		m, r := setup("default")
		measure(ctx, m)
		check(t, r, 0, 0, 0)

		measure(sampled, m)
		check(t, r, nCPU, 1, 20)
	})

	t.Run("always_on", func(t *testing.T) {
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")
		m, r := setup("always_on")
		measure(ctx, m)
		check(t, r, nCPU, 1, 20)
	})

	t.Run("always_off", func(t *testing.T) {
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_off")
		m, r := setup("always_off")
		measure(ctx, m)
		check(t, r, 0, 0, 0)
	})

	t.Run("trace_based", func(t *testing.T) {
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "trace_based")
		m, r := setup("trace_based")
		measure(ctx, m)
		check(t, r, 0, 0, 0)

		measure(sampled, m)
		check(t, r, nCPU, 1, 20)
	})

	t.Run("Custom reservoir", func(t *testing.T) {
		// A fixed-size reservoir of 2 applied to all three instruments via
		// views caps every metric at 2 exemplars.
		r := NewManualReader()
		reservoirProviderSelector := func(agg Aggregation) exemplar.ReservoirProvider {
			return exemplar.FixedSizeReservoirProvider(2)
		}
		v1 := NewView(Instrument{Name: "int64-expo-histogram"}, Stream{
			Aggregation: AggregationBase2ExponentialHistogram{
				MaxSize:  160, // > 20, reservoir size should default to 20.
				MaxScale: 20,
			},
			ExemplarReservoirProviderSelector: reservoirProviderSelector,
		})
		v2 := NewView(Instrument{Name: "int64-counter"}, Stream{
			ExemplarReservoirProviderSelector: reservoirProviderSelector,
		})
		v3 := NewView(Instrument{Name: "int64-histogram"}, Stream{
			ExemplarReservoirProviderSelector: reservoirProviderSelector,
		})
		m := NewMeterProvider(WithReader(r), WithView(v1, v2, v3)).Meter("custom-reservoir")
		measure(ctx, m)
		check(t, r, 0, 0, 0)

		measure(sampled, m)
		check(t, r, 2, 2, 2)
	})
}