1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-26 21:05:00 +02:00

Log warning when a trace attribute/event/link is discarded due to limits (#5434)

Fix #5343

- Update the `evictedQueue` to log when it drops a value
- Update the `evictedQueue` to be declared over an `[T any]` parameter
so it knows what to log when it is dropping a value and to reduce the
`interface{}` allocation
- Add a `copy` method to replace the now unneeded
`interfaceArrayTo*Array` functions.
- Update the `recordingSpan` to log once that it dropped an attribute
when limits are reached.
This commit is contained in:
Tyler Yahn 2024-05-30 11:40:08 -07:00 committed by GitHub
parent fad23ee62c
commit 5bfa9c55be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 95 additions and 42 deletions

View File

@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Log a warning to the OpenTelemetry internal logger when a `Record` in `go.opentelemetry.io/otel/sdk/log` drops an attribute due to a limit being reached. (#5376)
- Identify the `Tracer` returned from the global `TracerProvider` in `go.opentelemetry.io/otel/global` with its schema URL. (#5426)
- Identify the `Meter` returned from the global `MeterProvider` in `go.opentelemetry.io/otel/global` with its schema URL. (#5426)
- Log a warning to the OpenTelemetry internal logger when a `Span` in `go.opentelemetry.io/otel/sdk/trace` drops an attribute, event, or link due to a limit being reached. (#5434)
## [1.27.0/0.49.0/0.3.0] 2024-05-21

View File

@ -3,23 +3,38 @@
package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"fmt"
"slices"
"sync"
"go.opentelemetry.io/otel/internal/global"
)
// evictedQueue is a FIFO queue with a configurable capacity.
type evictedQueue struct {
queue []interface{}
type evictedQueue[T any] struct {
queue []T
capacity int
droppedCount int
logDropped func()
}
func newEvictedQueue(capacity int) evictedQueue {
func newEvictedQueue[T any](capacity int) evictedQueue[T] {
var tVal T
msg := fmt.Sprintf("limit reached: dropping trace %T", tVal)
// Do not pre-allocate queue, do this lazily.
return evictedQueue{capacity: capacity}
return evictedQueue[T]{
capacity: capacity,
logDropped: sync.OnceFunc(func() { global.Warn(msg) }),
}
}
// add adds value to the evictedQueue eq. If eq is at capacity, the oldest
// queued value will be discarded and the drop count incremented.
func (eq *evictedQueue) add(value interface{}) {
func (eq *evictedQueue[T]) add(value T) {
if eq.capacity == 0 {
eq.droppedCount++
eq.logDropped()
return
}
@ -28,6 +43,12 @@ func (eq *evictedQueue) add(value interface{}) {
copy(eq.queue[:eq.capacity-1], eq.queue[1:])
eq.queue = eq.queue[:eq.capacity-1]
eq.droppedCount++
eq.logDropped()
}
eq.queue = append(eq.queue, value)
}
// copy returns a newly allocated slice holding the values currently queued in
// eq. The elements are copied by assignment (a shallow clone), so appending to
// or overwriting entries of the queue does not change the returned slice, and
// vice versa.
func (eq *evictedQueue[T]) copy() []T {
	values := slices.Clone(eq.queue)
	return values
}

View File

@ -6,13 +6,15 @@ package trace
import (
"reflect"
"testing"
"github.com/stretchr/testify/assert"
)
// init has an empty body and performs no work.
func init() {
}
func TestAdd(t *testing.T) {
q := newEvictedQueue(3)
q := newEvictedQueue[string](3)
q.add("value1")
q.add("value2")
if wantLen, gotLen := 2, len(q.queue); wantLen != gotLen {
@ -20,20 +22,31 @@ func TestAdd(t *testing.T) {
}
}
func (eq *evictedQueue) queueToArray() []string {
arr := make([]string, 0)
for _, value := range eq.queue {
arr = append(arr, value.(string))
}
return arr
// TestCopy verifies that the slice returned by copy is independent of the
// queue: adding to the queue after the copy is taken does not change the copy,
// and writing to the copy does not change the queue.
func TestCopy(t *testing.T) {
	queue := newEvictedQueue[string](3)
	queue.add("value1")

	snapshot := queue.copy()

	// Mutate the queue after the snapshot was taken.
	queue.add("value2")
	assert.Equal(t, []string{"value1"}, snapshot, "queue update modified copy")

	// Mutate the snapshot and check the queue kept its original value.
	snapshot[0] = "value0"
	assert.Equal(t, "value1", queue.queue[0], "copy update modified queue")
}
func TestDropCount(t *testing.T) {
q := newEvictedQueue(3)
q := newEvictedQueue[string](3)
var called bool
q.logDropped = func() { called = true }
q.add("value1")
assert.False(t, called, `"value1" logged as dropped`)
q.add("value2")
assert.False(t, called, `"value2" logged as dropped`)
q.add("value3")
assert.False(t, called, `"value3" logged as dropped`)
q.add("value1")
assert.True(t, called, `"value2" not logged as dropped`)
q.add("value4")
if wantLen, gotLen := 3, len(q.queue); wantLen != gotLen {
t.Errorf("got queue length %d want %d", gotLen, wantLen)
@ -42,7 +55,7 @@ func TestDropCount(t *testing.T) {
t.Errorf("got drop count %d want %d", gotDropCount, wantDropCount)
}
wantArr := []string{"value3", "value1", "value4"}
gotArr := q.queueToArray()
gotArr := q.copy()
if wantLen, gotLen := len(wantArr), len(gotArr); gotLen != wantLen {
t.Errorf("got array len %d want %d", gotLen, wantLen)

View File

@ -17,6 +17,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/internal"
"go.opentelemetry.io/otel/sdk/resource"
@ -137,12 +138,13 @@ type recordingSpan struct {
// ReadOnlySpan exported when the span ends.
attributes []attribute.KeyValue
droppedAttributes int
logDropAttrsOnce sync.Once
// events are stored in FIFO queue capped by configured limit.
events evictedQueue
events evictedQueue[Event]
// links are stored in FIFO queue capped by configured limit.
links evictedQueue
links evictedQueue[Link]
// executionTracerTaskEnd ends the execution tracer span.
executionTracerTaskEnd func()
@ -219,7 +221,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
limit := s.tracer.provider.spanLimits.AttributeCountLimit
if limit == 0 {
// No attributes allowed.
s.droppedAttributes += len(attributes)
s.addDroppedAttr(len(attributes))
return
}
@ -236,7 +238,7 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
for _, a := range attributes {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
s.addDroppedAttr(1)
continue
}
a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a)
@ -244,6 +246,22 @@ func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
}
}
// logDropAttrs emits the warning that Span attributes are being dropped
// because a limit was reached. It is a package-level variable rather than a
// plain function so that tests can substitute their own implementation.
var logDropAttrs = func() {
	const msg = "limit reached: dropping trace Span attributes"
	global.Warn(msg)
}
// addDroppedAttr increases the span's dropped-attribute count by incr.
//
// A warning is logged through logDropAttrs on the first call for a given
// span; later calls on that span only update the count.
//
// The caller is expected to hold s.mu.Lock.
func (s *recordingSpan) addDroppedAttr(incr int) {
	s.droppedAttributes = s.droppedAttributes + incr
	s.logDropAttrsOnce.Do(logDropAttrs)
}
// addOverCapAttrs adds the attributes attrs to the span s while
// de-duplicating the attributes of s and attrs and dropping attributes that
// exceed the limit.
@ -273,7 +291,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
for _, a := range attrs {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
s.addDroppedAttr(1)
continue
}
@ -286,7 +304,7 @@ func (s *recordingSpan) addOverCapAttrs(limit int, attrs []attribute.KeyValue) {
if len(s.attributes) >= limit {
// Do not just drop all of the remaining attributes, make sure
// updates are checked and performed.
s.droppedAttributes++
s.addDroppedAttr(1)
} else {
a = truncateAttr(s.tracer.provider.spanLimits.AttributeValueLengthLimit, a)
s.attributes = append(s.attributes, a)
@ -585,7 +603,7 @@ func (s *recordingSpan) Links() []Link {
if len(s.links.queue) == 0 {
return []Link{}
}
return s.interfaceArrayToLinksArray()
return s.links.copy()
}
// Events returns the events of this span.
@ -595,7 +613,7 @@ func (s *recordingSpan) Events() []Event {
if len(s.events.queue) == 0 {
return []Event{}
}
return s.interfaceArrayToEventArray()
return s.events.copy()
}
// Status returns the status of this span.
@ -717,32 +735,16 @@ func (s *recordingSpan) snapshot() ReadOnlySpan {
}
sd.droppedAttributeCount = s.droppedAttributes
if len(s.events.queue) > 0 {
sd.events = s.interfaceArrayToEventArray()
sd.events = s.events.copy()
sd.droppedEventCount = s.events.droppedCount
}
if len(s.links.queue) > 0 {
sd.links = s.interfaceArrayToLinksArray()
sd.links = s.links.copy()
sd.droppedLinkCount = s.links.droppedCount
}
return &sd
}
// interfaceArrayToLinksArray returns the values queued in s.links as a []Link.
//
// Every queued value is type-asserted to Link; a value of any other type
// causes a panic. The result is always non-nil, even when the queue is empty.
func (s *recordingSpan) interfaceArrayToLinksArray() []Link {
	// The final length is known up front, so allocate the backing array once
	// instead of growing it on each append.
	linkArr := make([]Link, 0, len(s.links.queue))
	for _, value := range s.links.queue {
		linkArr = append(linkArr, value.(Link))
	}
	return linkArr
}
// interfaceArrayToEventArray returns the values queued in s.events as a
// []Event.
//
// Every queued value is type-asserted to Event; a value of any other type
// causes a panic. The result is always non-nil, even when the queue is empty.
func (s *recordingSpan) interfaceArrayToEventArray() []Event {
	// The final length is known up front, so allocate the backing array once
	// instead of growing it on each append.
	eventArr := make([]Event, 0, len(s.events.queue))
	for _, value := range s.events.queue {
		eventArr = append(eventArr, value.(Event))
	}
	return eventArr
}
func (s *recordingSpan) addChild() {
if !s.IsRecording() {
return

View File

@ -234,6 +234,22 @@ func TestTruncateAttr(t *testing.T) {
}
}
// TestLogDropAttrs verifies that addDroppedAttr invokes logDropAttrs on the
// first call for a span and does not invoke it again on a later call.
func TestLogDropAttrs(t *testing.T) {
	// Swap in a recording stub and restore the real logger when the test ends.
	original := logDropAttrs
	t.Cleanup(func() { logDropAttrs = original })

	var logged bool
	logDropAttrs = func() { logged = true }

	span := &recordingSpan{}

	span.addDroppedAttr(1)
	assert.True(t, logged, "logDropAttrs not called")

	// Reset the flag so a second invocation would be detected.
	logged = false
	span.addDroppedAttr(1)
	assert.False(t, logged, "logDropAttrs called multiple times for same Span")
}
func BenchmarkRecordingSpanSetAttributes(b *testing.B) {
var attrs []attribute.KeyValue
for i := 0; i < 100; i++ {

View File

@ -132,8 +132,8 @@ func (tr *tracer) newRecordingSpan(psc, sc trace.SpanContext, name string, sr Sa
spanKind: trace.ValidateSpanKind(config.SpanKind()),
name: name,
startTime: startTime,
events: newEvictedQueue(tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue(tr.provider.spanLimits.LinkCountLimit),
events: newEvictedQueue[Event](tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue[Link](tr.provider.spanLimits.LinkCountLimit),
tracer: tr,
}