mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-11-24 08:22:25 +02:00
Implement the LoggerProvider (#5080)
This commit is contained in:
parent
3542ee68a9
commit
4580e06de0
@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/sdk/log
|
||||
go 1.21
|
||||
|
||||
require (
|
||||
github.com/go-logr/logr v1.4.1
|
||||
github.com/stretchr/testify v1.9.0
|
||||
go.opentelemetry.io/otel v1.24.0
|
||||
go.opentelemetry.io/otel/log v0.0.1-alpha
|
||||
@ -12,7 +13,6 @@ require (
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.24.0 // indirect
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/log"
|
||||
"go.opentelemetry.io/otel/log/embedded"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
)
|
||||
|
||||
// Compile-time check logger implements log.Logger.
|
||||
@ -17,6 +18,11 @@ type logger struct {
|
||||
embedded.Logger
|
||||
}
|
||||
|
||||
func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger {
|
||||
// TODO (#5061): Implement.
|
||||
return &logger{}
|
||||
}
|
||||
|
||||
func (l *logger) Emit(ctx context.Context, r log.Record) {
|
||||
// TODO (#5061): Implement.
|
||||
}
|
||||
|
@ -5,13 +5,19 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/log"
|
||||
"go.opentelemetry.io/otel/log/embedded"
|
||||
"go.opentelemetry.io/otel/log/noop"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@ -100,6 +106,11 @@ type LoggerProvider struct {
|
||||
processors []Processor
|
||||
attributeCountLimit int
|
||||
attributeValueLengthLimit int
|
||||
|
||||
loggersMu sync.Mutex
|
||||
loggers map[instrumentation.Scope]*logger
|
||||
|
||||
stopped atomic.Bool
|
||||
}
|
||||
|
||||
// Compile-time check LoggerProvider implements log.LoggerProvider.
|
||||
@ -123,26 +134,72 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
|
||||
|
||||
// Logger returns a new [log.Logger] with the provided name and configuration.
|
||||
//
|
||||
// If p is shut down, a [noop.Logger] instance is returned.
|
||||
//
|
||||
// This method can be called concurrently.
|
||||
func (p *LoggerProvider) Logger(name string, opts ...log.LoggerOption) log.Logger {
|
||||
// TODO (#5060): Implement.
|
||||
return &logger{}
|
||||
if name == "" {
|
||||
global.Warn("Invalid Logger name.", "name", name)
|
||||
}
|
||||
|
||||
if p.stopped.Load() {
|
||||
return noop.NewLoggerProvider().Logger(name, opts...)
|
||||
}
|
||||
|
||||
cfg := log.NewLoggerConfig(opts...)
|
||||
scope := instrumentation.Scope{
|
||||
Name: name,
|
||||
Version: cfg.InstrumentationVersion(),
|
||||
SchemaURL: cfg.SchemaURL(),
|
||||
}
|
||||
|
||||
p.loggersMu.Lock()
|
||||
defer p.loggersMu.Unlock()
|
||||
|
||||
if p.loggers == nil {
|
||||
l := newLogger(p, scope)
|
||||
p.loggers = map[instrumentation.Scope]*logger{scope: l}
|
||||
return l
|
||||
}
|
||||
|
||||
l, ok := p.loggers[scope]
|
||||
if !ok {
|
||||
l = newLogger(p, scope)
|
||||
p.loggers[scope] = l
|
||||
}
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
// Shutdown flushes queued log records and shuts down the decorated exporter.
|
||||
//
|
||||
// This method can be called concurrently.
|
||||
func (p *LoggerProvider) Shutdown(ctx context.Context) error {
|
||||
// TODO (#5060): Implement.
|
||||
return nil
|
||||
stopped := p.stopped.Swap(true)
|
||||
if stopped {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, p := range p.processors {
|
||||
err = errors.Join(err, p.Shutdown(ctx))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// ForceFlush flushes all exporters.
|
||||
//
|
||||
// This method can be called concurrently.
|
||||
func (p *LoggerProvider) ForceFlush(ctx context.Context) error {
|
||||
// TODO (#5060): Implement.
|
||||
return nil
|
||||
if p.stopped.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, p := range p.processors {
|
||||
err = errors.Join(err, p.ForceFlush(ctx))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// LoggerProviderOption applies a configuration option value to a LoggerProvider.
|
||||
|
@ -6,23 +6,50 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/go-logr/logr/testr"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/log/noop"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
type processor struct {
|
||||
name string
|
||||
Name string
|
||||
Err error
|
||||
|
||||
shutdownCalls int
|
||||
forceFlushCalls int
|
||||
}
|
||||
|
||||
func (processor) OnEmit(context.Context, Record) error { return nil }
|
||||
func (processor) Enabled(context.Context, Record) bool { return true }
|
||||
func (processor) Shutdown(context.Context) error { return nil }
|
||||
func (processor) ForceFlush(context.Context) error { return nil }
|
||||
func newProcessor(name string) *processor {
|
||||
return &processor{Name: name}
|
||||
}
|
||||
|
||||
func (p *processor) OnEmit(context.Context, Record) error {
|
||||
return p.Err
|
||||
}
|
||||
|
||||
func (*processor) Enabled(context.Context, Record) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *processor) Shutdown(context.Context) error {
|
||||
p.shutdownCalls++
|
||||
return p.Err
|
||||
}
|
||||
|
||||
func (p *processor) ForceFlush(context.Context) error {
|
||||
p.forceFlushCalls++
|
||||
return p.Err
|
||||
}
|
||||
|
||||
func TestNewLoggerProviderConfiguration(t *testing.T) {
|
||||
t.Cleanup(func(orig otel.ErrorHandler) func() {
|
||||
@ -33,7 +60,7 @@ func TestNewLoggerProviderConfiguration(t *testing.T) {
|
||||
}(otel.GetErrorHandler()))
|
||||
|
||||
res := resource.NewSchemaless(attribute.String("key", "value"))
|
||||
p0, p1 := processor{name: "0"}, processor{name: "1"}
|
||||
p0, p1 := newProcessor("0"), newProcessor("1")
|
||||
attrCntLim := 12
|
||||
attrValLenLim := 21
|
||||
|
||||
@ -124,3 +151,136 @@ func TestLimitValueFailsOpen(t *testing.T) {
|
||||
var l limit
|
||||
assert.Equal(t, -1, l.Value(), "limit value should default to unlimited")
|
||||
}
|
||||
|
||||
func TestLoggerProviderConcurrentSafe(t *testing.T) {
|
||||
const goRoutineN = 10
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(goRoutineN)
|
||||
|
||||
p := NewLoggerProvider(WithProcessor(newProcessor("0")))
|
||||
const name = "testLogger"
|
||||
ctx := context.Background()
|
||||
for i := 0; i < goRoutineN; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
_ = p.Logger(name)
|
||||
_ = p.Shutdown(ctx)
|
||||
_ = p.ForceFlush(ctx)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type logSink struct {
|
||||
logr.LogSink
|
||||
|
||||
level int
|
||||
msg string
|
||||
keysAndValues []interface{}
|
||||
}
|
||||
|
||||
func (l *logSink) Enabled(int) bool { return true }
|
||||
|
||||
func (l *logSink) Info(level int, msg string, keysAndValues ...any) {
|
||||
l.level, l.msg, l.keysAndValues = level, msg, keysAndValues
|
||||
l.LogSink.Info(level, msg, keysAndValues)
|
||||
}
|
||||
|
||||
func TestLoggerProviderLogger(t *testing.T) {
|
||||
t.Run("InvalidName", func(t *testing.T) {
|
||||
l := &logSink{LogSink: testr.New(t).GetSink()}
|
||||
t.Cleanup(func(orig logr.Logger) func() {
|
||||
global.SetLogger(logr.New(l))
|
||||
return func() { global.SetLogger(orig) }
|
||||
}(global.GetLogger()))
|
||||
|
||||
_ = NewLoggerProvider().Logger("")
|
||||
assert.Equal(t, 1, l.level, "logged level")
|
||||
assert.Equal(t, "Invalid Logger name.", l.msg, "logged message")
|
||||
require.Len(t, l.keysAndValues, 2, "logged key values")
|
||||
assert.Equal(t, "", l.keysAndValues[1], "logged name")
|
||||
})
|
||||
|
||||
t.Run("Stopped", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
p := NewLoggerProvider()
|
||||
_ = p.Shutdown(ctx)
|
||||
l := p.Logger("testing")
|
||||
|
||||
assert.NotNil(t, l)
|
||||
assert.IsType(t, noop.Logger{}, l)
|
||||
})
|
||||
|
||||
t.Run("SameLoggers", func(t *testing.T) {
|
||||
p := NewLoggerProvider()
|
||||
|
||||
l0, l1 := p.Logger("l0"), p.Logger("l1")
|
||||
l2, l3 := p.Logger("l0"), p.Logger("l1")
|
||||
|
||||
assert.Same(t, l0, l2)
|
||||
assert.Same(t, l1, l3)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLoggerProviderShutdown(t *testing.T) {
|
||||
t.Run("Once", func(t *testing.T) {
|
||||
proc := newProcessor("")
|
||||
p := NewLoggerProvider(WithProcessor(proc))
|
||||
|
||||
ctx := context.Background()
|
||||
require.NoError(t, p.Shutdown(ctx))
|
||||
require.Equal(t, 1, proc.shutdownCalls, "processor Shutdown not called")
|
||||
|
||||
require.NoError(t, p.Shutdown(ctx))
|
||||
assert.Equal(t, 1, proc.shutdownCalls, "processor Shutdown called multiple times")
|
||||
})
|
||||
|
||||
t.Run("Error", func(t *testing.T) {
|
||||
proc := newProcessor("")
|
||||
proc.Err = assert.AnError
|
||||
p := NewLoggerProvider(WithProcessor(proc))
|
||||
|
||||
ctx := context.Background()
|
||||
assert.ErrorIs(t, p.Shutdown(ctx), assert.AnError, "processor error not returned")
|
||||
})
|
||||
}
|
||||
|
||||
func TestLoggerProviderForceFlush(t *testing.T) {
|
||||
t.Run("Stopped", func(t *testing.T) {
|
||||
proc := newProcessor("")
|
||||
p := NewLoggerProvider(WithProcessor(proc))
|
||||
|
||||
ctx := context.Background()
|
||||
require.NoError(t, p.ForceFlush(ctx))
|
||||
require.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush not called")
|
||||
|
||||
require.NoError(t, p.Shutdown(ctx))
|
||||
|
||||
require.NoError(t, p.ForceFlush(ctx))
|
||||
assert.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush called after Shutdown")
|
||||
})
|
||||
|
||||
t.Run("Multi", func(t *testing.T) {
|
||||
proc := newProcessor("")
|
||||
p := NewLoggerProvider(WithProcessor(proc))
|
||||
|
||||
ctx := context.Background()
|
||||
require.NoError(t, p.ForceFlush(ctx))
|
||||
require.Equal(t, 1, proc.forceFlushCalls, "processor ForceFlush not called")
|
||||
|
||||
require.NoError(t, p.ForceFlush(ctx))
|
||||
assert.Equal(t, 2, proc.forceFlushCalls, "processor ForceFlush not called multiple times")
|
||||
})
|
||||
|
||||
t.Run("Error", func(t *testing.T) {
|
||||
proc := newProcessor("")
|
||||
proc.Err = assert.AnError
|
||||
p := NewLoggerProvider(WithProcessor(proc))
|
||||
|
||||
ctx := context.Background()
|
||||
assert.ErrorIs(t, p.ForceFlush(ctx), assert.AnError, "processor error not returned")
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user