From 5da6cd28a8b626c5f1e02dad8697568f58554797 Mon Sep 17 00:00:00 2001
From: Joe Stephenson
Date: Fri, 11 Jul 2025 08:54:13 +0100
Subject: [PATCH] sdk/log: Add WithAllowKeyDuplication logger provider option (#6968)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #5133

This couldn't be added as an option on a processor, as that would involve
moving all the attribute deduplication logic outside of the record type.
Instead, this PR provides the same functionality as an option that is set
when creating the logger provider.

The benchstat report below shows the performance improvement when
`allowDupKeys` is set (deduplication skipped):

```
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: Apple M2 Pro
                                    │ withoutDedup.txt │            withDedup.txt             │
                                    │      sec/op      │    sec/op     vs base                │
SetAddAttributes/SetAttributes-12         141.3n ± 2%    167.4n ± 5%  +18.51% (p=0.000 n=10)
SetAddAttributes/AddAttributes-12         117.5n ± 2%    124.8n ± 5%   +6.17% (p=0.000 n=10)
geomean                                   128.9n         144.5n       +12.17%

                                    │ withoutDedup.txt │          withDedup.txt           │
                                    │       B/op       │    B/op     vs base              │
SetAddAttributes/SetAttributes-12          48.00 ± 0%    48.00 ± 0%  ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-12          0.000 ± 0%    0.000 ± 0%  ~ (p=1.000 n=10) ¹
geomean                                              ²                +0.00%            ²
¹ all samples are equal
² summaries must be >0 to compute geomean

                                    │ withoutDedup.txt │          withDedup.txt           │
                                    │    allocs/op     │ allocs/op   vs base              │
SetAddAttributes/SetAttributes-12          1.000 ± 0%    1.000 ± 0%  ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-12          0.000 ± 0%    0.000 ± 0%  ~ (p=1.000 n=10) ¹
geomean                                              ²                +0.00%            ²
¹ all samples are equal
² summaries must be >0 to compute geomean
```
---
 CHANGELOG.md             |   1 +
 sdk/log/logger.go        |   1 +
 sdk/log/logger_test.go   |  39 +++++++++
 sdk/log/provider.go      |  21 +++++
 sdk/log/provider_test.go |   2 +
 sdk/log/record.go        | 102 ++++++++++++----
 sdk/log/record_test.go   | 178 +++++++++++++++++++++++++++++++++++++++
 7 files changed, 300 insertions(+), 44 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7a1e084d5..64d910962 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
   - `RPCGRPCRequestMetadata`
   - `RPCGRPCResponseMetadata`
 - Add `ErrorType` attribute helper function to the `go.opentelmetry.io/otel/semconv/v1.34.0` package. (#6962)
+- Add `WithAllowKeyDuplication` in `go.opentelemetry.io/otel/sdk/log` which can be used to disable deduplication for log records. (#6968)
 
 ### Changed
 
diff --git a/sdk/log/logger.go b/sdk/log/logger.go
index 1ec8ff883..d3acd0562 100644
--- a/sdk/log/logger.go
+++ b/sdk/log/logger.go
@@ -94,6 +94,7 @@ func (l *logger) newRecord(ctx context.Context, r log.Record) Record {
 		scope:                     &l.instrumentationScope,
 		attributeValueLengthLimit: l.provider.attributeValueLengthLimit,
 		attributeCountLimit:       l.provider.attributeCountLimit,
+		allowDupKeys:              l.provider.allowDupKeys,
 	}
 
 	// This field SHOULD be set once the event is observed by OpenTelemetry.
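Editor's illustrative note (not part of the patch): a minimal sketch of how the new option would be enabled when constructing a provider. The `stdoutlog` exporter and the overall wiring here are assumptions for illustration only; any processor/exporter combination is configured the same way.

```go
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
	otellog "go.opentelemetry.io/otel/log"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

func main() {
	ctx := context.Background()

	// stdoutlog is only a convenient concrete exporter for this sketch.
	exp, err := stdoutlog.New()
	if err != nil {
		log.Fatal(err)
	}

	// With WithAllowKeyDuplication, records emitted through this provider keep
	// duplicate attribute keys instead of having them deduplicated by the SDK.
	provider := sdklog.NewLoggerProvider(
		sdklog.WithProcessor(sdklog.NewSimpleProcessor(exp)),
		sdklog.WithAllowKeyDuplication(),
	)
	defer func() { _ = provider.Shutdown(ctx) }()

	logger := provider.Logger("example")

	var rec otellog.Record
	rec.SetBody(otellog.StringValue("duplicate keys survive"))
	rec.AddAttributes(
		otellog.String("k", "first"),
		otellog.String("k", "second"), // retained alongside "first" with this option
	)
	logger.Emit(ctx, rec)
}
```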
diff --git a/sdk/log/logger_test.go b/sdk/log/logger_test.go
index fa22cd31a..fd5e8f73b 100644
--- a/sdk/log/logger_test.go
+++ b/sdk/log/logger_test.go
@@ -47,6 +47,11 @@ func TestLoggerEmit(t *testing.T) {
 	rWithNoObservedTimestamp := r
 	rWithNoObservedTimestamp.SetObservedTimestamp(time.Time{})
 
+	rWithoutDeduplicateAttributes := r
+	rWithoutDeduplicateAttributes.AddAttributes(
+		log.String("k1", "str1"),
+	)
+
 	contextWithSpanContext := trace.ContextWithSpanContext(
 		context.Background(),
 		trace.NewSpanContext(trace.SpanContextConfig{
@@ -206,6 +211,40 @@ func TestLoggerEmit(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "WithoutAttributeDeduplication",
+			logger: newLogger(NewLoggerProvider(
+				WithProcessor(p0),
+				WithProcessor(p1),
+				WithAttributeValueLengthLimit(5),
+				WithAttributeCountLimit(5),
+				WithResource(resource.NewSchemaless(attribute.String("key", "value"))),
+				WithAllowKeyDuplication(),
+			), instrumentation.Scope{Name: "scope"}),
+			ctx:    context.Background(),
+			record: rWithoutDeduplicateAttributes,
+			expectedRecords: []Record{
+				{
+					eventName:                 r.EventName(),
+					timestamp:                 r.Timestamp(),
+					body:                      r.Body(),
+					severity:                  r.Severity(),
+					severityText:              r.SeverityText(),
+					observedTimestamp:         r.ObservedTimestamp(),
+					resource:                  resource.NewSchemaless(attribute.String("key", "value")),
+					attributeValueLengthLimit: 5,
+					attributeCountLimit:       5,
+					scope:                     &instrumentation.Scope{Name: "scope"},
+					front: [attributesInlineCount]log.KeyValue{
+						log.String("k1", "str"),
+						log.Float64("k2", 1.0),
+						log.String("k1", "str1"),
+					},
+					nFront:       3,
+					allowDupKeys: true,
+				},
+			},
+		},
 	}
 
 	for _, tc := range testCases {
diff --git a/sdk/log/provider.go b/sdk/log/provider.go
index 359357b7e..c69422e12 100644
--- a/sdk/log/provider.go
+++ b/sdk/log/provider.go
@@ -32,6 +32,7 @@ type providerConfig struct {
 	fltrProcessors []FilterProcessor
 	attrCntLim     setting[int]
 	attrValLenLim  setting[int]
+	allowDupKeys   setting[bool]
 }
 
 func newProviderConfig(opts []LoggerProviderOption) providerConfig {
@@ -67,6 +68,7 @@ type LoggerProvider struct {
 	fltrProcessors            []FilterProcessor
 	attributeCountLimit       int
 	attributeValueLengthLimit int
+	allowDupKeys              bool
 
 	loggersMu sync.Mutex
 	loggers   map[instrumentation.Scope]*logger
@@ -93,6 +95,7 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
 		fltrProcessors:            cfg.fltrProcessors,
 		attributeCountLimit:       cfg.attrCntLim.Value,
 		attributeValueLengthLimit: cfg.attrValLenLim.Value,
+		allowDupKeys:              cfg.allowDupKeys.Value,
 	}
 }
 
@@ -254,3 +257,21 @@ func WithAttributeValueLengthLimit(limit int) LoggerProviderOption {
 		return cfg
 	})
 }
+
+// WithAllowKeyDuplication sets whether deduplication is skipped for log attributes and other key-value collections.
+//
+// By default, the key-value collections within a log record are deduplicated to comply with the OpenTelemetry Specification.
+// Deduplication means that if multiple key-value pairs with the same key are present, only a single pair
+// is retained and the others are discarded.
+//
+// Disabling deduplication with this option can improve performance, for example when adding attributes to a log record.
+//
+// Note that if you disable deduplication, you are responsible for ensuring that duplicate
+// key-value pairs within a single collection are not emitted,
+// or that the telemetry receiver can handle such duplicates.
+func WithAllowKeyDuplication() LoggerProviderOption {
+	return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig {
+		cfg.allowDupKeys = newSetting(true)
+		return cfg
+	})
+}
diff --git a/sdk/log/provider_test.go b/sdk/log/provider_test.go
index 9497baa73..a50fa5d25 100644
--- a/sdk/log/provider_test.go
+++ b/sdk/log/provider_test.go
@@ -115,12 +115,14 @@ func TestNewLoggerProviderConfiguration(t *testing.T) {
 				WithProcessor(p1),
 				WithAttributeCountLimit(attrCntLim),
 				WithAttributeValueLengthLimit(attrValLenLim),
+				WithAllowKeyDuplication(),
 			},
 			want: &LoggerProvider{
 				resource:                  res,
 				processors:                []Processor{p0, p1},
 				attributeCountLimit:       attrCntLim,
 				attributeValueLengthLimit: attrValLenLim,
+				allowDupKeys:              true,
 			},
 		},
 		{
diff --git a/sdk/log/record.go b/sdk/log/record.go
index 38fd65079..3f6fec5f3 100644
--- a/sdk/log/record.go
+++ b/sdk/log/record.go
@@ -93,6 +93,9 @@ type Record struct {
 	attributeValueLengthLimit int
 	attributeCountLimit       int
 
+	// allowDupKeys specifies whether deduplication of key-value collections is skipped.
+	allowDupKeys bool
+
 	noCmp [0]func() //nolint: unused  // This is indeed used.
 }
 
@@ -192,56 +195,60 @@ func (r *Record) AddAttributes(attrs ...log.KeyValue) {
 	if n == 0 {
 		// Avoid the more complex duplicate map lookups below.
 		var drop int
-		attrs, drop = dedup(attrs)
-		r.setDropped(drop)
+		if !r.allowDupKeys {
+			attrs, drop = dedup(attrs)
+			r.setDropped(drop)
+		}
 
-		attrs, drop = head(attrs, r.attributeCountLimit)
+		attrs, drop := head(attrs, r.attributeCountLimit)
 		r.addDropped(drop)
 
 		r.addAttrs(attrs)
 		return
 	}
 
-	// Used to find duplicates between attrs and existing attributes in r.
-	rIndex := r.attrIndex()
-	defer putIndex(rIndex)
+	if !r.allowDupKeys {
+		// Used to find duplicates between attrs and existing attributes in r.
+		rIndex := r.attrIndex()
+		defer putIndex(rIndex)
 
-	// Unique attrs that need to be added to r. This uses the same underlying
-	// array as attrs.
-	//
-	// Note, do not iterate attrs twice by just calling dedup(attrs) here.
-	unique := attrs[:0]
-	// Used to find duplicates within attrs itself. The index value is the
-	// index of the element in unique.
-	uIndex := getIndex()
-	defer putIndex(uIndex)
+		// Unique attrs that need to be added to r. This uses the same underlying
+		// array as attrs.
+		//
+		// Note, do not iterate attrs twice by just calling dedup(attrs) here.
+		unique := attrs[:0]
+		// Used to find duplicates within attrs itself. The index value is the
+		// index of the element in unique.
+		uIndex := getIndex()
+		defer putIndex(uIndex)
 
-	// Deduplicate attrs within the scope of all existing attributes.
-	for _, a := range attrs {
-		// Last-value-wins for any duplicates in attrs.
-		idx, found := uIndex[a.Key]
-		if found {
-			r.addDropped(1)
-			unique[idx] = a
-			continue
-		}
-
-		idx, found = rIndex[a.Key]
-		if found {
-			// New attrs overwrite any existing with the same key.
-			r.addDropped(1)
-			if idx < 0 {
-				r.front[-(idx + 1)] = a
-			} else {
-				r.back[idx] = a
+		// Deduplicate attrs within the scope of all existing attributes.
+		for _, a := range attrs {
+			// Last-value-wins for any duplicates in attrs.
+			idx, found := uIndex[a.Key]
+			if found {
+				r.addDropped(1)
+				unique[idx] = a
+				continue
+			}
+
+			idx, found = rIndex[a.Key]
+			if found {
+				// New attrs overwrite any existing with the same key.
+				r.addDropped(1)
+				if idx < 0 {
+					r.front[-(idx + 1)] = a
+				} else {
+					r.back[idx] = a
+				}
+			} else {
+				// Unique attribute.
+				unique = append(unique, a)
+				uIndex[a.Key] = len(unique) - 1
 			}
-		} else {
-			// Unique attribute.
-			unique = append(unique, a)
-			uIndex[a.Key] = len(unique) - 1
 		}
+		attrs = unique
 	}
-	attrs = unique
 
 	if r.attributeCountLimit > 0 && n+len(attrs) > r.attributeCountLimit {
 		// Truncate the now unique attributes to comply with limit.
@@ -297,8 +304,11 @@ func (r *Record) addAttrs(attrs []log.KeyValue) {
 // SetAttributes sets (and overrides) attributes to the log record.
 func (r *Record) SetAttributes(attrs ...log.KeyValue) {
 	var drop int
-	attrs, drop = dedup(attrs)
-	r.setDropped(drop)
+	r.setDropped(0)
+	if !r.allowDupKeys {
+		attrs, drop = dedup(attrs)
+		r.setDropped(drop)
+	}
 
 	attrs, drop = head(attrs, r.attributeCountLimit)
 	r.addDropped(drop)
@@ -426,10 +436,14 @@ func (r *Record) applyValueLimits(val log.Value) log.Value {
 		}
 		val = log.SliceValue(sl...)
 	case log.KindMap:
-		// Deduplicate then truncate. Do not do at the same time to avoid
-		// wasted truncation operations.
-		kvs, dropped := dedup(val.AsMap())
-		r.addDropped(dropped)
+		kvs := val.AsMap()
+		if !r.allowDupKeys {
+			// Deduplicate then truncate. Do not do at the same time to avoid
+			// wasted truncation operations.
+			var dropped int
+			kvs, dropped = dedup(kvs)
+			r.addDropped(dropped)
+		}
 		for i := range kvs {
 			kvs[i] = r.applyAttrLimits(kvs[i])
 		}
diff --git a/sdk/log/record_test.go b/sdk/log/record_test.go
index 411a8a3e4..8132dc595 100644
--- a/sdk/log/record_test.go
+++ b/sdk/log/record_test.go
@@ -230,6 +230,158 @@ func TestRecordDroppedAttributes(t *testing.T) {
 	}
 }
 
+func TestRecordAttrAllowDuplicateAttributes(t *testing.T) {
+	testcases := []struct {
+		name  string
+		attrs []log.KeyValue
+		want  []log.KeyValue
+	}{
+		{
+			name:  "EmptyKey",
+			attrs: make([]log.KeyValue, 10),
+			want:  make([]log.KeyValue, 10),
+		},
+		{
+			name: "MapKey",
+			attrs: []log.KeyValue{
+				log.Map("key", log.Int("key", 5), log.Int("key", 10)),
+			},
+			want: []log.KeyValue{
+				log.Map("key", log.Int("key", 5), log.Int("key", 10)),
+			},
+		},
+		{
+			name: "NonEmptyKey",
+			attrs: []log.KeyValue{
+				log.Bool("key", true),
+				log.Int64("key", 1),
+				log.Bool("key", false),
+				log.Float64("key", 2.),
+				log.String("key", "3"),
+				log.Slice("key", log.Int64Value(4)),
+				log.Map("key", log.Int("key", 5)),
+				log.Bytes("key", []byte("six")),
+				log.Bool("key", false),
+			},
+			want: []log.KeyValue{
+				log.Bool("key", true),
+				log.Int64("key", 1),
+				log.Bool("key", false),
+				log.Float64("key", 2.),
+				log.String("key", "3"),
+				log.Slice("key", log.Int64Value(4)),
+				log.Map("key", log.Int("key", 5)),
+				log.Bytes("key", []byte("six")),
+				log.Bool("key", false),
+			},
+		},
+		{
+			name: "Multiple",
+			attrs: []log.KeyValue{
+				log.Bool("a", true),
+				log.Int64("b", 1),
+				log.Bool("a", false),
+				log.Float64("c", 2.),
+				log.String("b", "3"),
+				log.Slice("d", log.Int64Value(4)),
+				log.Map("a", log.Int("key", 5)),
+				log.Bytes("d", []byte("six")),
+				log.Bool("e", true),
+				log.Int("f", 1),
+				log.Int("f", 2),
+				log.Int("f", 3),
+				log.Float64("b", 0.0),
+				log.Float64("b", 0.0),
+				log.String("g", "G"),
+				log.String("h", "H"),
+				log.String("g", "GG"),
+				log.Bool("a", false),
+			},
+			want: []log.KeyValue{
+				// Order is important here.
+				log.Bool("a", true),
+				log.Int64("b", 1),
+				log.Bool("a", false),
+				log.Float64("c", 2.),
+				log.String("b", "3"),
+				log.Slice("d", log.Int64Value(4)),
+				log.Map("a", log.Int("key", 5)),
+				log.Bytes("d", []byte("six")),
+				log.Bool("e", true),
+				log.Int("f", 1),
+				log.Int("f", 2),
+				log.Int("f", 3),
+				log.Float64("b", 0.0),
+				log.Float64("b", 0.0),
+				log.String("g", "G"),
+				log.String("h", "H"),
+				log.String("g", "GG"),
+				log.Bool("a", false),
+			},
+		},
+		{
+			name: "NoDuplicate",
+			attrs: func() []log.KeyValue {
+				out := make([]log.KeyValue, attributesInlineCount*2)
+				for i := range out {
+					out[i] = log.Bool(strconv.Itoa(i), true)
+				}
+				return out
+			}(),
+			want: func() []log.KeyValue {
+				out := make([]log.KeyValue, attributesInlineCount*2)
+				for i := range out {
+					out[i] = log.Bool(strconv.Itoa(i), true)
+				}
+				return out
+			}(),
+		},
+	}
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			validate := func(t *testing.T, r *Record, want []log.KeyValue) {
+				t.Helper()
+
+				var i int
+				r.WalkAttributes(func(kv log.KeyValue) bool {
+					if assert.Lessf(t, i, len(want), "additional: %v", kv) {
+						want := want[i]
+						assert.Truef(t, kv.Equal(want), "%d: want %v, got %v", i, want, kv)
+					}
+					i++
+					return true
+				})
+			}
+
+			t.Run("SetAttributes", func(t *testing.T) {
+				r := new(Record)
+				r.allowDupKeys = true
+				r.attributeValueLengthLimit = -1
+				r.SetAttributes(tc.attrs...)
+				validate(t, r, tc.want)
+			})
+
+			t.Run("AddAttributes/Empty", func(t *testing.T) {
+				r := new(Record)
+				r.allowDupKeys = true
+				r.attributeValueLengthLimit = -1
+				r.AddAttributes(tc.attrs...)
+				validate(t, r, tc.want)
+			})
+
+			t.Run("AddAttributes/Twice", func(t *testing.T) {
+				r := new(Record)
+				r.allowDupKeys = true
+				r.attributeValueLengthLimit = -1
+				r.AddAttributes(tc.attrs...)
+				r.AddAttributes(tc.attrs...)
+				want := append(tc.want, tc.want...)
+				validate(t, r, want)
+			})
+		})
+	}
+}
+
 func TestRecordAttrDeduplication(t *testing.T) {
 	testcases := []struct {
 		name  string
@@ -763,6 +915,19 @@ func BenchmarkSetAddAttributes(b *testing.B) {
 		}
 	})
 
+	b.Run("SetAttributes/AllowDuplicates", func(b *testing.B) {
+		records := make([]Record, b.N)
+		for i := range records {
+			records[i].allowDupKeys = true
+		}
+
+		b.ResetTimer()
+		b.ReportAllocs()
+		for i := 0; i < b.N; i++ {
+			records[i].SetAttributes(kv)
+		}
+	})
+
 	b.Run("AddAttributes", func(b *testing.B) {
 		records := make([]Record, b.N)
 
@@ -772,4 +937,17 @@ func BenchmarkSetAddAttributes(b *testing.B) {
 			records[i].AddAttributes(kv)
 		}
 	})
+
+	b.Run("AddAttributes/AllowDuplicates", func(b *testing.B) {
+		records := make([]Record, b.N)
+		for i := range records {
+			records[i].allowDupKeys = true
+		}
+
+		b.ResetTimer()
+		b.ReportAllocs()
+		for i := 0; i < b.N; i++ {
+			records[i].AddAttributes(kv)
+		}
+	})
 }
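Editor's illustrative note (not part of the patch): a sketch of the observable difference in behavior. The `countingProcessor` and `emitDuplicateKeys` helpers below are hypothetical, written only to count how many attributes reach `OnEmit`; with the default provider duplicate keys are deduplicated (last value wins), while `WithAllowKeyDuplication` keeps both pairs.

```go
package main

import (
	"context"
	"fmt"

	otellog "go.opentelemetry.io/otel/log"
	sdklog "go.opentelemetry.io/otel/sdk/log"
)

// countingProcessor is a hypothetical Processor used only to count the
// attributes that reach the export path; it is not part of the SDK.
type countingProcessor struct{ attrs int }

func (p *countingProcessor) OnEmit(_ context.Context, r *sdklog.Record) error {
	p.attrs = 0
	r.WalkAttributes(func(otellog.KeyValue) bool { p.attrs++; return true })
	return nil
}

func (p *countingProcessor) Shutdown(context.Context) error   { return nil }
func (p *countingProcessor) ForceFlush(context.Context) error { return nil }

func emitDuplicateKeys(provider *sdklog.LoggerProvider) {
	var rec otellog.Record
	rec.AddAttributes(otellog.String("k", "a"), otellog.String("k", "b"))
	provider.Logger("demo").Emit(context.Background(), rec)
}

func main() {
	ctx := context.Background()

	// Default provider: duplicate keys are deduplicated, last value wins.
	dedup := &countingProcessor{}
	defaultProvider := sdklog.NewLoggerProvider(sdklog.WithProcessor(dedup))
	emitDuplicateKeys(defaultProvider)
	_ = defaultProvider.Shutdown(ctx)

	// Provider with the new option: both key-value pairs are kept.
	dups := &countingProcessor{}
	allowDupProvider := sdklog.NewLoggerProvider(
		sdklog.WithProcessor(dups),
		sdklog.WithAllowKeyDuplication(),
	)
	emitDuplicateKeys(allowDupProvider)
	_ = allowDupProvider.Shutdown(ctx)

	fmt.Println("default:", dedup.attrs, "allowDup:", dups.attrs) // expected: default: 1 allowDup: 2
}
```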