From ae06a8041733c2d4287a40c03fe19dec2c7403a7 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 8 May 2024 07:42:09 -0700 Subject: [PATCH] Log records dropped by the BatchProcessor (#5276) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add dropped count to queue * Log dropped records * Add changelog entry --------- Co-authored-by: Robert PajÄ…k --- CHANGELOG.md | 1 + sdk/log/batch.go | 14 ++++++++++++ sdk/log/batch_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++ sdk/log/go.mod | 2 +- 4 files changed, 67 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00235a1e2..0db3cf6ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `RecordFactory` in `go.opentelemetry.io/otel/log/logtest` to facilitate testing the bridge implementations. (#5263) - Add `RecordFactory` in `go.opentelemetry.io/otel/sdk/log/logtest` to facilitate testing the exporter and processor implementations. (#5258) - Add example for `go.opentelemetry.io/otel/exporters/stdout/stdoutlog`. (#5242) +- The count of dropped records from the `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` is logged. (#5276) ### Changed diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 6031d1154..6c87edf81 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -10,6 +10,8 @@ import ( "sync" "sync/atomic" "time" + + "go.opentelemetry.io/otel/internal/global" ) const ( @@ -148,6 +150,10 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { return } + if d := b.q.Dropped(); d > 0 { + global.Warn("dropped log records", "dropped", d) + } + qLen := b.q.TryDequeue(buf, func(r []Record) bool { ok := b.exporter.EnqueueExport(r) if ok { @@ -253,6 +259,7 @@ func (b *BatchProcessor) ForceFlush(ctx context.Context) error { type queue struct { sync.Mutex + dropped atomic.Uint64 cap, len int read, write *ring } @@ -266,6 +273,12 @@ func newQueue(size int) *queue { } } +// Dropped returns the number of Records dropped during enqueueing since the +// last time Dropped was called. +func (q *queue) Dropped() uint64 { + return q.dropped.Swap(0) +} + // Enqueue adds r to the queue. The queue size, including the addition of r, is // returned. // @@ -283,6 +296,7 @@ func (q *queue) Enqueue(r Record) int { // Overflow. Advance read to be the new "oldest". q.len = q.cap q.read = q.read.Next() + q.dropped.Add(1) } return q.len } diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 416a19e00..bb836809f 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -4,7 +4,9 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( + "bytes" "context" + stdlog "log" "slices" "strconv" "sync" @@ -12,10 +14,12 @@ import ( "time" "unsafe" + "github.com/go-logr/stdr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/log" ) @@ -413,6 +417,41 @@ func TestBatchProcessor(t *testing.T) { }) }) + t.Run("DroppedLogs", func(t *testing.T) { + orig := global.GetLogger() + t.Cleanup(func() { global.SetLogger(orig) }) + buf := new(bytes.Buffer) + stdr.SetVerbosity(1) + global.SetLogger(stdr.New(stdlog.New(buf, "", 0))) + + e := newTestExporter(nil) + e.ExportTrigger = make(chan struct{}) + + b := NewBatchProcessor( + e, + WithMaxQueueSize(1), + WithExportMaxBatchSize(1), + WithExportInterval(time.Hour), + WithExportTimeout(time.Hour), + ) + var r Record + assert.NoError(t, b.OnEmit(ctx, r), "queued") + assert.NoError(t, b.OnEmit(ctx, r), "dropped") + + var n int + require.Eventually(t, func() bool { + n = e.ExportN() + return n > 0 + }, 2*time.Second, time.Microsecond, "blocked export not attempted") + + got := buf.String() + want := `"level"=1 "msg"="dropped log records" "dropped"=1` + assert.Contains(t, got, want) + + close(e.ExportTrigger) + _ = b.Shutdown(ctx) + }) + t.Run("ConcurrentSafe", func(t *testing.T) { const goRoutines = 10 @@ -488,6 +527,18 @@ func TestQueue(t *testing.T) { assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records") }) + t.Run("Dropped", func(t *testing.T) { + q := newQueue(1) + + _ = q.Enqueue(r) + _ = q.Enqueue(r) + assert.Equal(t, uint64(1), q.Dropped(), "fist") + + _ = q.Enqueue(r) + _ = q.Enqueue(r) + assert.Equal(t, uint64(2), q.Dropped(), "second") + }) + t.Run("Flush", func(t *testing.T) { const size = 2 q := newQueue(size) diff --git a/sdk/log/go.mod b/sdk/log/go.mod index d62dd0499..19227bdb6 100644 --- a/sdk/log/go.mod +++ b/sdk/log/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/go-logr/logr v1.4.1 + github.com/go-logr/stdr v1.2.2 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.26.0 go.opentelemetry.io/otel/log v0.2.0-alpha @@ -13,7 +14,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/metric v1.26.0 // indirect golang.org/x/sys v0.20.0 // indirect