1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-26 03:52:03 +02:00

Move log.Processor.Enabled to independent FilterProcessor interfaced type (#5692)

Closes #5425 

Our current log `Processor` interface contains more functionality than
the [OTel
spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/logs/sdk.md#logrecordprocessor-operations).
The additional functionality allows processors to report back to the API
if a Record should be constructed and emitted or not, which is quite
helpful[^1][^2][^3][^4][^5].

This removes the `Enabled` method from the `Processor` type. It adds
this functionality a new optional and experimental `FilterProcessor`
interface type. The logger and provider are updated to check for this
optional interface to be implemented with the configured processors and
uses them to back the `Logger.Enabled` method, preserving existing
functionality.

By making this change:

- The `Processor` interface is now compliant with the OTel spec and does
not contain any additional unspecified behavior.
- All `Processor` implementations are no longer required to implement an
`Enabled` method. The default, when they do not implement this method,
is to assume they are enabled.

### Benchmark

```terminal
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
                │   old.txt    │              new7.txt               │
                │    sec/op    │   sec/op     vs base                │
LoggerEnabled-8   133.30n ± 3%   32.36n ± 3%  -75.72% (p=0.000 n=10)

                │  old.txt   │            new7.txt            │
                │    B/op    │    B/op     vs base            │
LoggerEnabled-8   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal

                │  old.txt   │            new7.txt            │
                │ allocs/op  │ allocs/op   vs base            │
LoggerEnabled-8   0.000 ± 0%   0.000 ± 0%  ~ (p=1.000 n=10) ¹
¹ all samples are equal
```

This is a significant performance improvement due to the `Record` no
longer being converted from the API version to the SDK version.

[^1]: https://pkg.go.dev/go.opentelemetry.io/contrib/processors/minsev
[^2]:
https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#BatchProcessor.Enabled
[^3]:
https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#SimpleProcessor.Enabled
[^4]:
af75717ac4/bridges/otelslog/handler.go (L206-L211)
[^5]:
d0309ddd8c/bridges/otelzap/core.go (L142-L146)

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
Co-authored-by: Sam Xie <sam@samxie.me>
This commit is contained in:
Tyler Yahn 2024-08-22 09:12:23 -07:00 committed by GitHub
parent fe6c67e7e9
commit 002c0a4c03
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 197 additions and 62 deletions

View File

@ -22,6 +22,10 @@ The next release will require at least [Go 1.22].
- Zero value of `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` no longer panics. (#5665)
- Add `Walk` function to `TraceState` in `go.opentelemetry.io/otel/trace` to iterate all the key-value pairs. (#5651)
- Bridge the trace state in `go.opentelemetry.io/otel/bridge/opencensus`. (#5651)
- The `FilterProcessor` interface type is added in `go.opentelemetry.io/otel/sdk/log/internal/x`.
This is an optional and experimental interface that log `Processor`s can implement to instruct the `Logger` if a `Record` will be processed or not.
It replaces the existing `Enabled` method that is removed from the `Processor` interface itself.
It does not fall within the scope of the OpenTelemetry Go versioning and stability [policy](./VERSIONING.md) and it may be changed in backwards incompatible ways or removed in feature releases. (#5692)
- Support [Go 1.23]. (#5720)
### Changed
@ -30,6 +34,8 @@ The next release will require at least [Go 1.22].
- `SimpleProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log` now returns `false` if the exporter is `nil`. (#5665)
- Update the concurrency requirements of `Exporter` in `go.opentelemetry.io/otel/sdk/log`. (#5666)
- `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` synchronizes `OnEmit` calls. (#5666)
- The `Processor` interface in `go.opentelemetry.io/otel/sdk/log` no longer includes the `Enabled` method.
See the `FilterProcessor` interface type added in `go.opentelemetry.io/otel/sdk/log/internal/x` to continue providing this functionality. (#5692)
- The `SimpleProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)
- The `BatchProcessor` type in `go.opentelemetry.io/otel/sdk/log` is no longer comparable. (#5693)
- `NewMemberRaw`, `NewKeyProperty` and `NewKeyValuePropertyRaw` in `go.opentelemetry.io/otel/baggage` allow UTF-8 string in key. (#5132)
@ -50,6 +56,11 @@ The next release will require at least [Go 1.22].
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#5641)
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#5650)
### Removed
- The `Enabled` method of the `SimpleProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692)
- The `Enabled` method of the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is removed. (#5692)
<!-- Released section -->
<!-- Don't change this section unless doing release -->

View File

@ -196,11 +196,6 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}
// Enabled returns if b is enabled.
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load() && b.q != nil
}
// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) || b.q == nil {

View File

@ -47,7 +47,6 @@ func TestEmptyBatchConfig(t *testing.T) {
ctx := context.Background()
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
@ -270,14 +269,6 @@ func TestBatchProcessor(t *testing.T) {
assert.Equal(t, 3, e.ExportN())
})
t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))
_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
})
t.Run("Shutdown", func(t *testing.T) {
t.Run("Error", func(t *testing.T) {
e := newTestExporter(assert.AnError)

View File

@ -32,5 +32,8 @@ at a single endpoint their origin is decipherable.
See [go.opentelemetry.io/otel/log] for more information about
the OpenTelemetry Logs Bridge API.
See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about the
experimental features.
*/
package log // import "go.opentelemetry.io/otel/sdk/log"

View File

@ -7,6 +7,7 @@ import (
"context"
"fmt"
"strings"
"sync"
logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
@ -58,7 +59,7 @@ func ExampleProcessor_filtering() {
// Wrap the processor so that it ignores processing log records
// when a context deriving from WithIgnoreLogs is passed
// to the logging methods.
processor = &ContextFilterProcessor{processor}
processor = &ContextFilterProcessor{Processor: processor}
// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
@ -81,6 +82,15 @@ func WithIgnoreLogs(ctx context.Context) context.Context {
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor
lazyFilter sync.Once
// Use the experimental FilterProcessor interface
// (go.opentelemetry.io/otel/sdk/log/internal/x).
filter filter
}
type filter interface {
Enabled(ctx context.Context, record log.Record) bool
}
func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
@ -91,7 +101,12 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
}
func (p *ContextFilterProcessor) Enabled(ctx context.Context, record log.Record) bool {
return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, record)
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(filter); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, record))
}
func ignoreLogs(ctx context.Context) bool {

View File

@ -0,0 +1,35 @@
# Experimental Features
The Logs SDK contains features that have not yet stabilized.
These features are added to the OpenTelemetry Go Logs SDK prior to stabilization so that users can start experimenting with them and provide feedback.
These feature may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.
## Features
- [Filter Processors](#filter-processor)
### Filter Processor
Users of logging libraries often want to know if a log `Record` will be processed or dropped before they perform complex operations to construct the `Record`.
The [`Logger`] in the Logs Bridge API provides the `Enabled` method for just this use-case.
In order for the Logs Bridge SDK to effectively implement this API, it needs to be known if the registered [`Processor`]s are enabled for the `Record` within a context.
A [`Processor`] that knows, and can identify, what `Record` it will process or drop when it is passed to `OnEmit` can communicate this to the SDK `Logger` by implementing the `FilterProcessor`.
By default, the SDK `Logger.Enabled` will return true when called.
Only if all the registered [`Processor`]s implement `FilterProcessor` and they all return `false` will `Logger.Enabled` return `false`.
See the [`minsev`] [`Processor`] for an example use-case.
It is used to filter `Record`s out that a have a `Severity` below a threshold.
[`Logger`]: https://pkg.go.dev/go.opentelemetry.io/otel/log#Logger
[`Processor`]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor
[`minsev`]: https://pkg.go.dev/go.opentelemetry.io/contrib/processors/minsev
## Compatibility and Stability
Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.
When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.

46
sdk/log/internal/x/x.go Normal file
View File

@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package x contains support for Logs SDK experimental features.
package x // import "go.opentelemetry.io/otel/sdk/log/internal/x"
import (
"context"
"go.opentelemetry.io/otel/log"
)
// FilterProcessor is a [Processor] that knows, and can identify, what
// [log.Record] it will process or drop when it is passed to OnEmit.
//
// This is useful for users of logging libraries that want to know if a [log.Record]
// will be processed or dropped before they perform complex operations to
// construct the [log.Record].
//
// [Processor] implementations that choose to support this by satisfying this
// interface are expected to re-evaluate the [log.Record]s passed to OnEmit, it is
// not expected that the caller to OnEmit will use the functionality from this
// interface prior to calling OnEmit.
//
// This should only be implemented for [Processor]s that can make reliable
// enough determination of this prior to processing a [log.Record] and where
// the result is dynamic.
//
// [Processor]: https://pkg.go.dev/go.opentelemetry.io/otel/sdk/log#Processor
type FilterProcessor interface {
// Enabled returns whether the Processor will process for the given context
// and record.
//
// The passed record is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a record with only the
// Severity set). If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and record, and will be false if the Processor will not
// process. An implementation should default to returning true for an
// indeterminate state.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record log.Record) bool
}

View File

@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/trace"
)
@ -42,13 +43,30 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}
}
func (l *logger) Enabled(ctx context.Context, r log.Record) bool {
newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if enabled := p.Enabled(ctx, newRecord); enabled {
// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process the record for the provided context.
//
// If it is not possible to definitively determine the record will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process the
// record.
func (l *logger) Enabled(ctx context.Context, record log.Record) bool {
fltrs := l.provider.filterProcessors()
// If there are more Processors than FilterProcessors we cannot be sure
// that all Processors will drop the record. Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(fltrs) || anyEnabled(ctx, record, fltrs)
}
func anyEnabled(ctx context.Context, r log.Record, fltrs []x.FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, r) {
// At least one Processor will process the Record.
return true
}
}
// No Processor will process the record
return false
}

View File

@ -215,8 +215,9 @@ func TestLoggerEmit(t *testing.T) {
}
func TestLoggerEnabled(t *testing.T) {
p0, p1, p2WithDisabled := newProcessor("0"), newProcessor("1"), newProcessor("2")
p2WithDisabled.enabled = false
p0 := newFltrProcessor("0", true)
p1 := newFltrProcessor("1", true)
p2WithDisabled := newFltrProcessor("2", false)
testCases := []struct {
name string
@ -273,3 +274,24 @@ func TestLoggerEnabled(t *testing.T) {
})
}
}
func BenchmarkLoggerEnabled(b *testing.B) {
provider := NewLoggerProvider(
WithProcessor(newFltrProcessor("0", false)),
WithProcessor(newFltrProcessor("1", true)),
)
logger := provider.Logger("BenchmarkLoggerEnabled")
ctx, r := context.Background(), log.Record{}
r.SetSeverityText("test")
var enabled bool
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
enabled = logger.Enabled(ctx, r)
}
_ = enabled
}

View File

@ -12,6 +12,9 @@ import (
// Any of the Processor's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [go.opentelemetry.io/otel/sdk/log/internal/x] for information about how
// a Processor can be extended to support experimental features.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
@ -35,27 +38,6 @@ type Processor interface {
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error
// Enabled returns whether the Processor will process for the given context
// and record.
//
// The passed record is likely to be a partial record with only the
// bridge-relevant information being provided (e.g a record with only the
// Severity set). If a Logger needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and record, and will be false if the Processor will not
// process. The returned value may be true or false in an indeterminate
// state. An implementation should default to returning true for an
// indeterminate state, but may return false if valid reasons in particular
// circumstances exist (e.g. performance, correctness).
//
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor] until any processor returns true.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record Record) bool
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//

View File

@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/otel/log/embedded"
"go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/sdk/resource"
)
@ -65,6 +66,9 @@ type LoggerProvider struct {
attributeCountLimit int
attributeValueLengthLimit int
fltrProcessorsOnce sync.Once
fltrProcessors []x.FilterProcessor
loggersMu sync.Mutex
loggers map[instrumentation.Scope]*logger
@ -92,6 +96,17 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
}
}
func (p *LoggerProvider) filterProcessors() []x.FilterProcessor {
p.fltrProcessorsOnce.Do(func() {
for _, proc := range p.processors {
if f, ok := proc.(x.FilterProcessor); ok {
p.fltrProcessors = append(p.fltrProcessors, f)
}
}
})
return p.fltrProcessors
}
// Logger returns a new [log.Logger] with the provided name and configuration.
//
// If p is shut down, a [noop.Logger] instance is returned.

View File

@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/noop"
"go.opentelemetry.io/otel/sdk/log/internal/x"
"go.opentelemetry.io/otel/sdk/resource"
)
@ -31,11 +32,10 @@ type processor struct {
forceFlushCalls int
records []Record
enabled bool
}
func newProcessor(name string) *processor {
return &processor{Name: name, enabled: true}
return &processor{Name: name}
}
func (p *processor) OnEmit(ctx context.Context, r *Record) error {
@ -47,10 +47,6 @@ func (p *processor) OnEmit(ctx context.Context, r *Record) error {
return nil
}
func (p *processor) Enabled(context.Context, Record) bool {
return p.enabled
}
func (p *processor) Shutdown(context.Context) error {
p.shutdownCalls++
return p.Err
@ -61,6 +57,25 @@ func (p *processor) ForceFlush(context.Context) error {
return p.Err
}
type fltrProcessor struct {
*processor
enabled bool
}
var _ x.FilterProcessor = (*fltrProcessor)(nil)
func newFltrProcessor(name string, enabled bool) *fltrProcessor {
return &fltrProcessor{
processor: newProcessor(name),
enabled: enabled,
}
}
func (p *fltrProcessor) Enabled(context.Context, log.Record) bool {
return p.enabled
}
func TestNewLoggerProviderConfiguration(t *testing.T) {
t.Cleanup(func(orig otel.ErrorHandler) func() {
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {

View File

@ -58,11 +58,6 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
return s.exporter.Export(ctx, *records)
}
// Enabled returns true if the exporter is not nil.
func (s *SimpleProcessor) Enabled(context.Context, Record) bool {
return s.exporter != nil
}
// Shutdown shuts down the expoter.
func (s *SimpleProcessor) Shutdown(ctx context.Context) error {
if s.exporter == nil {

View File

@ -52,12 +52,6 @@ func TestSimpleProcessorOnEmit(t *testing.T) {
assert.Equal(t, []log.Record{*r}, e.records)
}
func TestSimpleProcessorEnabled(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)
assert.True(t, s.Enabled(context.Background(), log.Record{}))
}
func TestSimpleProcessorShutdown(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)
@ -97,7 +91,6 @@ func TestSimpleProcessorEmpty(t *testing.T) {
ctx := context.Background()
record := new(log.Record)
assert.NoError(t, s.OnEmit(ctx, record), "OnEmit")
assert.False(t, s.Enabled(ctx, *record), "Enabled")
assert.NoError(t, s.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, s.Shutdown(ctx), "Shutdown")
})
@ -119,7 +112,6 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
defer wg.Done()
_ = s.OnEmit(ctx, r)
_ = s.Enabled(ctx, *r)
_ = s.Shutdown(ctx)
_ = s.ForceFlush(ctx)
}()