mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-30 21:20:04 +02:00
Log records dropped by the BatchProcessor (#5276)
* Add dropped count to queue * Log dropped records * Add changelog entry --------- Co-authored-by: Robert Pająk <pellared@hotmail.com>
This commit is contained in:
parent
2cf3d64f3e
commit
ae06a80417
@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Add `RecordFactory` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the bridge implementations. (#5263)
|
||||
- Add `RecordFactory` in `go.opentelemetry.io/otel/sdk/log/logtest` to facilitate testing the exporter and processor implementations. (#5258)
|
||||
- Add example for `go.opentelemetry.io/otel/exporters/stdout/stdoutlog`. (#5242)
|
||||
- The count of dropped records from the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is logged. (#5276)
|
||||
|
||||
### Changed
|
||||
|
||||
|
@ -10,6 +10,8 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -148,6 +150,10 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
|
||||
return
|
||||
}
|
||||
|
||||
if d := b.q.Dropped(); d > 0 {
|
||||
global.Warn("dropped log records", "dropped", d)
|
||||
}
|
||||
|
||||
qLen := b.q.TryDequeue(buf, func(r []Record) bool {
|
||||
ok := b.exporter.EnqueueExport(r)
|
||||
if ok {
|
||||
@ -253,6 +259,7 @@ func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
|
||||
type queue struct {
|
||||
sync.Mutex
|
||||
|
||||
dropped atomic.Uint64
|
||||
cap, len int
|
||||
read, write *ring
|
||||
}
|
||||
@ -266,6 +273,12 @@ func newQueue(size int) *queue {
|
||||
}
|
||||
}
|
||||
|
||||
// Dropped returns the number of Records dropped during enqueueing since the
|
||||
// last time Dropped was called.
|
||||
func (q *queue) Dropped() uint64 {
|
||||
return q.dropped.Swap(0)
|
||||
}
|
||||
|
||||
// Enqueue adds r to the queue. The queue size, including the addition of r, is
|
||||
// returned.
|
||||
//
|
||||
@ -283,6 +296,7 @@ func (q *queue) Enqueue(r Record) int {
|
||||
// Overflow. Advance read to be the new "oldest".
|
||||
q.len = q.cap
|
||||
q.read = q.read.Next()
|
||||
q.dropped.Add(1)
|
||||
}
|
||||
return q.len
|
||||
}
|
||||
|
@ -4,7 +4,9 @@
|
||||
package log // import "go.opentelemetry.io/otel/sdk/log"
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
stdlog "log"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
@ -12,10 +14,12 @@ import (
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/go-logr/stdr"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/log"
|
||||
)
|
||||
|
||||
@ -413,6 +417,41 @@ func TestBatchProcessor(t *testing.T) {
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("DroppedLogs", func(t *testing.T) {
|
||||
orig := global.GetLogger()
|
||||
t.Cleanup(func() { global.SetLogger(orig) })
|
||||
buf := new(bytes.Buffer)
|
||||
stdr.SetVerbosity(1)
|
||||
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))
|
||||
|
||||
e := newTestExporter(nil)
|
||||
e.ExportTrigger = make(chan struct{})
|
||||
|
||||
b := NewBatchProcessor(
|
||||
e,
|
||||
WithMaxQueueSize(1),
|
||||
WithExportMaxBatchSize(1),
|
||||
WithExportInterval(time.Hour),
|
||||
WithExportTimeout(time.Hour),
|
||||
)
|
||||
var r Record
|
||||
assert.NoError(t, b.OnEmit(ctx, r), "queued")
|
||||
assert.NoError(t, b.OnEmit(ctx, r), "dropped")
|
||||
|
||||
var n int
|
||||
require.Eventually(t, func() bool {
|
||||
n = e.ExportN()
|
||||
return n > 0
|
||||
}, 2*time.Second, time.Microsecond, "blocked export not attempted")
|
||||
|
||||
got := buf.String()
|
||||
want := `"level"=1 "msg"="dropped log records" "dropped"=1`
|
||||
assert.Contains(t, got, want)
|
||||
|
||||
close(e.ExportTrigger)
|
||||
_ = b.Shutdown(ctx)
|
||||
})
|
||||
|
||||
t.Run("ConcurrentSafe", func(t *testing.T) {
|
||||
const goRoutines = 10
|
||||
|
||||
@ -488,6 +527,18 @@ func TestQueue(t *testing.T) {
|
||||
assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records")
|
||||
})
|
||||
|
||||
t.Run("Dropped", func(t *testing.T) {
|
||||
q := newQueue(1)
|
||||
|
||||
_ = q.Enqueue(r)
|
||||
_ = q.Enqueue(r)
|
||||
assert.Equal(t, uint64(1), q.Dropped(), "fist")
|
||||
|
||||
_ = q.Enqueue(r)
|
||||
_ = q.Enqueue(r)
|
||||
assert.Equal(t, uint64(2), q.Dropped(), "second")
|
||||
})
|
||||
|
||||
t.Run("Flush", func(t *testing.T) {
|
||||
const size = 2
|
||||
q := newQueue(size)
|
||||
|
@ -4,6 +4,7 @@ go 1.21
|
||||
|
||||
require (
|
||||
github.com/go-logr/logr v1.4.1
|
||||
github.com/go-logr/stdr v1.2.2
|
||||
github.com/stretchr/testify v1.9.0
|
||||
go.opentelemetry.io/otel v1.26.0
|
||||
go.opentelemetry.io/otel/log v0.2.0-alpha
|
||||
@ -13,7 +14,6 @@ require (
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.26.0 // indirect
|
||||
golang.org/x/sys v0.20.0 // indirect
|
||||
|
Loading…
Reference in New Issue
Block a user