From 4580e06de09960506d9ecb36da59979b10d0fda2 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Sun, 17 Mar 2024 22:57:43 -0700 Subject: [PATCH] Implement the LoggerProvider (#5080) --- sdk/log/go.mod | 2 +- sdk/log/logger.go | 6 ++ sdk/log/provider.go | 69 ++++++++++++++-- sdk/log/provider_test.go | 172 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 236 insertions(+), 13 deletions(-) diff --git a/sdk/log/go.mod b/sdk/log/go.mod index f646ba1da..bbd31adf1 100644 --- a/sdk/log/go.mod +++ b/sdk/log/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/sdk/log go 1.21 require ( + github.com/go-logr/logr v1.4.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/log v0.0.1-alpha @@ -12,7 +13,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect diff --git a/sdk/log/logger.go b/sdk/log/logger.go index 9f81d48b6..76898c60a 100644 --- a/sdk/log/logger.go +++ b/sdk/log/logger.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/embedded" + "go.opentelemetry.io/otel/sdk/instrumentation" ) // Compile-time check logger implements log.Logger. @@ -17,6 +18,11 @@ type logger struct { embedded.Logger } +func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger { + // TODO (#5061): Implement. + return &logger{} +} + func (l *logger) Emit(ctx context.Context, r log.Record) { // TODO (#5061): Implement. } diff --git a/sdk/log/provider.go b/sdk/log/provider.go index 1ab2affd6..704c505e9 100644 --- a/sdk/log/provider.go +++ b/sdk/log/provider.go @@ -5,13 +5,19 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" + "errors" "fmt" "os" "strconv" + "sync" + "sync/atomic" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/log/embedded" + "go.opentelemetry.io/otel/log/noop" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/resource" ) @@ -100,6 +106,11 @@ type LoggerProvider struct { processors []Processor attributeCountLimit int attributeValueLengthLimit int + + loggersMu sync.Mutex + loggers map[instrumentation.Scope]*logger + + stopped atomic.Bool } // Compile-time check LoggerProvider implements log.LoggerProvider. @@ -123,26 +134,72 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider { // Logger returns a new [log.Logger] with the provided name and configuration. // +// If p is shut down, a [noop.Logger] instace is returned. +// // This method can be called concurrently. func (p *LoggerProvider) Logger(name string, opts ...log.LoggerOption) log.Logger { - // TODO (#5060): Implement. - return &logger{} + if name == "" { + global.Warn("Invalid Logger name.", "name", name) + } + + if p.stopped.Load() { + return noop.NewLoggerProvider().Logger(name, opts...) + } + + cfg := log.NewLoggerConfig(opts...) + scope := instrumentation.Scope{ + Name: name, + Version: cfg.InstrumentationVersion(), + SchemaURL: cfg.SchemaURL(), + } + + p.loggersMu.Lock() + defer p.loggersMu.Unlock() + + if p.loggers == nil { + l := newLogger(p, scope) + p.loggers = map[instrumentation.Scope]*logger{scope: l} + return l + } + + l, ok := p.loggers[scope] + if !ok { + l = newLogger(p, scope) + p.loggers[scope] = l + } + + return l } // Shutdown flushes queued log records and shuts down the decorated expoter. // // This method can be called concurrently. func (p *LoggerProvider) Shutdown(ctx context.Context) error { - // TODO (#5060): Implement. - return nil + stopped := p.stopped.Swap(true) + if stopped { + return nil + } + + var err error + for _, p := range p.processors { + err = errors.Join(err, p.Shutdown(ctx)) + } + return err } // ForceFlush flushes all exporters. // // This method can be called concurrently. func (p *LoggerProvider) ForceFlush(ctx context.Context) error { - // TODO (#5060): Implement. - return nil + if p.stopped.Load() { + return nil + } + + var err error + for _, p := range p.processors { + err = errors.Join(err, p.ForceFlush(ctx)) + } + return err } // LoggerProviderOption applies a configuration option value to a LoggerProvider. diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go index a53ea1aa8..d9b4758e8 100644 --- a/sdk/log/provider_test.go +++ b/sdk/log/provider_test.go @@ -6,23 +6,50 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" "strconv" + "sync" "testing" + "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/log/noop" "go.opentelemetry.io/otel/sdk/resource" ) type processor struct { - name string + Name string + Err error + + shutdownCalls int + forceFlushCalls int } -func (processor) OnEmit(context.Context, Record) error { return nil } -func (processor) Enabled(context.Context, Record) bool { return true } -func (processor) Shutdown(context.Context) error { return nil } -func (processor) ForceFlush(context.Context) error { return nil } +func newProcessor(name string) *processor { + return &processor{Name: name} +} + +func (p *processor) OnEmit(context.Context, Record) error { + return p.Err +} + +func (*processor) Enabled(context.Context, Record) bool { + return true +} + +func (p *processor) Shutdown(context.Context) error { + p.shutdownCalls++ + return p.Err +} + +func (p *processor) ForceFlush(context.Context) error { + p.forceFlushCalls++ + return p.Err +} func TestNewLoggerProviderConfiguration(t *testing.T) { t.Cleanup(func(orig otel.ErrorHandler) func() { @@ -33,7 +60,7 @@ func TestNewLoggerProviderConfiguration(t *testing.T) { }(otel.GetErrorHandler())) res := resource.NewSchemaless(attribute.String("key", "value")) - p0, p1 := processor{name: "0"}, processor{name: "1"} + p0, p1 := newProcessor("0"), newProcessor("1") attrCntLim := 12 attrValLenLim := 21 @@ -124,3 +151,136 @@ func TestLimitValueFailsOpen(t *testing.T) { var l limit assert.Equal(t, -1, l.Value(), "limit value should default to unlimited") } + +func TestLoggerProviderConcurrentSafe(t *testing.T) { + const goRoutineN = 10 + + var wg sync.WaitGroup + wg.Add(goRoutineN) + + p := NewLoggerProvider(WithProcessor(newProcessor("0"))) + const name = "testLogger" + ctx := context.Background() + for i := 0; i < goRoutineN; i++ { + go func() { + defer wg.Done() + + _ = p.Logger(name) + _ = p.Shutdown(ctx) + _ = p.ForceFlush(ctx) + }() + } + + wg.Wait() +} + +type logSink struct { + logr.LogSink + + level int + msg string + keysAndValues []interface{} +} + +func (l *logSink) Enabled(int) bool { return true } + +func (l *logSink) Info(level int, msg string, keysAndValues ...any) { + l.level, l.msg, l.keysAndValues = level, msg, keysAndValues + l.LogSink.Info(level, msg, keysAndValues) +} + +func TestLoggerProviderLogger(t *testing.T) { + t.Run("InvalidName", func(t *testing.T) { + l := &logSink{LogSink: testr.New(t).GetSink()} + t.Cleanup(func(orig logr.Logger) func() { + global.SetLogger(logr.New(l)) + return func() { global.SetLogger(orig) } + }(global.GetLogger())) + + _ = NewLoggerProvider().Logger("") + assert.Equal(t, 1, l.level, "logged level") + assert.Equal(t, "Invalid Logger name.", l.msg, "logged message") + require.Len(t, l.keysAndValues, 2, "logged key values") + assert.Equal(t, "", l.keysAndValues[1], "logged name") + }) + + t.Run("Stopped", func(t *testing.T) { + ctx := context.Background() + p := NewLoggerProvider() + _ = p.Shutdown(ctx) + l := p.Logger("testing") + + assert.NotNil(t, l) + assert.IsType(t, noop.Logger{}, l) + }) + + t.Run("SameLoggers", func(t *testing.T) { + p := NewLoggerProvider() + + l0, l1 := p.Logger("l0"), p.Logger("l1") + l2, l3 := p.Logger("l0"), p.Logger("l1") + + assert.Same(t, l0, l2) + assert.Same(t, l1, l3) + }) +} + +func TestLoggerProviderShutdown(t *testing.T) { + t.Run("Once", func(t *testing.T) { + proc := newProcessor("") + p := NewLoggerProvider(WithProcessor(proc)) + + ctx := context.Background() + require.NoError(t, p.Shutdown(ctx)) + require.Equal(t, 1, proc.shutdownCalls, "processor Shutdown not called") + + require.NoError(t, p.Shutdown(ctx)) + assert.Equal(t, 1, proc.shutdownCalls, "processor Shutdown called multiple times") + }) + + t.Run("Error", func(t *testing.T) { + proc := newProcessor("") + proc.Err = assert.AnError + p := NewLoggerProvider(WithProcessor(proc)) + + ctx := context.Background() + assert.ErrorIs(t, p.Shutdown(ctx), assert.AnError, "processor error not returned") + }) +} + +func TestLoggerProviderForceFlush(t *testing.T) { + t.Run("Stopped", func(t *testing.T) { + proc := newProcessor("") + p := NewLoggerProvider(WithProcessor(proc)) + + ctx := context.Background() + require.NoError(t, p.ForceFlush(ctx)) + require.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush not called") + + require.NoError(t, p.Shutdown(ctx)) + + require.NoError(t, p.ForceFlush(ctx)) + assert.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush called after Shutdown") + }) + + t.Run("Multi", func(t *testing.T) { + proc := newProcessor("") + p := NewLoggerProvider(WithProcessor(proc)) + + ctx := context.Background() + require.NoError(t, p.ForceFlush(ctx)) + require.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush not called") + + require.NoError(t, p.ForceFlush(ctx)) + assert.Equal(t, 2, proc.forceFlushCalls, "processor ForceFlush not called multiple times") + }) + + t.Run("Error", func(t *testing.T) { + proc := newProcessor("") + proc.Err = assert.AnError + p := NewLoggerProvider(WithProcessor(proc)) + + ctx := context.Background() + assert.ErrorIs(t, p.ForceFlush(ctx), assert.AnError, "processor error not returned") + }) +}