mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-28 21:09:17 +02:00
Default implementation for empty BatchProcessor (#5239)
Ensure an empty BatchProcessor does not panic when any method is called. Default an empty BatchProcessor as being shut down.
This commit is contained in:
parent
9370c5a01f
commit
b34cfc47c4
@ -29,7 +29,9 @@ const (
|
||||
var _ Processor = (*BatchProcessor)(nil)
|
||||
|
||||
// BatchProcessor is a processor that exports batches of log records.
|
||||
// A BatchProcessor must be created with [NewBatchProcessor].
|
||||
//
|
||||
// Use [NewBatchProcessor] to create a BatchProcessor. An empty BatchProcessor
|
||||
// is shut down by default, no records will be batched or exported.
|
||||
type BatchProcessor struct {
|
||||
// The BatchProcessor is designed to provide the highest throughput of
|
||||
// log records possible while being compatible with OpenTelemetry. The
|
||||
@ -170,7 +172,7 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
|
||||
|
||||
// OnEmit batches provided log record.
|
||||
func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
|
||||
if b.stopped.Load() {
|
||||
if b.stopped.Load() || b.q == nil {
|
||||
return nil
|
||||
}
|
||||
if n := b.q.Enqueue(r); n >= b.batchSize {
|
||||
@ -187,12 +189,12 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r Record) error {
|
||||
|
||||
// Enabled returns if b is enabled.
|
||||
func (b *BatchProcessor) Enabled(context.Context, Record) bool {
|
||||
return !b.stopped.Load()
|
||||
return !b.stopped.Load() && b.q != nil
|
||||
}
|
||||
|
||||
// Shutdown flushes queued log records and shuts down the decorated exporter.
|
||||
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
|
||||
if b.stopped.Swap(true) {
|
||||
if b.stopped.Swap(true) || b.q == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -219,7 +221,7 @@ var ctxErr = func(ctx context.Context) error {
|
||||
|
||||
// ForceFlush flushes queued log records and flushes the decorated exporter.
|
||||
func (b *BatchProcessor) ForceFlush(ctx context.Context) error {
|
||||
if b.stopped.Load() {
|
||||
if b.stopped.Load() || b.q == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,18 @@ import (
|
||||
"go.opentelemetry.io/otel/log"
|
||||
)
|
||||
|
||||
func TestEmptyBatchConfig(t *testing.T) {
|
||||
assert.NotPanics(t, func() {
|
||||
var bp BatchProcessor
|
||||
ctx := context.Background()
|
||||
var record Record
|
||||
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
|
||||
assert.False(t, bp.Enabled(ctx, record), "Enabled")
|
||||
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
|
||||
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewBatchConfig(t *testing.T) {
|
||||
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
|
||||
t.Log(err)
|
||||
|
Loading…
Reference in New Issue
Block a user