1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-26 03:52:03 +02:00

sdk/log: Processor.OnEmit accetps a Record pointer (#5636)

## What

Change `Processor.OnEmit` methods to accept a record pointer.

## Why

Fixes https://github.com/open-telemetry/opentelemetry-go/issues/5219

This would be specification compliant according to discussions around
https://github.com/open-telemetry/opentelemetry-specification/pull/4067

This is inline of how processors Go span processors works and how log
processors work in other languages.

If the performance (an additional heap allocation during log processing)
would occur to be a significant problem for some users, we have at few
possible solutions:

1. Utilize PGO which may also lead to decreasing heap allocations
(sources:
https://landontclipp.github.io/blog/2023/08/25/profile-guided-optimizations-in-go/#devirtualization,
https://andrewwphillips.github.io/blog/pgo.html). Currently it does not
but I expect it may change in future.
2. Improve the Go compilers escape analysis (related to previous point)
3. introduce new "chaining processor" which can be also desirable in
other languages

## Benchstat

`old` is from `main`.
`new` is from current branch.

`new-pgo` is from current branch with PGO optimization. I first run
benchmarks to generate a CPU profile using `go test -run=^$ -bench=.
-count=10 -cpuprofile default.pgo` and then I rerun the tests with PGO.
Currently, the number of heap allocations is the same.

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: Intel(R) Core(TM) i9-10885H CPU @ 2.40GHz
                                  │    old.txt    │                new.txt                │              new-pgo.txt              │
                                  │    sec/op     │    sec/op     vs base                 │    sec/op     vs base                 │
BatchProcessorOnEmit-16              402.7n ± 18%   382.0n ±  7%         ~ (p=0.247 n=10)   376.7n ± 14%         ~ (p=0.210 n=10)
Processor/Simple-16                  350.9n ±  9%   782.5n ±  6%  +123.00% (p=0.000 n=10)   755.1n ±  5%  +115.19% (p=0.000 n=10)
Processor/Batch-16                   1.333µ ± 15%   1.497µ ± 11%   +12.27% (p=0.000 n=10)   1.528µ ±  8%   +14.63% (p=0.000 n=10)
Processor/SetTimestampSimple-16      329.5n ± 15%   711.6n ±  4%  +115.93% (p=0.000 n=10)   721.9n ±  5%  +119.04% (p=0.000 n=10)
Processor/SetTimestampBatch-16       1.163µ ±  2%   1.524µ ±  3%   +31.03% (p=0.000 n=10)   1.461µ ±  5%   +25.57% (p=0.000 n=10)
Processor/AddAttributesSimple-16     408.7n ±  3%   810.1n ±  4%   +98.21% (p=0.000 n=10)   830.1n ±  4%  +103.11% (p=0.000 n=10)
Processor/AddAttributesBatch-16      1.270µ ±  2%   1.623µ ±  4%   +27.71% (p=0.000 n=10)   1.597µ ±  7%   +25.66% (p=0.000 n=10)
Processor/SetAttributesSimple-16     436.2n ± 10%   796.1n ±  3%   +82.50% (p=0.000 n=10)   817.6n ±  4%   +87.43% (p=0.000 n=10)
Processor/SetAttributesBatch-16      1.202µ ±  2%   1.552µ ±  2%   +29.06% (p=0.000 n=10)   1.659µ ± 11%   +37.96% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16      366.6n ±  3%   363.7n ±  7%         ~ (p=0.952 n=10)   426.2n ±  7%   +16.27% (p=0.000 n=10)
LoggerNewRecord/10_attributes-16     1.711µ ±  2%   1.909µ ± 18%   +11.57% (p=0.000 n=10)   2.077µ ± 10%   +21.39% (p=0.000 n=10)
LoggerProviderLogger-16              650.1n ±  4%   690.1n ±  8%    +6.15% (p=0.019 n=10)   737.6n ± 13%   +13.47% (p=0.004 n=10)
WalkAttributes/1_attributes-16       5.264n ± 12%   5.510n ±  8%         ~ (p=0.812 n=10)   5.865n ±  5%   +11.41% (p=0.011 n=10)
WalkAttributes/10_attributes-16      5.440n ±  8%   5.881n ±  7%    +8.12% (p=0.004 n=10)   6.104n ±  7%   +12.21% (p=0.005 n=10)
WalkAttributes/100_attributes-16     5.403n ±  9%   5.894n ±  9%    +9.10% (p=0.029 n=10)   5.783n ±  6%         ~ (p=0.052 n=10)
WalkAttributes/1000_attributes-16    5.196n ±  4%   5.860n ±  8%   +12.79% (p=0.000 n=10)   5.981n ± 13%   +15.13% (p=0.002 n=10)
SetAddAttributes/SetAttributes-16    181.2n ± 14%   208.1n ± 12%   +14.85% (p=0.005 n=10)   209.9n ± 11%   +15.87% (p=0.007 n=10)
SetAddAttributes/AddAttributes-16    156.7n ± 14%   161.1n ± 16%         ~ (p=0.190 n=10)   165.5n ± 15%         ~ (p=0.315 n=10)
SimpleProcessorOnEmit-16            11.775n ± 10%   9.027n ± 17%   -23.33% (p=0.000 n=10)   9.389n ± 18%   -20.26% (p=0.002 n=10)
geomean                              169.1n         209.6n         +23.98%                  215.5n         +27.48%

                        │     old.txt     │               new.txt                │              new-pgo.txt              │
                        │       B/s       │     B/s       vs base                │      B/s       vs base                │
BatchProcessorOnEmit-16   1004.39Mi ± 15%   79.88Mi ± 7%  -92.05% (p=0.000 n=10)   81.06Mi ± 12%  -91.93% (p=0.000 n=10)

                                  │   old.txt    │                new.txt                 │               new-pgo.txt               │
                                  │     B/op     │    B/op      vs base                   │     B/op      vs base                   │
BatchProcessorOnEmit-16             0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
Processor/Simple-16                   0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      417.0 ±  0%         ? (p=0.000 n=10)
Processor/Batch-16                  621.5 ± 2%     1057.5 ± 1%   +70.15% (p=0.000 n=10)     1064.5 ±  1%   +71.28% (p=0.000 n=10)
Processor/SetTimestampSimple-16       0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      418.0 ±  0%         ? (p=0.000 n=10)
Processor/SetTimestampBatch-16      626.5 ± 3%     1049.5 ± 1%   +67.52% (p=0.000 n=10)     1057.5 ±  2%   +68.79% (p=0.000 n=10)
Processor/AddAttributesSimple-16      0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      417.0 ±  0%         ? (p=0.000 n=10)
Processor/AddAttributesBatch-16     616.5 ± 3%     1053.0 ± 2%   +70.80% (p=0.000 n=10)     1048.5 ±  2%   +70.07% (p=0.000 n=10)
Processor/SetAttributesSimple-16    48.00 ± 0%     466.00 ± 0%  +870.83% (p=0.000 n=10)     466.00 ±  0%  +870.83% (p=0.000 n=10)
Processor/SetAttributesBatch-16     648.0 ± 3%     1089.5 ± 1%   +68.13% (p=0.000 n=10)     1087.5 ±  4%   +67.82% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16     0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
LoggerNewRecord/10_attributes-16    610.0 ± 0%      610.0 ± 0%         ~ (p=1.000 n=10) ¹    610.0 ±  0%         ~ (p=1.000 n=10) ¹
LoggerProviderLogger-16             354.5 ± 6%      368.0 ± 7%         ~ (p=0.288 n=10)      391.0 ± 29%         ~ (p=0.239 n=10)
WalkAttributes/1_attributes-16      0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/10_attributes-16     0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/100_attributes-16    0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1000_attributes-16   0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/SetAttributes-16   48.00 ± 0%      48.00 ± 0%         ~ (p=1.000 n=10) ¹    48.00 ±  0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-16   0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
SimpleProcessorOnEmit-16            0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
geomean                                        ²                ?                       ²                 ?                       ²
¹ all samples are equal
² summaries must be >0 to compute geomean

                                  │   old.txt    │                new.txt                │              new-pgo.txt              │
                                  │  allocs/op   │ allocs/op   vs base                   │ allocs/op   vs base                   │
BatchProcessorOnEmit-16             0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
Processor/Simple-16                 0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/Batch-16                  0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetTimestampSimple-16     0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetTimestampBatch-16      0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/AddAttributesSimple-16    0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/AddAttributesBatch-16     0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetAttributesSimple-16    1.000 ± 0%     2.000 ± 0%  +100.00% (p=0.000 n=10)     2.000 ± 0%  +100.00% (p=0.000 n=10)
Processor/SetAttributesBatch-16     1.000 ± 0%     2.000 ± 0%  +100.00% (p=0.000 n=10)     2.000 ± 0%  +100.00% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16     0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
LoggerNewRecord/10_attributes-16    4.000 ± 0%     4.000 ± 0%         ~ (p=1.000 n=10) ¹   4.000 ± 0%         ~ (p=1.000 n=10) ¹
LoggerProviderLogger-16             1.000 ± 0%     1.000 ± 0%         ~ (p=1.000 n=10) ¹   1.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1_attributes-16      0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/10_attributes-16     0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/100_attributes-16    0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1000_attributes-16   0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/SetAttributes-16   1.000 ± 0%     1.000 ± 0%         ~ (p=1.000 n=10) ¹   1.000 ± 0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-16   0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
SimpleProcessorOnEmit-16            0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
geomean                                        ²               ?                       ²               ?                       ²
¹ all samples are equal
² summaries must be >0 to compute geomean
```
This commit is contained in:
Robert Pająk 2024-08-01 10:13:43 +02:00 committed by GitHub
parent 2726c6aab7
commit 8e8ad092cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 175 additions and 129 deletions

View File

@ -17,6 +17,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
See our [versioning policy](VERSIONING.md) for more information about these stability guarantees. (#5629)
- Add `InstrumentationScope` field to `SpanStub` in `go.opentelemetry.io/otel/sdk/trace/tracetest`, as a replacement for the deprecated `InstrumentationLibrary`. (#5627)
### Changed
- `Processor.OnEmit` in `go.opentelemetry.io/otel/sdk/log` now accepts a pointer to `Record` instead of a value so that the record modifications done in a processor are propagated to subsequent registered processors. (#5636)
### Fixed
- Correct comments for the priority of the `WithEndpoint` and `WithEndpointURL` options and their corresponding environment variables in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5584)

View File

@ -146,7 +146,31 @@ provided via API.
Moreover it is safer to have these abstraction decoupled.
E.g. there can be a need for some fields that can be set via API and cannot be modified by the processors.
### Processor.OnEmit to accept Record values
There was a proposal to make the [Processor](#processor)'s `OnEmit`
to accept a [Record](#record) value instead of a pointer to reduce allocations
as well as to have design similar to [`slog.Handler`](https://pkg.go.dev/log/slog#Handler).
There have been long discussions within the OpenTelemetry Specification SIG[^5]
about whether such a design would comply with the specification. The summary
was that the current processor design flaws are present in other languages as
well. Therefore, it would be favorable to introduce new processing concepts
(e.g. chaining processors) in the specification that would coexist with the
current "mutable" processor design.
The performance disadvantages caused by using a pointer (which at the time of
writing causes an additional heap allocation) may be mitigated by future
versions of the Go compiler, thanks to improved escape analysis and
profile-guided optimization (PGO)[^6].
On the other hand, [Processor](#processor)'s `Enabled` is fine to accept
a [Record](#record) value as the processors should not mutate the passed
parameters.
[^1]: [A Guide to the Go Garbage Collector](https://tip.golang.org/doc/gc-guide)
[^2]: [OpenTelemetry Logging](https://opentelemetry.io/docs/specs/otel/logs)
[^3]: [Conversation on representing LogRecordProcessor and LogRecordExporter via a single Expoter interface](https://github.com/open-telemetry/opentelemetry-go/pull/4954#discussion_r1515050480)
[^4]: [Introduce Processor](https://github.com/pellared/opentelemetry-go/pull/9)
[^5]: [Log record mutations do not have to be visible in next registered processors](https://github.com/open-telemetry/opentelemetry-specification/pull/4067)
[^6]: [Profile-guided optimization](https://go.dev/doc/pgo)

View File

@ -176,11 +176,13 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
}
// OnEmit batches provided log record.
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
if b.stopped.Load() || b.q == nil {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
// The record is cloned so that changes done by subsequent processors
// are not going to lead to a data race.
if n := b.q.Enqueue(r.Clone()); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:

View File

@ -45,9 +45,9 @@ func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
var record Record
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, record), "Enabled")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
@ -197,8 +197,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Nanosecond),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, size) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < size; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
var got []Record
assert.Eventually(t, func() bool {
@ -220,8 +220,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 10*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 10*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
@ -243,8 +243,8 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 2*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
for i := 0; i < 2*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
var n int
@ -255,7 +255,7 @@ func TestBatchProcessor(t *testing.T) {
var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, Record{})
err = b.OnEmit(ctx, new(Record))
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
@ -303,7 +303,7 @@ func TestBatchProcessor(t *testing.T) {
assert.NoError(t, b.Shutdown(ctx))
want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})
@ -311,7 +311,7 @@ func TestBatchProcessor(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.NoError(t, b.Shutdown(ctx))
assert.NoError(t, b.ForceFlush(ctx))
@ -344,7 +344,7 @@ func TestBatchProcessor(t *testing.T) {
)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))
@ -353,7 +353,7 @@ func TestBatchProcessor(t *testing.T) {
if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
got := e.Records()
if assert.Len(t, got[0], 1, "records received") {
assert.Equal(t, r, got[0][0])
assert.Equal(t, *r, got[0][0])
}
}
})
@ -381,7 +381,7 @@ func TestBatchProcessor(t *testing.T) {
// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, Record{}))
require.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
@ -423,7 +423,7 @@ func TestBatchProcessor(t *testing.T) {
b := NewBatchProcessor(e)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
var r Record
r := new(Record)
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
@ -453,7 +453,7 @@ func TestBatchProcessor(t *testing.T) {
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
var r Record
r := new(Record)
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")
require.Eventually(t, func() bool {
@ -497,7 +497,7 @@ func TestBatchProcessor(t *testing.T) {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.OnEmit(ctx, new(Record)))
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
@ -642,7 +642,7 @@ func TestQueue(t *testing.T) {
}
func BenchmarkBatchProcessorOnEmit(b *testing.B) {
var r Record
r := new(Record)
body := log.BoolValue(true)
r.SetBody(body)

View File

@ -16,59 +16,77 @@ import (
func BenchmarkProcessor(b *testing.B) {
for _, tc := range []struct {
name string
f func() Processor
f func() []LoggerProviderOption
}{
{
name: "Simple",
f: func() Processor {
return NewSimpleProcessor(noopExporter{})
f: func() []LoggerProviderOption {
return []LoggerProviderOption{WithProcessor(NewSimpleProcessor(noopExporter{}))}
},
},
{
name: "Batch",
f: func() Processor {
return NewBatchProcessor(noopExporter{})
f: func() []LoggerProviderOption {
return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))}
},
},
{
name: "SetTimestampSimple",
f: func() Processor {
return timestampDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(timestampProcessor{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "SetTimestampBatch",
f: func() Processor {
return timestampDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(timestampProcessor{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
{
name: "AddAttributesSimple",
f: func() Processor {
return attrAddDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrAddProcessor{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "AddAttributesBatch",
f: func() Processor {
return attrAddDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrAddProcessor{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
{
name: "SetAttributesSimple",
f: func() Processor {
return attrSetDecorator{NewSimpleProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrSetDecorator{}),
WithProcessor(NewSimpleProcessor(noopExporter{})),
}
},
},
{
name: "SetAttributesBatch",
f: func() Processor {
return attrSetDecorator{NewBatchProcessor(noopExporter{})}
f: func() []LoggerProviderOption {
return []LoggerProviderOption{
WithProcessor(attrSetDecorator{}),
WithProcessor(NewBatchProcessor(noopExporter{})),
}
},
},
} {
b.Run(tc.name, func(b *testing.B) {
provider := NewLoggerProvider(WithProcessor(tc.f()))
provider := NewLoggerProvider(tc.f()...)
b.Cleanup(func() { assert.NoError(b, provider.Shutdown(context.Background())) })
logger := provider.Logger(b.Name())
@ -91,32 +109,59 @@ func BenchmarkProcessor(b *testing.B) {
}
}
type timestampDecorator struct {
Processor
}
type timestampProcessor struct{}
func (e timestampDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p timestampProcessor) OnEmit(ctx context.Context, r *Record) error {
r.SetObservedTimestamp(time.Date(1988, time.November, 17, 0, 0, 0, 0, time.UTC))
return e.Processor.OnEmit(ctx, r)
return nil
}
type attrAddDecorator struct {
Processor
func (p timestampProcessor) Enabled(context.Context, Record) bool {
return true
}
func (e attrAddDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p timestampProcessor) Shutdown(ctx context.Context) error {
return nil
}
func (p timestampProcessor) ForceFlush(ctx context.Context) error {
return nil
}
type attrAddProcessor struct{}
func (p attrAddProcessor) OnEmit(ctx context.Context, r *Record) error {
r.AddAttributes(log.String("add", "me"))
return e.Processor.OnEmit(ctx, r)
return nil
}
type attrSetDecorator struct {
Processor
func (p attrAddProcessor) Enabled(context.Context, Record) bool {
return true
}
func (e attrSetDecorator) OnEmit(ctx context.Context, r Record) error {
r = r.Clone()
func (p attrAddProcessor) Shutdown(ctx context.Context) error {
return nil
}
func (p attrAddProcessor) ForceFlush(ctx context.Context) error {
return nil
}
type attrSetDecorator struct{}
func (p attrSetDecorator) OnEmit(ctx context.Context, r *Record) error {
r.SetAttributes(log.String("replace", "me"))
return e.Processor.OnEmit(ctx, r)
return nil
}
func (p attrSetDecorator) Enabled(context.Context, Record) bool {
return true
}
func (p attrSetDecorator) Shutdown(ctx context.Context) error {
return nil
}
func (p attrSetDecorator) ForceFlush(ctx context.Context) error {
return nil
}

View File

@ -83,7 +83,7 @@ type ContextFilterProcessor struct {
log.Processor
}
func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record log.Record) error {
func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
if ignoreLogs(ctx) {
return nil
}
@ -104,35 +104,45 @@ func ExampleProcessor_redact() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)
// Wrap the processor so that it redacts values from token attributes.
processor = &RedactTokensProcessor{processor}
// Add a processor so that it redacts values from token attributes.
redactProcessor := &RedactTokensProcessor{}
// The created processor can then be registered with
// the OpenTelemetry Logs SDK using the WithProcessor option.
_ = log.NewLoggerProvider(
// Order is important here. Redact before handing to the processor.
log.WithProcessor(redactProcessor),
log.WithProcessor(processor),
)
}
// RedactTokensProcessor is a [log.Processor] decorator that redacts values
// from attributes containing "token" in the key.
type RedactTokensProcessor struct {
log.Processor
}
type RedactTokensProcessor struct{}
// OnEmit redacts values from attributes containing "token" in the key
// by replacing them with a REDACTED value.
func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record log.Record) error {
cloned := false
func (p *RedactTokensProcessor) OnEmit(ctx context.Context, record *log.Record) error {
record.WalkAttributes(func(kv logapi.KeyValue) bool {
if strings.Contains(strings.ToLower(kv.Key), "token") {
if !cloned {
record = record.Clone()
cloned = true
}
record.AddAttributes(logapi.String(kv.Key, "REDACTED"))
}
return true
})
return p.Processor.OnEmit(ctx, record)
return nil
}
// Enabled returns true.
func (p *RedactTokensProcessor) Enabled(context.Context, log.Record) bool {
return true
}
// Shutdown returns nil.
func (p *RedactTokensProcessor) Shutdown(ctx context.Context) error {
return nil
}
// ForceFlush returns nil.
func (p *RedactTokensProcessor) ForceFlush(ctx context.Context) error {
return nil
}

View File

@ -36,7 +36,7 @@ func newLogger(p *LoggerProvider, scope instrumentation.Scope) *logger {
func (l *logger) Emit(ctx context.Context, r log.Record) {
newRecord := l.newRecord(ctx, r)
for _, p := range l.provider.processors {
if err := p.OnEmit(ctx, newRecord); err != nil {
if err := p.OnEmit(ctx, &newRecord); err != nil {
otel.Handle(err)
}
}

View File

@ -1,46 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
//go:build !race
package log
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
)
func TestAllocationLimits(t *testing.T) {
// This test is not run with a race detector. The sync.Pool used by parts
// of the SDK has memory optimizations removed for the race detector. Do
// not test performance of the SDK in that state.
const runs = 10
logger := newLogger(NewLoggerProvider(), instrumentation.Scope{})
r := log.Record{}
r.SetTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetObservedTimestamp(time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC))
r.SetBody(log.StringValue("testing body value"))
r.SetSeverity(log.SeverityInfo)
r.SetSeverityText("testing text")
r.AddAttributes(
log.String("k1", "str"),
log.Float64("k2", 1.0),
log.Int("k3", 2),
log.Bool("k4", true),
log.Bytes("k5", []byte{1}),
)
assert.Equal(t, 0.0, testing.AllocsPerRun(runs, func() {
logger.newRecord(context.Background(), r)
}), "newRecord")
}

View File

@ -26,9 +26,14 @@ type Processor interface {
// considered unrecoverable and will be reported to a configured error
// Handler.
//
// Before modifying a Record, the implementation must use Record.Clone
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor].
// Implementations may synchronously modify the record so that the changes
// are visible in the next registered processor.
// Notice that [Record] is not concurrent safe. Therefore, asynchronous
// processing may cause race conditions. Use [Record.Clone]
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record Record) error
OnEmit(ctx context.Context, record *Record) error
// Enabled returns whether the Processor will process for the given context
// and record.
//
@ -44,8 +49,10 @@ type Processor interface {
// indeterminate state, but may return false if valid reasons in particular
// circumstances exist (e.g. performance, correctness).
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
// The SDK invokes the processors sequentially in the same order as
// they were registered using [WithProcessor] until any processor returns true.
//
// Implementations should not modify the record.
Enabled(ctx context.Context, record Record) bool
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.

View File

@ -189,8 +189,8 @@ func WithResource(res *resource.Resource) LoggerProviderOption {
// By default, if this option is not used, the LoggerProvider will perform no
// operations; no data will be exported without a processor.
//
// Each WithProcessor creates a separate pipeline. Use custom decorators
// for advanced scenarios such as enriching with attributes.
// The SDK invokes the processors sequentially in the same order as they were
// registered.
//
// For production, use [NewBatchProcessor] to batch log records before they are exported.
// For testing and debugging, use [NewSimpleProcessor] to synchronously export log records.

View File

@ -38,12 +38,12 @@ func newProcessor(name string) *processor {
return &processor{Name: name, enabled: true}
}
func (p *processor) OnEmit(ctx context.Context, r Record) error {
func (p *processor) OnEmit(ctx context.Context, r *Record) error {
if p.Err != nil {
return p.Err
}
p.records = append(p.records, r)
p.records = append(p.records, *r)
return nil
}

View File

@ -40,9 +40,9 @@ var simpleProcRecordsPool = sync.Pool{
}
// OnEmit batches provided log record.
func (s *SimpleProcessor) OnEmit(ctx context.Context, r Record) error {
func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
records := simpleProcRecordsPool.Get().(*[]Record)
(*records)[0] = r
(*records)[0] = *r
defer func() {
simpleProcRecordsPool.Put(records)
}()

View File

@ -42,12 +42,12 @@ func TestSimpleProcessorOnEmit(t *testing.T) {
e := new(exporter)
s := log.NewSimpleProcessor(e)
var r log.Record
r := new(log.Record)
r.SetSeverityText("test")
_ = s.OnEmit(context.Background(), r)
require.True(t, e.exportCalled, "exporter Export not called")
assert.Equal(t, []log.Record{r}, e.records)
assert.Equal(t, []log.Record{*r}, e.records)
}
func TestSimpleProcessorEnabled(t *testing.T) {
@ -75,7 +75,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
wg.Add(goRoutineN)
var r log.Record
r := new(log.Record)
r.SetSeverityText("test")
ctx := context.Background()
s := log.NewSimpleProcessor(nil)
@ -84,7 +84,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
defer wg.Done()
_ = s.OnEmit(ctx, r)
_ = s.Enabled(ctx, r)
_ = s.Enabled(ctx, *r)
_ = s.Shutdown(ctx)
_ = s.ForceFlush(ctx)
}()
@ -94,7 +94,7 @@ func TestSimpleProcessorConcurrentSafe(t *testing.T) {
}
func BenchmarkSimpleProcessorOnEmit(b *testing.B) {
var r log.Record
r := new(log.Record)
r.SetSeverityText("test")
ctx := context.Background()
s := log.NewSimpleProcessor(nil)