1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-07 23:02:15 +02:00
opentelemetry-go/sdk/metric/pipeline_test.go
Tyler Yahn ab61991465
Log a view suggestion for duplicate instrument conflicts (#4349)
* Log a view suggestion for duplicate instrument conflicts

* Add change to changelog

* Update changelog entry
2023-07-24 07:47:56 -07:00

361 lines
8.9 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"fmt"
"log"
"os"
"strings"
"sync"
"testing"
"github.com/go-logr/logr"
"github.com/go-logr/logr/funcr"
"github.com/go-logr/stdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
)
func testSumAggregateOutput(dest *metricdata.Aggregation) int {
*dest = metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{{Value: 1}},
}
return 1
}
func TestNewPipeline(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
assert.Len(t, output.ScopeMetrics, 0)
iSync := instrumentSync{"name", "desc", "1", testSumAggregateOutput}
assert.NotPanics(t, func() {
pipe.addSync(instrumentation.Scope{}, iSync)
})
require.NotPanics(t, func() {
pipe.addMultiCallback(func(context.Context) error { return nil })
})
err = pipe.produce(context.Background(), &output)
require.NoError(t, err)
assert.Equal(t, resource.Empty(), output.Resource)
require.Len(t, output.ScopeMetrics, 1)
require.Len(t, output.ScopeMetrics[0].Metrics, 1)
}
func TestPipelineUsesResource(t *testing.T) {
res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource"))
pipe := newPipeline(res, nil, nil)
output := metricdata.ResourceMetrics{}
err := pipe.produce(context.Background(), &output)
assert.NoError(t, err)
assert.Equal(t, res, output.Resource)
}
func TestPipelineConcurrentSafe(t *testing.T) {
pipe := newPipeline(nil, nil, nil)
ctx := context.Background()
var output metricdata.ResourceMetrics
var wg sync.WaitGroup
const threads = 2
for i := 0; i < threads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = pipe.produce(ctx, &output)
}()
wg.Add(1)
go func(n int) {
defer wg.Done()
name := fmt.Sprintf("name %d", n)
sync := instrumentSync{name, "desc", "1", testSumAggregateOutput}
pipe.addSync(instrumentation.Scope{}, sync)
}(i)
wg.Add(1)
go func() {
defer wg.Done()
pipe.addMultiCallback(func(context.Context) error { return nil })
}()
}
wg.Wait()
}
func TestDefaultViewImplicit(t *testing.T) {
t.Run("Int64", testDefaultViewImplicit[int64]())
t.Run("Float64", testDefaultViewImplicit[float64]())
}
func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
inst := Instrument{
Name: "requests",
Description: "count of requests received",
Kind: InstrumentKindCounter,
Unit: "1",
}
return func(t *testing.T) {
reader := NewManualReader()
tests := []struct {
name string
pipe *pipeline
}{
{
name: "NoView",
pipe: newPipeline(nil, reader, nil),
},
{
name: "NoMatchingView",
pipe: newPipeline(nil, reader, []View{
NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}),
}),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var c cache[string, instID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")
for _, in := range got {
in(context.Background(), 1, *attribute.EmptySet())
}
out := metricdata.ResourceMetrics{}
err = test.pipe.produce(context.Background(), &out)
require.NoError(t, err)
require.Len(t, out.ScopeMetrics, 1, "Aggregator not registered with pipeline")
sm := out.ScopeMetrics[0]
require.Len(t, sm.Metrics, 1, "metrics not produced from default view")
metricdatatest.AssertEqual(t, metricdata.Metrics{
Name: inst.Name,
Description: inst.Description,
Unit: "1",
Data: metricdata.Sum[N]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[N]{{Value: N(1)}},
},
}, sm.Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
}
func TestLogConflictName(t *testing.T) {
testcases := []struct {
existing, name string
conflict bool
}{
{
existing: "requestCount",
name: "requestCount",
conflict: false,
},
{
existing: "requestCount",
name: "requestDuration",
conflict: false,
},
{
existing: "requestCount",
name: "requestcount",
conflict: true,
},
{
existing: "requestCount",
name: "REQUESTCOUNT",
conflict: true,
},
{
existing: "requestCount",
name: "rEqUeStCoUnT",
conflict: true,
},
}
var msg string
t.Cleanup(func(orig logr.Logger) func() {
otel.SetLogger(funcr.New(func(_, args string) {
msg = args
}, funcr.Options{Verbosity: 20}))
return func() { otel.SetLogger(orig) }
}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))
for _, tc := range testcases {
var vc cache[string, instID]
name := strings.ToLower(tc.existing)
_ = vc.Lookup(name, func() instID {
return instID{Name: tc.existing}
})
i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
i.logConflict(instID{Name: tc.name})
if tc.conflict {
assert.Containsf(
t, msg, "duplicate metric stream definitions",
"warning not logged for conflicting names: %s, %s",
tc.existing, tc.name,
)
} else {
assert.Equalf(
t, msg, "",
"warning logged for non-conflicting names: %s, %s",
tc.existing, tc.name,
)
}
// Reset.
msg = ""
}
}
func TestLogConflictSuggestView(t *testing.T) {
var msg string
t.Cleanup(func(orig logr.Logger) func() {
otel.SetLogger(funcr.New(func(_, args string) {
msg = args
}, funcr.Options{Verbosity: 20}))
return func() { otel.SetLogger(orig) }
}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))
orig := instID{
Name: "requestCount",
Description: "number of requests",
Kind: InstrumentKindCounter,
Unit: "1",
Number: "int64",
}
var vc cache[string, instID]
name := strings.ToLower(orig.Name)
_ = vc.Lookup(name, func() instID { return orig })
i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
viewSuggestion := func(inst instID, stream string) string {
return `"NewView(Instrument{` +
`Name: \"` + inst.Name +
`\", Description: \"` + inst.Description +
`\", Kind: \"InstrumentKind` + inst.Kind.String() +
`\", Unit: \"` + inst.Unit +
`\"}, ` +
stream +
`)"`
}
t.Run("Name", func(t *testing.T) {
inst := instID{
Name: "requestcount",
Description: orig.Description,
Kind: orig.Kind,
Unit: orig.Unit,
Number: orig.Number,
}
i.logConflict(inst)
assert.Containsf(t, msg, viewSuggestion(
inst, `Stream{Name: \"{{NEW_NAME}}\"}`,
), "no suggestion logged: %v", inst)
// Reset.
msg = ""
})
t.Run("Description", func(t *testing.T) {
inst := instID{
Name: orig.Name,
Description: "alt",
Kind: orig.Kind,
Unit: orig.Unit,
Number: orig.Number,
}
i.logConflict(inst)
assert.Containsf(t, msg, viewSuggestion(
inst, `Stream{Description: \"`+orig.Description+`\"}`,
), "no suggestion logged: %v", inst)
// Reset.
msg = ""
})
t.Run("Kind", func(t *testing.T) {
inst := instID{
Name: orig.Name,
Description: orig.Description,
Kind: InstrumentKindHistogram,
Unit: orig.Unit,
Number: orig.Number,
}
i.logConflict(inst)
assert.Containsf(t, msg, viewSuggestion(
inst, `Stream{Name: \"{{NEW_NAME}}\"}`,
), "no suggestion logged: %v", inst)
// Reset.
msg = ""
})
t.Run("Unit", func(t *testing.T) {
inst := instID{
Name: orig.Name,
Description: orig.Description,
Kind: orig.Kind,
Unit: "ms",
Number: orig.Number,
}
i.logConflict(inst)
assert.NotContains(t, msg, "NewView", "suggestion logged: %v", inst)
// Reset.
msg = ""
})
t.Run("Number", func(t *testing.T) {
inst := instID{
Name: orig.Name,
Description: orig.Description,
Kind: orig.Kind,
Unit: orig.Unit,
Number: "float64",
}
i.logConflict(inst)
assert.NotContains(t, msg, "NewView", "suggestion logged: %v", inst)
// Reset.
msg = ""
})
}