From 1e357c7c9ab25a2d8ef982cb282ce91a82c1f0bf Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 29 Apr 2024 07:51:22 -0700 Subject: [PATCH] Truncate and de-duplicate log attribute values (#5230) * Truncate and de-duplicate log attr values * Fix test in otlploghttp * Remove duplicate decl of assertKV --- .../internal/transform/log_test.go | 94 +++--- sdk/log/record.go | 83 ++++- sdk/log/record_test.go | 298 ++++++++++++++++++ 3 files changed, 430 insertions(+), 45 deletions(-) diff --git a/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go b/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go index 2ce4557f7..311417581 100644 --- a/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go +++ b/exporters/otlp/otlplog/otlploghttp/internal/transform/log_test.go @@ -14,6 +14,8 @@ import ( api "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/sdk/log" + "go.opentelemetry.io/otel/sdk/log/logtest" + "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/trace" ) @@ -60,51 +62,61 @@ var ( flagsB = byte(0) records = func() []log.Record { - r0 := new(log.Record) - r0.SetTimestamp(ts) - r0.SetObservedTimestamp(obs) - r0.SetSeverity(sevA) - r0.SetSeverityText("A") - r0.SetBody(bodyA) - r0.SetAttributes(alice) - r0.SetTraceID(trace.TraceID(traceIDA)) - r0.SetSpanID(trace.SpanID(spanIDA)) - r0.SetTraceFlags(trace.TraceFlags(flagsA)) + var out []log.Record - r1 := new(log.Record) - r1.SetTimestamp(ts) - r1.SetObservedTimestamp(obs) - r1.SetSeverity(sevA) - r1.SetSeverityText("A") - r1.SetBody(bodyA) - r1.SetAttributes(bob) - r1.SetTraceID(trace.TraceID(traceIDA)) - r1.SetSpanID(trace.SpanID(spanIDA)) - r1.SetTraceFlags(trace.TraceFlags(flagsA)) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevA, + SeverityText: "A", + Body: bodyA, + Attributes: []api.KeyValue{alice}, + TraceID: trace.TraceID(traceIDA), + SpanID: trace.SpanID(spanIDA), + TraceFlags: trace.TraceFlags(flagsA), + Resource: resource.Empty(), // TODO(#5228): populate and test. + }.NewRecord()) - r2 := new(log.Record) - r2.SetTimestamp(ts) - r2.SetObservedTimestamp(obs) - r2.SetSeverity(sevB) - r2.SetSeverityText("B") - r2.SetBody(bodyB) - r2.SetAttributes(alice) - r2.SetTraceID(trace.TraceID(traceIDB)) - r2.SetSpanID(trace.SpanID(spanIDB)) - r2.SetTraceFlags(trace.TraceFlags(flagsB)) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevA, + SeverityText: "A", + Body: bodyA, + Attributes: []api.KeyValue{bob}, + TraceID: trace.TraceID(traceIDA), + SpanID: trace.SpanID(spanIDA), + TraceFlags: trace.TraceFlags(flagsA), + Resource: resource.Empty(), // TODO(#5228): populate and test. + }.NewRecord()) - r3 := new(log.Record) - r3.SetTimestamp(ts) - r3.SetObservedTimestamp(obs) - r3.SetSeverity(sevB) - r3.SetSeverityText("B") - r3.SetBody(bodyB) - r3.SetAttributes(bob) - r3.SetTraceID(trace.TraceID(traceIDB)) - r3.SetSpanID(trace.SpanID(spanIDB)) - r3.SetTraceFlags(trace.TraceFlags(flagsB)) + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevB, + SeverityText: "B", + Body: bodyB, + Attributes: []api.KeyValue{alice}, + TraceID: trace.TraceID(traceIDB), + SpanID: trace.SpanID(spanIDB), + TraceFlags: trace.TraceFlags(flagsB), + Resource: resource.Empty(), // TODO(#5228): populate and test. + }.NewRecord()) - return []log.Record{*r0, *r1, *r2, *r3} + out = append(out, logtest.RecordFactory{ + Timestamp: ts, + ObservedTimestamp: obs, + Severity: sevB, + SeverityText: "B", + Body: bodyB, + Attributes: []api.KeyValue{bob}, + TraceID: trace.TraceID(traceIDB), + SpanID: trace.SpanID(spanIDB), + TraceFlags: trace.TraceFlags(flagsB), + Resource: resource.Empty(), // TODO(#5228): populate and test. + }.NewRecord()) + + return out }() pbLogRecords = []*lpb.LogRecord{ diff --git a/sdk/log/record.go b/sdk/log/record.go index 74ae2888e..2cdac6e75 100644 --- a/sdk/log/record.go +++ b/sdk/log/record.go @@ -5,8 +5,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "slices" + "strings" "sync" "time" + "unicode/utf8" "go.opentelemetry.io/otel/log" "go.opentelemetry.io/otel/sdk/instrumentation" @@ -198,8 +200,6 @@ func (r *Record) AddAttributes(attrs ...log.KeyValue) { } } else { // Unique attribute. - // TODO: apply truncation to string and []string values. - // TODO: deduplicate map values. unique = append(unique, a) uIndex[a.Key] = len(unique) - 1 } @@ -246,10 +246,13 @@ func (r *Record) addAttrs(attrs []log.KeyValue) { var i int for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ { a := attrs[i] - r.front[r.nFront] = a + r.front[r.nFront] = r.applyAttrLimits(a) r.nFront++ } + for j, a := range attrs[i:] { + attrs[i+j] = r.applyAttrLimits(a) + } r.back = slices.Grow(r.back, len(attrs[i:])) r.back = append(r.back, attrs[i:]...) } @@ -268,11 +271,14 @@ func (r *Record) SetAttributes(attrs ...log.KeyValue) { var i int for i = 0; i < len(attrs) && r.nFront < len(r.front); i++ { a := attrs[i] - r.front[r.nFront] = a + r.front[r.nFront] = r.applyAttrLimits(a) r.nFront++ } r.back = slices.Clone(attrs[i:]) + for i, a := range r.back { + r.back[i] = r.applyAttrLimits(a) + } } // head returns the first n values of kvs along with the number of elements @@ -367,3 +373,72 @@ func (r *Record) Clone() Record { res.back = slices.Clone(r.back) return res } + +func (r Record) applyAttrLimits(attr log.KeyValue) log.KeyValue { + attr.Value = r.applyValueLimits(attr.Value) + return attr +} + +func (r Record) applyValueLimits(val log.Value) log.Value { + switch val.Kind() { + case log.KindString: + s := val.AsString() + if len(s) > r.attributeValueLengthLimit { + val = log.StringValue(truncate(s, r.attributeValueLengthLimit)) + } + case log.KindSlice: + sl := val.AsSlice() + for i := range sl { + sl[i] = r.applyValueLimits(sl[i]) + } + val = log.SliceValue(sl...) + case log.KindMap: + // Deduplicate then truncate. Do not do at the same time to avoid + // wasted truncation operations. + kvs, dropped := dedup(val.AsMap()) + r.dropped += dropped + for i := range kvs { + kvs[i] = r.applyAttrLimits(kvs[i]) + } + val = log.MapValue(kvs...) + } + return val +} + +// truncate returns a copy of str truncated to have a length of at most n +// characters. If the length of str is less than n, str itself is returned. +// +// The truncate of str ensures that no valid UTF-8 code point is split. The +// copy returned will be less than n if a characters straddles the length +// limit. +// +// No truncation is performed if n is less than zero. +func truncate(str string, n int) string { + if n < 0 { + return str + } + + // cut returns a copy of the s truncated to not exceed a length of n. If + // invalid UTF-8 is encountered, s is returned with false. Otherwise, the + // truncated copy will be returned with true. + cut := func(s string) (string, bool) { + var i int + for i = 0; i < n; { + r, size := utf8.DecodeRuneInString(s[i:]) + if r == utf8.RuneError { + return s, false + } + if i+size > n { + break + } + i += size + } + return s[:i], true + } + + cp, ok := cut(str) + if !ok { + cp, _ = cut(strings.ToValidUTF8(str, "")) + } + return cp +} diff --git a/sdk/log/record_test.go b/sdk/log/record_test.go index 99adfdfa9..ccdcb98fa 100644 --- a/sdk/log/record_test.go +++ b/sdk/log/record_test.go @@ -4,11 +4,13 @@ package log import ( + "fmt" "strconv" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/log" @@ -63,6 +65,7 @@ func TestRecordAttributes(t *testing.T) { log.Bytes("6", []byte("six")), } r := new(Record) + r.attributeValueLengthLimit = -1 r.SetAttributes(attrs...) r.SetAttributes(attrs[:2]...) // Overwrite existing. r.AddAttributes(attrs[2:]...) @@ -309,18 +312,21 @@ func TestRecordAttrDeduplication(t *testing.T) { t.Run("SetAttributes", func(t *testing.T) { r := new(Record) + r.attributeValueLengthLimit = -1 r.SetAttributes(tc.attrs...) validate(t, r) }) t.Run("AddAttributes/Empty", func(t *testing.T) { r := new(Record) + r.attributeValueLengthLimit = -1 r.AddAttributes(tc.attrs...) validate(t, r) }) t.Run("AddAttributes/Duplicates", func(t *testing.T) { r := new(Record) + r.attributeValueLengthLimit = -1 r.AddAttributes(tc.attrs...) r.AddAttributes(tc.attrs...) validate(t, r) @@ -328,3 +334,295 @@ func TestRecordAttrDeduplication(t *testing.T) { }) } } + +func TestApplyAttrLimitsDeduplication(t *testing.T) { + testcases := []struct { + name string + limit int + input, want log.Value + }{ + { + // No de-duplication + name: "Slice", + input: log.SliceValue( + log.BoolValue(true), + log.BoolValue(true), + log.Float64Value(1.3), + log.Float64Value(1.3), + log.Int64Value(43), + log.Int64Value(43), + log.BytesValue([]byte("hello")), + log.BytesValue([]byte("hello")), + log.StringValue("foo"), + log.StringValue("foo"), + log.SliceValue(log.StringValue("baz")), + log.SliceValue(log.StringValue("baz")), + log.MapValue(log.String("a", "qux")), + log.MapValue(log.String("a", "qux")), + ), + want: log.SliceValue( + log.BoolValue(true), + log.BoolValue(true), + log.Float64Value(1.3), + log.Float64Value(1.3), + log.Int64Value(43), + log.Int64Value(43), + log.BytesValue([]byte("hello")), + log.BytesValue([]byte("hello")), + log.StringValue("foo"), + log.StringValue("foo"), + log.SliceValue(log.StringValue("baz")), + log.SliceValue(log.StringValue("baz")), + log.MapValue(log.String("a", "qux")), + log.MapValue(log.String("a", "qux")), + ), + }, + { + name: "Map", + input: log.MapValue( + log.Bool("a", true), + log.Int64("b", 1), + log.Bool("a", false), + log.Float64("c", 2.), + log.String("b", "3"), + log.Slice("d", log.Int64Value(4)), + log.Map("a", log.Int("key", 5)), + log.Bytes("d", []byte("six")), + log.Bool("e", true), + log.Int("f", 1), + log.Int("f", 2), + log.Int("f", 3), + log.Float64("b", 0.0), + log.Float64("b", 0.0), + log.String("g", "G"), + log.String("h", "H"), + log.String("g", "GG"), + log.Bool("a", false), + ), + want: log.MapValue( + // Order is important here. + log.Bool("a", false), + log.Float64("b", 0.0), + log.Float64("c", 2.), + log.Bytes("d", []byte("six")), + log.Bool("e", true), + log.Int("f", 3), + log.String("g", "GG"), + log.String("h", "H"), + ), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + const key = "key" + kv := log.KeyValue{Key: key, Value: tc.input} + r := Record{attributeValueLengthLimit: -1} + + t.Run("AddAttributes", func(t *testing.T) { + r.AddAttributes(kv) + assertKV(t, r, log.KeyValue{Key: key, Value: tc.want}) + }) + + t.Run("SetAttributes", func(t *testing.T) { + r.SetAttributes(kv) + assertKV(t, r, log.KeyValue{Key: key, Value: tc.want}) + }) + }) + } +} + +func TestApplyAttrLimitsTruncation(t *testing.T) { + testcases := []struct { + name string + limit int + input, want log.Value + }{ + { + name: "Empty", + limit: 0, + input: log.Value{}, + want: log.Value{}, + }, + { + name: "Bool", + limit: 0, + input: log.BoolValue(true), + want: log.BoolValue(true), + }, + { + name: "Float64", + limit: 0, + input: log.Float64Value(1.3), + want: log.Float64Value(1.3), + }, + { + name: "Int64", + limit: 0, + input: log.Int64Value(43), + want: log.Int64Value(43), + }, + { + name: "Bytes", + limit: 0, + input: log.BytesValue([]byte("foo")), + want: log.BytesValue([]byte("foo")), + }, + { + name: "String", + limit: 0, + input: log.StringValue("foo"), + want: log.StringValue(""), + }, + { + name: "Slice", + limit: 0, + input: log.SliceValue( + log.BoolValue(true), + log.Float64Value(1.3), + log.Int64Value(43), + log.BytesValue([]byte("hello")), + log.StringValue("foo"), + log.StringValue("bar"), + log.SliceValue(log.StringValue("baz")), + log.MapValue(log.String("a", "qux")), + ), + want: log.SliceValue( + log.BoolValue(true), + log.Float64Value(1.3), + log.Int64Value(43), + log.BytesValue([]byte("hello")), + log.StringValue(""), + log.StringValue(""), + log.SliceValue(log.StringValue("")), + log.MapValue(log.String("a", "")), + ), + }, + { + name: "Map", + limit: 0, + input: log.MapValue( + log.Bool("0", true), + log.Float64("1", 1.3), + log.Int64("2", 43), + log.Bytes("3", []byte("hello")), + log.String("4", "foo"), + log.String("5", "bar"), + log.Slice("6", log.StringValue("baz")), + log.Map("7", log.String("a", "qux")), + ), + want: log.MapValue( + log.Bool("0", true), + log.Float64("1", 1.3), + log.Int64("2", 43), + log.Bytes("3", []byte("hello")), + log.String("4", ""), + log.String("5", ""), + log.Slice("6", log.StringValue("")), + log.Map("7", log.String("a", "")), + ), + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + const key = "key" + kv := log.KeyValue{Key: key, Value: tc.input} + r := Record{attributeValueLengthLimit: tc.limit} + + t.Run("AddAttributes", func(t *testing.T) { + r.AddAttributes(kv) + assertKV(t, r, log.KeyValue{Key: key, Value: tc.want}) + }) + + t.Run("SetAttributes", func(t *testing.T) { + r.SetAttributes(kv) + assertKV(t, r, log.KeyValue{Key: key, Value: tc.want}) + }) + }) + } +} + +func assertKV(t *testing.T, r Record, kv log.KeyValue) { + t.Helper() + + var kvs []log.KeyValue + r.WalkAttributes(func(kv log.KeyValue) bool { + kvs = append(kvs, kv) + return true + }) + + require.Len(t, kvs, 1) + assert.Truef(t, kv.Equal(kvs[0]), "%s != %s", kv, kvs[0]) +} + +func TestTruncate(t *testing.T) { + testcases := []struct { + input, want string + limit int + }{ + { + input: "value", + want: "value", + limit: -1, + }, + { + input: "value", + want: "", + limit: 0, + }, + { + input: "value", + want: "v", + limit: 1, + }, + { + input: "value", + want: "va", + limit: 2, + }, + { + input: "value", + want: "val", + limit: 3, + }, + { + input: "value", + want: "valu", + limit: 4, + }, + { + input: "value", + want: "value", + limit: 5, + }, + { + input: "value", + want: "value", + limit: 6, + }, + { + input: "€€€€", // 3 bytes each + want: "€€€", + limit: 10, + }, + { + input: "€"[0:2] + "hello€€", // corrupted first rune, then over limit + want: "hello€", + limit: 10, + }, + { + input: "€"[0:2] + "hello", // corrupted first rune, then not over limit + want: "hello", + limit: 10, + }, + } + + for _, tc := range testcases { + name := fmt.Sprintf("%s/%d", tc.input, tc.limit) + t.Run(name, func(t *testing.T) { + t.Log(tc.input, len(tc.input), tc.limit) + assert.Equal(t, tc.want, truncate(tc.input, tc.limit)) + }) + } +}