From b34cfc47c4e0240b728f28e62f829dc25408746b Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 22 Apr 2024 07:21:49 -0700 Subject: [PATCH] Default implementation for empty BatchProcessor (#5239) Ensure an empty BatchProcessor does not panic when any method is called. Default an empty BatchProcessor as being shut down. --- sdk/log/batch.go | 12 +++++++----- sdk/log/batch_test.go | 12 ++++++++++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index 5cb9f5956..2e705c86a 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -29,7 +29,9 @@ const ( var _ Processor = (*BatchProcessor)(nil) // BatchProcessor is a processor that exports batches of log records. -// A BatchProcessor must be created with [NewBatchProcessor]. +// +// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor +// is shut down by default, no records will be batched or exported. type BatchProcessor struct { // The BatchProcessor is designed to provide the highest throughput of // log records possible while being compatible with OpenTelemetry. The @@ -170,7 +172,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) { // OnEmit batches provided log record. func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { - if b.stopped.Load() { + if b.stopped.Load() || b.q == nil { return nil } if n := b.q.Enqueue(r); n >= b.batchSize { @@ -187,12 +189,12 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error { // Enabled returns if b is enabled. func (b *BatchProcessor) Enabled(context.Context, Record) bool { - return !b.stopped.Load() + return !b.stopped.Load() && b.q != nil } // Shutdown flushes queued log records and shuts down the decorated exporter. func (b *BatchProcessor) Shutdown(ctx context.Context) error { - if b.stopped.Swap(true) { + if b.stopped.Swap(true) || b.q == nil { return nil } @@ -219,7 +221,7 @@ var ctxErr = func(ctx context.Context) error { // ForceFlush flushes queued log records and flushes the decorated exporter. func (b *BatchProcessor) ForceFlush(ctx context.Context) error { - if b.stopped.Load() { + if b.stopped.Load() || b.q == nil { return nil } diff --git a/sdk/log/batch_test.go b/sdk/log/batch_test.go index 539f04d60..90630ac6b 100644 --- a/sdk/log/batch_test.go +++ b/sdk/log/batch_test.go @@ -18,6 +18,18 @@ import ( "go.opentelemetry.io/otel/log" ) +func TestEmptyBatchConfig(t *testing.T) { + assert.NotPanics(t, func() { + var bp BatchProcessor + ctx := context.Background() + var record Record + assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit") + assert.False(t, bp.Enabled(ctx, record), "Enabled") + assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush") + assert.NoError(t, bp.Shutdown(ctx), "Shutdown") + }) +} + func TestNewBatchConfig(t *testing.T) { otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { t.Log(err)