Part of https://github.com/open-telemetry/opentelemetry-go/issues/5249. Addresses https://github.com/open-telemetry/opentelemetry-go/issues/5249#issuecomment-2088725450.

This removes handling of the `OTEL_GO_X_EXEMPLAR` environment variable. Instead of changing the default of that experimental variable so the feature is enabled by default, I'm removing it entirely. Users can still disable exemplars by setting the exemplar filter to `always_off`. Since that configuration will continue to be supported, it seems better to direct users there than to offer a temporary equivalent.
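For reference, here is a minimal sketch of the supported way to keep exemplars disabled once `OTEL_GO_X_EXEMPLAR` is gone, using the `OTEL_METRICS_EXEMPLAR_FILTER` variable that `TestExemplars` below exercises. The standalone `main` package, the `ManualReader`, and the meter name are illustrative assumptions, not code from this repository:

```go
package main

import (
	"context"
	"os"

	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	// Equivalent to launching the process with
	// OTEL_METRICS_EXEMPLAR_FILTER=always_off. Set it before the
	// MeterProvider is constructed, mirroring how TestExemplars uses t.Setenv.
	_ = os.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_off")

	reader := sdkmetric.NewManualReader()
	mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	defer func() { _ = mp.Shutdown(context.Background()) }()

	// Instruments created from this provider will not record exemplars.
	_ = mp.Meter("example")
}
```

In tests, `t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_off")` gives the same effect scoped to a single test, which is exactly what the `always_off` subtest below relies on.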
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
	"context"
	"fmt"
	"log"
	"os"
	"runtime"
	"strings"
	"sync"
	"testing"

	"github.com/go-logr/logr"
	"github.com/go-logr/logr/funcr"
	"github.com/go-logr/stdr"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/sdk/instrumentation"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
	"go.opentelemetry.io/otel/sdk/resource"
	"go.opentelemetry.io/otel/trace"
)

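// testSumAggregateOutput is a stub aggregation output used by the pipeline tests. It
// writes a single cumulative, non-monotonic int64 sum data point into dest and reports
// that one data point was collected.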
func testSumAggregateOutput(dest *metricdata.Aggregation) int {
	*dest = metricdata.Sum[int64]{
		Temporality: metricdata.CumulativeTemporality,
		IsMonotonic: false,
		DataPoints:  []metricdata.DataPoint[int64]{{Value: 1}},
	}
	return 1
}

func TestNewPipeline(t *testing.T) {
	pipe := newPipeline(nil, nil, nil)

	output := metricdata.ResourceMetrics{}
	err := pipe.produce(context.Background(), &output)
	require.NoError(t, err)
	assert.Equal(t, resource.Empty(), output.Resource)
	assert.Len(t, output.ScopeMetrics, 0)

	iSync := instrumentSync{"name", "desc", "1", testSumAggregateOutput}
	assert.NotPanics(t, func() {
		pipe.addSync(instrumentation.Scope{}, iSync)
	})

	require.NotPanics(t, func() {
		pipe.addMultiCallback(func(context.Context) error { return nil })
	})

	err = pipe.produce(context.Background(), &output)
	require.NoError(t, err)
	assert.Equal(t, resource.Empty(), output.Resource)
	require.Len(t, output.ScopeMetrics, 1)
	require.Len(t, output.ScopeMetrics[0].Metrics, 1)
}

func TestPipelineUsesResource(t *testing.T) {
	res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
	pipe := newPipeline(res, nil, nil)

	output := metricdata.ResourceMetrics{}
	err := pipe.produce(context.Background(), &output)
	assert.NoError(t, err)
	assert.Equal(t, res, output.Resource)
}

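// TestPipelineConcurrentSafe calls produce, addSync, and addMultiCallback from
// concurrent goroutines so the race detector can flag any unsynchronized access to the
// pipeline.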
func TestPipelineConcurrentSafe(t *testing.T) {
	pipe := newPipeline(nil, nil, nil)
	ctx := context.Background()
	var output metricdata.ResourceMetrics

	var wg sync.WaitGroup
	const threads = 2
	for i := 0; i < threads; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = pipe.produce(ctx, &output)
		}()

		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			name := fmt.Sprintf("name %d", n)
			sync := instrumentSync{name, "desc", "1", testSumAggregateOutput}
			pipe.addSync(instrumentation.Scope{}, sync)
		}(i)

		wg.Add(1)
		go func() {
			defer wg.Done()
			pipe.addMultiCallback(func(context.Context) error { return nil })
		}()
	}
	wg.Wait()
}

func TestDefaultViewImplicit(t *testing.T) {
	t.Run("Int64", testDefaultViewImplicit[int64]())
	t.Run("Float64", testDefaultViewImplicit[float64]())
}

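// testDefaultViewImplicit returns a test that checks an instrument with no matching
// view still gets the reader's default aggregation: a cumulative, monotonic sum.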
func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
	inst := Instrument{
		Name:        "requests",
		Description: "count of requests received",
		Kind:        InstrumentKindCounter,
		Unit:        "1",
	}
	return func(t *testing.T) {
		reader := NewManualReader()
		tests := []struct {
			name string
			pipe *pipeline
		}{
			{
				name: "NoView",
				pipe: newPipeline(nil, reader, nil),
			},
			{
				name: "NoMatchingView",
				pipe: newPipeline(nil, reader, []View{
					NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
				}),
			},
		}

		for _, test := range tests {
			t.Run(test.name, func(t *testing.T) {
				var c cache[string, instID]
				i := newInserter[N](test.pipe, &c)
				readerAggregation := i.readerDefaultAggregation(inst.Kind)
				got, err := i.Instrument(inst, readerAggregation)
				require.NoError(t, err)
				assert.Len(t, got, 1, "default view not applied")
				for _, in := range got {
					in(context.Background(), 1, *attribute.EmptySet())
				}

				out := metricdata.ResourceMetrics{}
				err = test.pipe.produce(context.Background(), &out)
				require.NoError(t, err)
				require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
				sm := out.ScopeMetrics[0]
				require.Len(t, sm.Metrics, 1, "metrics not produced from default view")
				metricdatatest.AssertEqual(t, metricdata.Metrics{
					Name:        inst.Name,
					Description: inst.Description,
					Unit:        "1",
					Data: metricdata.Sum[N]{
						Temporality: metricdata.CumulativeTemporality,
						IsMonotonic: true,
						DataPoints:  []metricdata.DataPoint[N]{{Value: N(1)}},
					},
				}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
			})
		}
	}
}

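// TestLogConflictName verifies a duplicate-instrument warning is logged when a new
// name matches a registered one only case-insensitively, and that no warning is logged
// for identical or unrelated names.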
func TestLogConflictName(t *testing.T) {
	testcases := []struct {
		existing, name string
		conflict       bool
	}{
		{
			existing: "requestCount",
			name:     "requestCount",
			conflict: false,
		},
		{
			existing: "requestCount",
			name:     "requestDuration",
			conflict: false,
		},
		{
			existing: "requestCount",
			name:     "requestcount",
			conflict: true,
		},
		{
			existing: "requestCount",
			name:     "REQUESTCOUNT",
			conflict: true,
		},
		{
			existing: "requestCount",
			name:     "rEqUeStCoUnT",
			conflict: true,
		},
	}

	var msg string
	t.Cleanup(func(orig logr.Logger) func() {
		otel.SetLogger(funcr.New(func(_, args string) {
			msg = args
		}, funcr.Options{Verbosity: 20}))
		return func() { otel.SetLogger(orig) }
	}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))

	for _, tc := range testcases {
		var vc cache[string, instID]

		name := strings.ToLower(tc.existing)
		_ = vc.Lookup(name, func() instID {
			return instID{Name: tc.existing}
		})

		i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
		i.logConflict(instID{Name: tc.name})

		if tc.conflict {
			assert.Containsf(
				t, msg, "duplicate metric stream definitions",
				"warning not logged for conflicting names: %s, %s",
				tc.existing, tc.name,
			)
		} else {
			assert.Equalf(
				t, msg, "",
				"warning logged for non-conflicting names: %s, %s",
				tc.existing, tc.name,
			)
		}

		// Reset.
		msg = ""
	}
}

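// TestLogConflictSuggestView verifies which conflicting fields make the logged warning
// include a NewView suggestion: name, description, and kind do; unit and number type
// do not.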
func TestLogConflictSuggestView(t *testing.T) {
	var msg string
	t.Cleanup(func(orig logr.Logger) func() {
		otel.SetLogger(funcr.New(func(_, args string) {
			msg = args
		}, funcr.Options{Verbosity: 20}))
		return func() { otel.SetLogger(orig) }
	}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))

	orig := instID{
		Name:        "requestCount",
		Description: "number of requests",
		Kind:        InstrumentKindCounter,
		Unit:        "1",
		Number:      "int64",
	}

	var vc cache[string, instID]
	name := strings.ToLower(orig.Name)
	_ = vc.Lookup(name, func() instID { return orig })
	i := newInserter[int64](newPipeline(nil, nil, nil), &vc)

	viewSuggestion := func(inst instID, stream string) string {
		return `"NewView(Instrument{` +
			`Name: \"` + inst.Name +
			`\", Description: \"` + inst.Description +
			`\", Kind: \"InstrumentKind` + inst.Kind.String() +
			`\", Unit: \"` + inst.Unit +
			`\"}, ` +
			stream +
			`)"`
	}

	t.Run("Name", func(t *testing.T) {
		inst := instID{
			Name:        "requestcount",
			Description: orig.Description,
			Kind:        orig.Kind,
			Unit:        orig.Unit,
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.Containsf(t, msg, viewSuggestion(
			inst, `Stream{Name: \"{{NEW_NAME}}\"}`,
		), "no suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})

	t.Run("Description", func(t *testing.T) {
		inst := instID{
			Name:        orig.Name,
			Description: "alt",
			Kind:        orig.Kind,
			Unit:        orig.Unit,
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.Containsf(t, msg, viewSuggestion(
			inst, `Stream{Description: \"`+orig.Description+`\"}`,
		), "no suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})

	t.Run("Kind", func(t *testing.T) {
		inst := instID{
			Name:        orig.Name,
			Description: orig.Description,
			Kind:        InstrumentKindHistogram,
			Unit:        orig.Unit,
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.Containsf(t, msg, viewSuggestion(
			inst, `Stream{Name: \"{{NEW_NAME}}\"}`,
		), "no suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})

	t.Run("Unit", func(t *testing.T) {
		inst := instID{
			Name:        orig.Name,
			Description: orig.Description,
			Kind:        orig.Kind,
			Unit:        "ms",
			Number:      orig.Number,
		}
		i.logConflict(inst)
		assert.NotContains(t, msg, "NewView", "suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})

	t.Run("Number", func(t *testing.T) {
		inst := instID{
			Name:        orig.Name,
			Description: orig.Description,
			Kind:        orig.Kind,
			Unit:        orig.Unit,
			Number:      "float64",
		}
		i.logConflict(inst)
		assert.NotContains(t, msg, "NewView", "suggestion logged: %v", inst)

		// Reset.
		msg = ""
	})
}

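// TestInserterCachedAggregatorNameConflict verifies that requesting an aggregator under
// a name differing only by case reuses the cached aggregator and keeps the originally
// registered stream name.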
func TestInserterCachedAggregatorNameConflict(t *testing.T) {
	const name = "requestCount"
	scope := instrumentation.Scope{Name: "pipeline_test"}
	kind := InstrumentKindCounter
	stream := Stream{
		Name:        name,
		Aggregation: AggregationSum{},
	}

	var vc cache[string, instID]
	pipe := newPipeline(nil, NewManualReader(), nil)
	i := newInserter[int64](pipe, &vc)

	readerAggregation := i.readerDefaultAggregation(kind)
	_, origID, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
	require.NoError(t, err)

	require.Len(t, pipe.aggregations, 1)
	require.Contains(t, pipe.aggregations, scope)
	iSync := pipe.aggregations[scope]
	require.Len(t, iSync, 1)
	require.Equal(t, name, iSync[0].name)

	stream.Name = "RequestCount"
	_, id, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
	require.NoError(t, err)
	assert.Equal(t, origID, id, "multiple aggregators for equivalent name")

	assert.Len(t, pipe.aggregations, 1, "additional scope added")
	require.Contains(t, pipe.aggregations, scope, "original scope removed")
	iSync = pipe.aggregations[scope]
	require.Len(t, iSync, 1, "registered instrumentSync changed")
	assert.Equal(t, name, iSync[0].name, "stream name changed")
}

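// TestExemplars verifies exemplar behavior for each OTEL_METRICS_EXEMPLAR_FILTER value:
// the default and trace_based filters record exemplars only for sampled span contexts,
// always_on records them unconditionally, always_off records none, and an unrecognized
// value falls back to the default.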
func TestExemplars(t *testing.T) {
	nCPU := runtime.NumCPU()
	setup := func(name string) (metric.Meter, Reader) {
		r := NewManualReader()
		v := NewView(Instrument{Name: "int64-expo-histogram"}, Stream{
			Aggregation: AggregationBase2ExponentialHistogram{
				MaxSize:  160, // > 20, reservoir size should default to 20.
				MaxScale: 20,
			},
		})
		return NewMeterProvider(WithReader(r), WithView(v)).Meter(name), r
	}

	measure := func(ctx context.Context, m metric.Meter) {
		i, err := m.Int64Counter("int64-counter")
		require.NoError(t, err)

		h, err := m.Int64Histogram("int64-histogram")
		require.NoError(t, err)

		e, err := m.Int64Histogram("int64-expo-histogram")
		require.NoError(t, err)

		for j := 0; j < 20*nCPU; j++ { // will be >= 20 and > nCPU
			i.Add(ctx, 1)
			h.Record(ctx, 1)
			e.Record(ctx, 1)
		}
	}

	check := func(t *testing.T, r Reader, nSum, nHist, nExpo int) {
		t.Helper()

		rm := new(metricdata.ResourceMetrics)
		require.NoError(t, r.Collect(context.Background(), rm))

		require.Len(t, rm.ScopeMetrics, 1, "ScopeMetrics")
		sm := rm.ScopeMetrics[0]
		require.Len(t, sm.Metrics, 3, "Metrics")

		require.IsType(t, metricdata.Sum[int64]{}, sm.Metrics[0].Data, sm.Metrics[0].Name)
		sum := sm.Metrics[0].Data.(metricdata.Sum[int64])
		assert.Len(t, sum.DataPoints[0].Exemplars, nSum)

		require.IsType(t, metricdata.Histogram[int64]{}, sm.Metrics[1].Data, sm.Metrics[1].Name)
		hist := sm.Metrics[1].Data.(metricdata.Histogram[int64])
		assert.Len(t, hist.DataPoints[0].Exemplars, nHist)

		require.IsType(t, metricdata.ExponentialHistogram[int64]{}, sm.Metrics[2].Data, sm.Metrics[2].Name)
		expo := sm.Metrics[2].Data.(metricdata.ExponentialHistogram[int64])
		assert.Len(t, expo.DataPoints[0].Exemplars, nExpo)
	}

	ctx := context.Background()
	sc := trace.NewSpanContext(trace.SpanContextConfig{
		SpanID:     trace.SpanID{0o1},
		TraceID:    trace.TraceID{0o1},
		TraceFlags: trace.FlagsSampled,
	})
	sampled := trace.ContextWithSpanContext(context.Background(), sc)

	t.Run("Default", func(t *testing.T) {
		m, r := setup("default")
		measure(ctx, m)
		check(t, r, 0, 0, 0)

		measure(sampled, m)
		check(t, r, nCPU, 1, 20)
	})

	t.Run("Invalid", func(t *testing.T) {
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "unrecognized")
		m, r := setup("default")
		measure(ctx, m)
		check(t, r, 0, 0, 0)

		measure(sampled, m)
		check(t, r, nCPU, 1, 20)
	})

	t.Run("always_on", func(t *testing.T) {
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")
		m, r := setup("always_on")
		measure(ctx, m)
		check(t, r, nCPU, 1, 20)
	})

	t.Run("always_off", func(t *testing.T) {
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_off")
		m, r := setup("always_off")
		measure(ctx, m)
		check(t, r, 0, 0, 0)
	})

	t.Run("trace_based", func(t *testing.T) {
		t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "trace_based")
		m, r := setup("trace_based")
		measure(ctx, m)
		check(t, r, 0, 0, 0)

		measure(sampled, m)
		check(t, r, nCPU, 1, 20)
	})
}