diff --git a/CHANGELOG.md b/CHANGELOG.md index 25a1b4811..3a5de1edb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Log a warning to the OpenTelemetry internal logger when a `Record` in `go.opentelemetry.io/otel/sdk/log` drops an attribute due to a limit being reached. (#5376) - Identify the `Tracer` returned from the global `TracerProvider` in `go.opentelemetry.io/otel/global` with its schema URL. (#5426) - Identify the `Meter` returned from the global `MeterProvider` in `go.opentelemetry.io/otel/global` with its schema URL. (#5426) +- Log a warning to the OpenTelemetry internal logger when a `Span` in `go.opentelemetry.io/otel/sdk/trace` drops an attribute, event, or link due to a limit being reached. (#5434) ## [1.27.0/0.49.0/0.3.0] 2024-05-21 diff --git a/sdk/trace/evictedqueue.go b/sdk/trace/evictedqueue.go index 69eb2fdfc..3c62c3299 100644 --- a/sdk/trace/evictedqueue.go +++ b/sdk/trace/evictedqueue.go @@ -3,23 +3,38 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" +import ( + "fmt" + "slices" + "sync" + + "go.opentelemetry.io/otel/internal/global" +) + // evictedQueue is a FIFO queue with a configurable capacity. -type evictedQueue struct { - queue []interface{} +type evictedQueue[T any] struct { + queue []T capacity int droppedCount int + logDropped func() } -func newEvictedQueue(capacity int) evictedQueue { +func newEvictedQueue[T any](capacity int) evictedQueue[T] { + var tVal T + msg := fmt.Sprintf("limit reached: dropping trace %T", tVal) // Do not pre-allocate queue, do this lazily. - return evictedQueue{capacity: capacity} + return evictedQueue[T]{ + capacity: capacity, + logDropped: sync.OnceFunc(func() { global.Warn(msg) }), + } } // add adds value to the evictedQueue eq. If eq is at capacity, the oldest // queued value will be discarded and the drop count incremented. -func (eq *evictedQueue) add(value interface{}) { +func (eq *evictedQueue[T]) add(value T) { if eq.capacity == 0 { eq.droppedCount++ + eq.logDropped() return } @@ -28,6 +43,12 @@ func (eq *evictedQueue) add(value interface{}) { copy(eq.queue[:eq.capacity-1], eq.queue[1:]) eq.queue = eq.queue[:eq.capacity-1] eq.droppedCount++ + eq.logDropped() } eq.queue = append(eq.queue, value) } + +// copy returns a copy of the evictedQueue. +func (eq *evictedQueue[T]) copy() []T { + return slices.Clone(eq.queue) +} diff --git a/sdk/trace/evictedqueue_test.go b/sdk/trace/evictedqueue_test.go index d52dd1291..2a58ab345 100644 --- a/sdk/trace/evictedqueue_test.go +++ b/sdk/trace/evictedqueue_test.go @@ -6,13 +6,15 @@ package trace import ( "reflect" "testing" + + "github.com/stretchr/testify/assert" ) func init() { } func TestAdd(t *testing.T) { - q := newEvictedQueue(3) + q := newEvictedQueue[string](3) q.add("value1") q.add("value2") if wantLen, gotLen := 2, len(q.queue); wantLen != gotLen { @@ -20,20 +22,31 @@ func TestAdd(t *testing.T) { } } -func (eq *evictedQueue) queueToArray() []string { - arr := make([]string, 0) - for _, value := range eq.queue { - arr = append(arr, value.(string)) - } - return arr +func TestCopy(t *testing.T) { + q := newEvictedQueue[string](3) + q.add("value1") + cp := q.copy() + + q.add("value2") + assert.Equal(t, []string{"value1"}, cp, "queue update modified copy") + + cp[0] = "value0" + assert.Equal(t, "value1", q.queue[0], "copy update modified queue") } func TestDropCount(t *testing.T) { - q := newEvictedQueue(3) + q := newEvictedQueue[string](3) + var called bool + q.logDropped = func() { called = true } + q.add("value1") + assert.False(t, called, `"value1" logged as dropped`) q.add("value2") + assert.False(t, called, `"value2" logged as dropped`) q.add("value3") + assert.False(t, called, `"value3" logged as dropped`) q.add("value1") + assert.True(t, called, `"value2" not logged as dropped`) q.add("value4") if wantLen, gotLen := 3, len(q.queue); wantLen != gotLen { t.Errorf("got queue length %d want %d", gotLen, wantLen) @@ -42,7 +55,7 @@ func TestDropCount(t *testing.T) { t.Errorf("got drop count %d want %d", gotDropCount, wantDropCount) } wantArr := []string{"value3", "value1", "value4"} - gotArr := q.queueToArray() + gotArr := q.copy() if wantLen, gotLen := len(wantArr), len(gotArr); gotLen != wantLen { t.Errorf("got array len %d want %d", gotLen, wantLen) diff --git a/sdk/trace/span.go b/sdk/trace/span.go index f0221eaa8..26b1a3943 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/internal" "go.opentelemetry.io/otel/sdk/resource" @@ -137,12 +138,13 @@ type recordingSpan struct { // ReadOnlySpan exported when the span ends. attributes []attribute.KeyValue droppedAttributes int + logDropAttrsOnce sync.Once // events are stored in FIFO queue capped by configured limit. - events evictedQueue + events evictedQueue[Event] // links are stored in FIFO queue capped by configured limit. - links evictedQueue + links evictedQueue[Link] // executionTracerTaskEnd ends the execution tracer span. executionTracerTaskEnd func() @@ -219,7 +221,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) { limit := s.tracer.provider.spanLimits.AttributeCountLimit if limit == 0 { // No attributes allowed. - s.droppedAttributes += len(attributes) + s.addDroppedAttr(len(attributes)) return } @@ -236,7 +238,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) { for _, a := range attributes { if !a.Valid() { // Drop all invalid attributes. - s.droppedAttributes++ + s.addDroppedAttr(1) continue } a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a) @@ -244,6 +246,22 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) { } } +// Declared as a var so tests can override. +var logDropAttrs = func() { + global.Warn("limit reached: dropping trace Span attributes") +} + +// addDroppedAttr adds incr to the count of dropped attributes. +// +// The first, and only the first, time this method is called a warning will be +// logged. +// +// This method assumes s.mu.Lock is held by the caller. +func (s *recordingSpan) addDroppedAttr(incr int) { + s.droppedAttributes += incr + s.logDropAttrsOnce.Do(logDropAttrs) +} + // addOverCapAttrs adds the attributes attrs to the span s while // de-duplicating the attributes of s and attrs and dropping attributes that // exceed the limit. @@ -273,7 +291,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) { for _, a := range attrs { if !a.Valid() { // Drop all invalid attributes. - s.droppedAttributes++ + s.addDroppedAttr(1) continue } @@ -286,7 +304,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) { if len(s.attributes) >= limit { // Do not just drop all of the remaining attributes, make sure // updates are checked and performed. - s.droppedAttributes++ + s.addDroppedAttr(1) } else { a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a) s.attributes = append(s.attributes, a) @@ -585,7 +603,7 @@ func (s *recordingSpan) Links() []Link { if len(s.links.queue) == 0 { return []Link{} } - return s.interfaceArrayToLinksArray() + return s.links.copy() } // Events returns the events of this span. @@ -595,7 +613,7 @@ func (s *recordingSpan) Events() []Event { if len(s.events.queue) == 0 { return []Event{} } - return s.interfaceArrayToEventArray() + return s.events.copy() } // Status returns the status of this span. @@ -717,32 +735,16 @@ func (s *recordingSpan) snapshot() ReadOnlySpan { } sd.droppedAttributeCount = s.droppedAttributes if len(s.events.queue) > 0 { - sd.events = s.interfaceArrayToEventArray() + sd.events = s.events.copy() sd.droppedEventCount = s.events.droppedCount } if len(s.links.queue) > 0 { - sd.links = s.interfaceArrayToLinksArray() + sd.links = s.links.copy() sd.droppedLinkCount = s.links.droppedCount } return &sd } -func (s *recordingSpan) interfaceArrayToLinksArray() []Link { - linkArr := make([]Link, 0) - for _, value := range s.links.queue { - linkArr = append(linkArr, value.(Link)) - } - return linkArr -} - -func (s *recordingSpan) interfaceArrayToEventArray() []Event { - eventArr := make([]Event, 0) - for _, value := range s.events.queue { - eventArr = append(eventArr, value.(Event)) - } - return eventArr -} - func (s *recordingSpan) addChild() { if !s.IsRecording() { return diff --git a/sdk/trace/span_test.go b/sdk/trace/span_test.go index b387d0a94..4556da8e9 100644 --- a/sdk/trace/span_test.go +++ b/sdk/trace/span_test.go @@ -234,6 +234,22 @@ func TestTruncateAttr(t *testing.T) { } } +func TestLogDropAttrs(t *testing.T) { + orig := logDropAttrs + t.Cleanup(func() { logDropAttrs = orig }) + + var called bool + logDropAttrs = func() { called = true } + + s := &recordingSpan{} + s.addDroppedAttr(1) + assert.True(t, called, "logDropAttrs not called") + + called = false + s.addDroppedAttr(1) + assert.False(t, called, "logDropAttrs called multiple times for same Span") +} + func BenchmarkRecordingSpanSetAttributes(b *testing.B) { var attrs []attribute.KeyValue for i := 0; i < 100; i++ { diff --git a/sdk/trace/tracer.go b/sdk/trace/tracer.go index 3668b1387..8c4142d6f 100644 --- a/sdk/trace/tracer.go +++ b/sdk/trace/tracer.go @@ -132,8 +132,8 @@ func (tr *tracer) newRecordingSpan(psc, sc trace.SpanContext, name string, sr Sa spanKind: trace.ValidateSpanKind(config.SpanKind()), name: name, startTime: startTime, - events: newEvictedQueue(tr.provider.spanLimits.EventCountLimit), - links: newEvictedQueue(tr.provider.spanLimits.LinkCountLimit), + events: newEvictedQueue[Event](tr.provider.spanLimits.EventCountLimit), + links: newEvictedQueue[Link](tr.provider.spanLimits.LinkCountLimit), tracer: tr, }