Per https://github.com/open-telemetry/opentelemetry-go/pull/6271#issuecomment-2657554647

> We agreed that we can move `FilterProcessor` directly to `sdk/log` as Logs SDK does not look to be stabilized soon.

- Add the possibility to filter based on the resource and scope, which are available in the SDK. The scope information is the most important, as it makes it possible to e.g. filter out logs emitted by a given logger. Thus e.g. https://github.com/open-telemetry/opentelemetry-specification/issues/4364 is not necessary. See https://github.com/open-telemetry/opentelemetry-specification/pull/4290#discussion_r1927546170 for more context.
- It is going to be an example for https://github.com/open-telemetry/opentelemetry-specification/issues/4363. A sketch of such a filtering processor follows after this list.

There is a little overhead (IMO totally acceptable) because of the data transformation. Most importantly, there is no new heap allocation.

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: 13th Gen Intel(R) Core(TM) i7-13800H
                 │   old.txt   │               new.txt                │
                 │   sec/op    │     sec/op      vs base              │
LoggerEnabled-20   4.589n ± 1%   319.750n ± 16%  +6867.75% (p=0.000 n=10)

                 │   old.txt    │              new.txt              │
                 │     B/op     │     B/op       vs base            │
LoggerEnabled-20   0.000Ki ± 0%   1.093Ki ± 13%  ? (p=0.000 n=10)

                 │  old.txt   │            new.txt             │
                 │ allocs/op  │ allocs/op   vs base            │
LoggerEnabled-20   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal
```

`Logger.Enabled` is still more efficient than `Logger.Emit` (benchmarks from https://github.com/open-telemetry/opentelemetry-go/pull/6315).

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: 13th Gen Intel(R) Core(TM) i7-13800H
BenchmarkLoggerEmit/5_attributes-20     559934   2391 ns/op    39088 B/op   1 allocs/op
BenchmarkLoggerEmit/10_attributes-20   1000000   5910 ns/op    49483 B/op   5 allocs/op
BenchmarkLoggerEnabled-20              1605697   968.7 ns/op   1272 B/op    0 allocs/op
PASS
ok      go.opentelemetry.io/otel/sdk/log        10.789s
```

Prior art:

- https://github.com/open-telemetry/opentelemetry-go/pull/6271
- https://github.com/open-telemetry/opentelemetry-go/pull/6286

I also created for tracking purposes:

- https://github.com/open-telemetry/opentelemetry-go/issues/6328
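To make the new extension point concrete, here is a minimal sketch (not part of this PR) of a processor that implements `FilterProcessor` to drop records emitted by one named logger. The `scopeFilter` type, its `droppedLogger` field, and `newProviderDroppingLogger` are hypothetical names, and the sketch assumes `EnabledParameters` exposes the instrumentation scope as described above.

```go
package filterexample

import (
	"context"

	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// scopeFilter delegates record handling to the embedded Processor and also
// implements FilterProcessor, so the SDK can skip records from one named
// logger before they are built.
type scopeFilter struct {
	sdklog.Processor        // delegate for OnEmit, Shutdown, ForceFlush
	droppedLogger    string // hypothetical: name of the logger to filter out
}

// Compile-time check against the interface moved into sdk/log.
var _ sdklog.FilterProcessor = (*scopeFilter)(nil)

// Enabled reports false for the filtered-out logger, assuming
// EnabledParameters carries the instrumentation scope of the Logger.
func (f *scopeFilter) Enabled(_ context.Context, p sdklog.EnabledParameters) bool {
	return p.InstrumentationScope.Name != f.droppedLogger
}

// newProviderDroppingLogger wires the filter in front of an existing
// processor (e.g. a batch processor).
func newProviderDroppingLogger(delegate sdklog.Processor, name string) *sdklog.LoggerProvider {
	return sdklog.NewLoggerProvider(
		sdklog.WithProcessor(&scopeFilter{Processor: delegate, droppedLogger: name}),
	)
}
```

Because `Enabled` sits on the hot path, the sketch only compares the scope name and leaves all record handling to the wrapped processor.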
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log // import "go.opentelemetry.io/otel/sdk/log"

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"testing"

	"github.com/go-logr/logr"
	"github.com/go-logr/logr/testr"
	"github.com/google/go-cmp/cmp"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/internal/global"
	"go.opentelemetry.io/otel/log"
	"go.opentelemetry.io/otel/log/noop"
	ottest "go.opentelemetry.io/otel/sdk/internal/internaltest"
	"go.opentelemetry.io/otel/sdk/resource"
)

const envVarResourceAttributes = "OTEL_RESOURCE_ATTRIBUTES"

type processor struct {
	Name string
	Err  error

	shutdownCalls   int
	forceFlushCalls int

	records []Record
}

func newProcessor(name string) *processor {
	return &processor{Name: name}
}

func (p *processor) OnEmit(ctx context.Context, r *Record) error {
	if p.Err != nil {
		return p.Err
	}

	p.records = append(p.records, *r)
	return nil
}

func (p *processor) Shutdown(context.Context) error {
	p.shutdownCalls++
	return p.Err
}

func (p *processor) ForceFlush(context.Context) error {
	p.forceFlushCalls++
	return p.Err
}

type fltrProcessor struct {
	*processor

	enabled bool
	params  []EnabledParameters
}

var _ FilterProcessor = (*fltrProcessor)(nil)

func newFltrProcessor(name string, enabled bool) *fltrProcessor {
	return &fltrProcessor{
		processor: newProcessor(name),
		enabled:   enabled,
	}
}

func (p *fltrProcessor) Enabled(_ context.Context, params EnabledParameters) bool {
	p.params = append(p.params, params)
	return p.enabled
}

func TestNewLoggerProviderConfiguration(t *testing.T) {
	t.Cleanup(func(orig otel.ErrorHandler) func() {
		otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
			t.Log(err)
		}))
		return func() { otel.SetErrorHandler(orig) }
	}(otel.GetErrorHandler()))

	res := resource.NewSchemaless(attribute.String("key", "value"))
	p0, p1 := newProcessor("0"), newProcessor("1")
	attrCntLim := 12
	attrValLenLim := 21

	testcases := []struct {
		name    string
		envars  map[string]string
		options []LoggerProviderOption
		want    *LoggerProvider
	}{
		{
			name: "Defaults",
			want: &LoggerProvider{
				resource:                  resource.Default(),
				attributeCountLimit:       defaultAttrCntLim,
				attributeValueLengthLimit: defaultAttrValLenLim,
			},
		},
		{
			name: "Options",
			options: []LoggerProviderOption{
				WithResource(res),
				WithProcessor(p0),
				WithProcessor(p1),
				WithAttributeCountLimit(attrCntLim),
				WithAttributeValueLengthLimit(attrValLenLim),
			},
			want: &LoggerProvider{
				resource:                  res,
				processors:                []Processor{p0, p1},
				attributeCountLimit:       attrCntLim,
				attributeValueLengthLimit: attrValLenLim,
			},
		},
		{
			name: "Environment",
			envars: map[string]string{
				envarAttrCntLim:    strconv.Itoa(attrCntLim),
				envarAttrValLenLim: strconv.Itoa(attrValLenLim),
			},
			want: &LoggerProvider{
				resource:                  resource.Default(),
				attributeCountLimit:       attrCntLim,
				attributeValueLengthLimit: attrValLenLim,
			},
		},
		{
			name: "InvalidEnvironment",
			envars: map[string]string{
				envarAttrCntLim:    "invalid attributeCountLimit",
				envarAttrValLenLim: "invalid attributeValueLengthLimit",
			},
			want: &LoggerProvider{
				resource:                  resource.Default(),
				attributeCountLimit:       defaultAttrCntLim,
				attributeValueLengthLimit: defaultAttrValLenLim,
			},
		},
		{
			name: "Precedence",
			envars: map[string]string{
				envarAttrCntLim:    strconv.Itoa(100),
				envarAttrValLenLim: strconv.Itoa(101),
			},
			options: []LoggerProviderOption{
				// These override the environment variables.
				WithAttributeCountLimit(attrCntLim),
				WithAttributeValueLengthLimit(attrValLenLim),
			},
			want: &LoggerProvider{
				resource:                  resource.Default(),
				attributeCountLimit:       attrCntLim,
				attributeValueLengthLimit: attrValLenLim,
			},
		},
	}

	for _, tc := range testcases {
		t.Run(tc.name, func(t *testing.T) {
			for key, value := range tc.envars {
				t.Setenv(key, value)
			}
			assert.Equal(t, tc.want, NewLoggerProvider(tc.options...))
		})
	}
}

func mergeResource(t *testing.T, r1, r2 *resource.Resource) *resource.Resource {
	r, err := resource.Merge(r1, r2)
	assert.NoError(t, err)
	return r
}

func TestWithResource(t *testing.T) {
	store, err := ottest.SetEnvVariables(map[string]string{
		envVarResourceAttributes: "key=value,rk5=7",
	})
	require.NoError(t, err)
	defer func() { require.NoError(t, store.Restore()) }()

	cases := []struct {
		name    string
		options []LoggerProviderOption
		want    *resource.Resource
		msg     string
	}{
		{
			name:    "explicitly empty resource",
			options: []LoggerProviderOption{WithResource(resource.Empty())},
			want:    resource.Environment(),
		},
		{
			name:    "uses default if no resource option",
			options: []LoggerProviderOption{},
			want:    resource.Default(),
		},
		{
			name:    "explicit resource",
			options: []LoggerProviderOption{WithResource(resource.NewSchemaless(attribute.String("rk1", "rv1"), attribute.Int64("rk2", 5)))},
			want:    mergeResource(t, resource.Environment(), resource.NewSchemaless(attribute.String("rk1", "rv1"), attribute.Int64("rk2", 5))),
		},
		{
			name: "last resource wins",
			options: []LoggerProviderOption{
				WithResource(resource.NewSchemaless(attribute.String("rk1", "vk1"), attribute.Int64("rk2", 5))),
				WithResource(resource.NewSchemaless(attribute.String("rk3", "rv3"), attribute.Int64("rk4", 10))),
			},
			want: mergeResource(t, resource.Environment(), resource.NewSchemaless(attribute.String("rk3", "rv3"), attribute.Int64("rk4", 10))),
		},
		{
			name:    "overlapping attributes with environment resource",
			options: []LoggerProviderOption{WithResource(resource.NewSchemaless(attribute.String("rk1", "rv1"), attribute.Int64("rk5", 10)))},
			want:    mergeResource(t, resource.Environment(), resource.NewSchemaless(attribute.String("rk1", "rv1"), attribute.Int64("rk5", 10))),
		},
	}
	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			got := newProviderConfig(tc.options).resource
			if diff := cmp.Diff(got, tc.want); diff != "" {
				t.Errorf("WithResource:\n -got +want %s", diff)
			}
		})
	}
}

func TestLoggerProviderConcurrentSafe(t *testing.T) {
	const goRoutineN = 10

	var wg sync.WaitGroup
	wg.Add(goRoutineN)

	p := NewLoggerProvider(WithProcessor(newProcessor("0")))
	const name = "testLogger"
	ctx := context.Background()
	for i := 0; i < goRoutineN; i++ {
		go func() {
			defer wg.Done()

			_ = p.Logger(name)
			_ = p.Shutdown(ctx)
			_ = p.ForceFlush(ctx)
		}()
	}

	wg.Wait()
}

type logSink struct {
	logr.LogSink

	level         int
	msg           string
	keysAndValues []interface{}
}

func (l *logSink) Enabled(int) bool { return true }

func (l *logSink) Info(level int, msg string, keysAndValues ...any) {
	l.level, l.msg, l.keysAndValues = level, msg, keysAndValues
	l.LogSink.Info(level, msg, keysAndValues...)
}

func TestLoggerProviderLogger(t *testing.T) {
	t.Run("InvalidName", func(t *testing.T) {
		l := &logSink{LogSink: testr.New(t).GetSink()}
		t.Cleanup(func(orig logr.Logger) func() {
			global.SetLogger(logr.New(l))
			return func() { global.SetLogger(orig) }
		}(global.GetLogger()))

		_ = NewLoggerProvider().Logger("")
		assert.Equal(t, 1, l.level, "logged level")
		assert.Equal(t, "Invalid Logger name.", l.msg, "logged message")
		require.Len(t, l.keysAndValues, 2, "logged key values")
		assert.Equal(t, "", l.keysAndValues[1], "logged name")
	})

	t.Run("Stopped", func(t *testing.T) {
		ctx := context.Background()
		p := NewLoggerProvider()
		_ = p.Shutdown(ctx)
		l := p.Logger("testing")

		assert.NotNil(t, l)
		assert.IsType(t, noop.Logger{}, l)
	})

	t.Run("SameLoggers", func(t *testing.T) {
		p := NewLoggerProvider()

		l0, l1, l2 := p.Logger("l0"), p.Logger("l1"), p.Logger("l0", log.WithInstrumentationAttributes(attribute.String("foo", "bar")))
		assert.NotSame(t, l0, l1)
		assert.NotSame(t, l0, l2)
		assert.NotSame(t, l1, l2)

		l3, l4, l5 := p.Logger("l0"), p.Logger("l1"), p.Logger("l0", log.WithInstrumentationAttributes(attribute.String("foo", "bar")))
		assert.Same(t, l0, l3)
		assert.Same(t, l1, l4)
		assert.Same(t, l2, l5)
	})
}

func TestLoggerProviderShutdown(t *testing.T) {
	t.Run("Once", func(t *testing.T) {
		proc := newProcessor("")
		p := NewLoggerProvider(WithProcessor(proc))

		ctx := context.Background()
		require.NoError(t, p.Shutdown(ctx))
		require.Equal(t, 1, proc.shutdownCalls, "processor Shutdown not called")

		require.NoError(t, p.Shutdown(ctx))
		assert.Equal(t, 1, proc.shutdownCalls, "processor Shutdown called multiple times")
	})

	t.Run("Error", func(t *testing.T) {
		proc := newProcessor("")
		proc.Err = assert.AnError
		p := NewLoggerProvider(WithProcessor(proc))

		ctx := context.Background()
		assert.ErrorIs(t, p.Shutdown(ctx), assert.AnError, "processor error not returned")
	})
}

func TestLoggerProviderForceFlush(t *testing.T) {
	t.Run("Stopped", func(t *testing.T) {
		proc := newProcessor("")
		p := NewLoggerProvider(WithProcessor(proc))

		ctx := context.Background()
		require.NoError(t, p.ForceFlush(ctx))
		require.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush not called")

		require.NoError(t, p.Shutdown(ctx))

		require.NoError(t, p.ForceFlush(ctx))
		assert.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush called after Shutdown")
	})

	t.Run("Multi", func(t *testing.T) {
		proc := newProcessor("")
		p := NewLoggerProvider(WithProcessor(proc))

		ctx := context.Background()
		require.NoError(t, p.ForceFlush(ctx))
		require.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush not called")

		require.NoError(t, p.ForceFlush(ctx))
		assert.Equal(t, 2, proc.forceFlushCalls, "processor ForceFlush not called multiple times")
	})

	t.Run("Error", func(t *testing.T) {
		proc := newProcessor("")
		proc.Err = assert.AnError
		p := NewLoggerProvider(WithProcessor(proc))

		ctx := context.Background()
		assert.ErrorIs(t, p.ForceFlush(ctx), assert.AnError, "processor error not returned")
	})
}

func BenchmarkLoggerProviderLogger(b *testing.B) {
	p := NewLoggerProvider()
	names := make([]string, b.N)
	for i := 0; i < b.N; i++ {
		names[i] = fmt.Sprintf("%d logger", i)
	}

	b.ResetTimer()
	b.ReportAllocs()

	loggers := make([]log.Logger, b.N)
	for i := 0; i < b.N; i++ {
		loggers[i] = p.Logger(names[i])
	}

	b.StopTimer()
	loggers[0].Enabled(context.Background(), log.EnabledParameters{})
}