
Implement the BatchingProcessor (#5093)

* [WIP] Implement the BatchingProcessor

* Add TestExportSync

* Add TestChunker

* Test export error default to ErrorHandler

* Fix lint

* Fix chunk smaller than size error

* Add batch tests

* Fix lint

* Update OnEmit test

Check the length of records in the eventually assertion, given that is
what we are going to measure.

* Revert unneeded change to BatchingProcessor doc

* Add batch type

* Refactor testing of batching config

The BatchingProcessor is not expected to ultimately contain
configuration fields for queue size or export parameters (see #5093).
This will break TestNewBatchingProcessorConfiguration, which tests the
configuration by evaluating the BatchingProcessor directly.

Instead, test the batchingConfig and rename the test to
TestNewBatchingConfig to match what is being tested.

* Implement the BatchingProcessor without polling

* Add TestBatchingProcessor

* Add ConcurrentSafe test

* Expand Shutdown tests

* Test context canceled for ForceFlush

* Refactor batch to queue

* Use exportSync

* Update docs and naming

* Split buffered export to its own type

* Update comments and naming

* Fix lint

* Remove redundant triggered type

* Add interval polling

* Refactor test structure

* Add custom ring implementation

* Add BenchmarkBatchingProcessor

* Fix merge

* Remove custom ring impl

* Remove BenchmarkBatchingProcessor

* Update dev docs

* Test nil exporter

* Update OnEmit test

Ensure the poll goroutine will completely flush the queue of batches.

* Test RetriggerFlushNonBlocking

* Update ASCII diagram

* Fix flaky OnEmit

* Revert unnecessary change to test pkg name

* Use batching term in docs

* Document EnqueueExport

* Return from EnqueueExport if blocked

Do not wait for the enqueue to succeed.

* Do not drop failed flush log records

* Use cancelable ctx in concurrency test

* Fix comments

* Apply feedback

Do not spawn a goroutine for the flush operation.

* Return true from EnqueueExport when stopped

* Update sdk/log/batch.go

Co-authored-by: Robert Pająk <pellared@hotmail.com>

* Remove TODO

* Comment re-trigger in poll

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
Tyler Yahn 2024-04-18 07:48:19 -07:00 committed by GitHub
parent fe3de7059e
commit 4af9c20a80
4 changed files with 506 additions and 24 deletions
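Before the diffs, a brief orientation. The sketch below shows how the new processor is meant to be used, based only on the API surface visible in this commit: NewBatchingProcessor, the With* options exercised by the tests, and the documented fallback to a no-op exporter when nil is passed. It is an illustration, not code from the commit.

package main

import (
    "context"
    "time"

    sdklog "go.opentelemetry.io/otel/sdk/log"
)

func main() {
    ctx := context.Background()

    // Per this commit, a nil exporter is replaced with a no-op exporter,
    // so this sketch runs without a real backend.
    p := sdklog.NewBatchingProcessor(nil,
        sdklog.WithMaxQueueSize(512),
        sdklog.WithExportMaxBatchSize(128),
        sdklog.WithExportInterval(time.Second),
        sdklog.WithExportTimeout(30*time.Second),
    )

    var r sdklog.Record   // normally constructed by the SDK; zero value here
    _ = p.OnEmit(ctx, r)  // fast path: enqueue and return
    _ = p.ForceFlush(ctx) // push queued records to the exporter now
    _ = p.Shutdown(ctx)   // flush what remains and stop the poll goroutine
}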

sdk/log/batch.go

@ -6,7 +6,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"container/ring"
"context"
"errors"
"slices"
"sync"
"sync/atomic"
"time"
)
@ -26,13 +29,70 @@ const (
var _ Processor = (*BatchingProcessor)(nil)
// BatchingProcessor is a processor that exports batches of log records.
// A BatchingProcessor must be created with [NewBatchingProcessor].
type BatchingProcessor struct {
// The BatchingProcessor is designed to provide the highest throughput of
// log records possible while being compatible with OpenTelemetry. The
// entry point of log records is the OnEmit method. This method is designed
// to receive records as fast as possible while still honoring shutdown
// commands. All records received are enqueued to the queue.
//
// In order to block OnEmit as little as possible, a separate "poll"
// goroutine is spawned at the creation of a BatchingProcessor. This
// goroutine is responsible for batching the queue at regular polled
// intervals, or when it is directly signaled to.
//
// To keep the polling goroutine from backing up, all batches it makes are
// exported with a bufferedExporter. This exporter allows the poll
// goroutine to enqueue an export payload that will be handled in a
// separate goroutine dedicated to the export. This asynchronous behavior
// allows the poll goroutine to maintain accurate interval polling.
//
//   __BatchingProcessor__     __Poll Goroutine__     __Export Goroutine__
// ||                     || ||                  || ||                    ||
// ||          ********** || ||                  || ||     **********     ||
// || Records=>* OnEmit * || ||    | - ticker    || ||     * export *     ||
// ||          ********** || ||    | - trigger   || ||     **********     ||
// ||              ||     || ||    |             || ||         ||         ||
// ||              ||     || ||    |             || ||         ||         ||
// ||   _______\/______   || ||    |*********    || ||   ______/\_______  ||
// ||  (____queue______)>=||=||====|* batch *====||=||=>[_export_buffer_] ||
// ||                     || ||    |*********    || ||                    ||
// ||_____________________|| ||__________________|| ||____________________||
//
//
// The "release valve" in this processing is the record queue. This queue
// is a ring buffer. It will overwrite the oldest records first when writes
// to OnEmit are made faster than the queue can be flushed. If batches
// cannot be flushed to the export buffer, the records will remain in the
// queue.
// exporter is the bufferedExporter all batches are exported with.
exporter *bufferExporter
// q is the active queue of records that have not yet been exported.
q *queue
// batchSize is the minimum number of records needed before an export is
// triggered (unless the interval expires).
batchSize int
// pollTrigger triggers the poll goroutine to flush a batch from the queue.
// This is sent to when it is known that the queue contains at least one
// complete batch.
//
// When a send is made to the channel, the poll loop will be reset after
// the flush. If there are still enough records in the queue for another
// batch, the reset poll loop will automatically re-trigger itself.
// There is no need for the original sender to monitor and resend.
pollTrigger chan struct{}
// pollKill kills the poll goroutine. This is only expected to be closed
// once by the Shutdown method.
pollKill chan struct{}
// pollDone signals the poll goroutine has completed.
pollDone chan struct{}
// stopped holds the stopped state of the BatchingProcessor.
stopped atomic.Bool
}
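A note on the mechanism documented above: pollTrigger is a capacity-1 signal channel, so senders never block and any number of concurrent triggers coalesce into a single pending wake-up. That is what keeps OnEmit cheap. A minimal standalone sketch of the pattern, with illustrative names:

package main

import (
    "fmt"
    "time"
)

func main() {
    trigger := make(chan struct{}, 1) // capacity 1: at most one pending signal

    done := make(chan struct{})
    go func() { // consumer: one unit of work per wake-up
        defer close(done)
        for range trigger {
            fmt.Println("flush a batch")
            time.Sleep(10 * time.Millisecond) // simulate work
        }
    }()

    // Producers never block: if a signal is already pending, the new one
    // is dropped because the consumer is guaranteed to run anyway.
    for i := 0; i < 100; i++ {
        select {
        case trigger <- struct{}{}:
        default: // coalesced into the pending signal
        }
    }

    time.Sleep(50 * time.Millisecond) // let the consumer drain
    close(trigger)
    <-done
}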
// NewBatchingProcessor decorates the provided exporter
@ -40,44 +100,151 @@ type BatchingProcessor struct {
//
// All of the exporter's methods are called synchronously.
func NewBatchingProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchingProcessor {
if exporter == nil {
// Do not panic on nil export.
exporter = defaultNoopExporter
}
cfg := newBatchingConfig(opts)
// Order is important here. Wrap the timeoutExporter with the chunkExporter
// to ensure each export completes within the timeout (instead of all
// chunked exports).
exporter = newTimeoutExporter(exporter, cfg.expTimeout.Value)
// Use a chunkExporter to ensure ForceFlush and Shutdown calls are batched
// appropriately on export.
exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)
b := &BatchingProcessor{
// TODO: explore making the size of this configurable.
exporter: newBufferExporter(exporter, 1),
q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
pollTrigger: make(chan struct{}, 1),
pollKill: make(chan struct{}),
}
b.pollDone = b.poll(cfg.expInterval.Value)
return b
}
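The wrapping order the comments call out is the crux of this constructor: the timeout decorator sits innermost so that each chunk gets its own time budget, and the size-1 buffer sits outermost so callers hand work to a dedicated goroutine. Below is a self-contained sketch of that layering with stand-in types; newTimeoutExporter and newChunkExporter are real helpers elsewhere in this package, but this code is illustrative only:

package main

import (
    "context"
    "fmt"
    "time"
)

type record struct{}

// exportFunc lets plain functions act as exporters in this sketch.
type exportFunc func(ctx context.Context, recs []record) error

// withTimeout bounds each export call, in the spirit of newTimeoutExporter.
func withTimeout(next exportFunc, d time.Duration) exportFunc {
    return func(ctx context.Context, recs []record) error {
        ctx, cancel := context.WithTimeout(ctx, d)
        defer cancel()
        return next(ctx, recs)
    }
}

// withChunk splits payloads into maxBatch-sized slices, in the spirit of
// newChunkExporter. Because it wraps the timeout decorator, every chunk
// gets the full timeout, which is why the ordering above matters.
func withChunk(next exportFunc, maxBatch int) exportFunc {
    return func(ctx context.Context, recs []record) error {
        for i := 0; i < len(recs); i += maxBatch {
            end := i + maxBatch
            if end > len(recs) {
                end = len(recs)
            }
            if err := next(ctx, recs[i:end]); err != nil {
                return err
            }
        }
        return nil
    }
}

func main() {
    inner := exportFunc(func(_ context.Context, recs []record) error {
        fmt.Println("exported", len(recs), "records")
        return nil
    })
    e := withChunk(withTimeout(inner, time.Second), 2)
    _ = e(context.Background(), make([]record, 5)) // exports 2, 2, then 1
}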
// poll spawns a goroutine to handle interval polling and batch exporting. The
// returned done chan is closed when the spawned goroutine completes.
func (b *BatchingProcessor) poll(interval time.Duration) (done chan struct{}) {
done = make(chan struct{})
ticker := time.NewTicker(interval)
// TODO: investigate using a sync.Pool instead of cloning.
buf := make([]Record, b.batchSize)
go func() {
defer close(done)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-b.pollTrigger:
ticker.Reset(interval)
case <-b.pollKill:
return
}
qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
buf = slices.Clone(buf)
}
return ok
})
if qLen >= b.batchSize {
// There is another full batch ready. Immediately trigger
// another export attempt.
select {
case b.pollTrigger <- struct{}{}:
default:
// Another flush signal already received.
}
}
}
}()
return done
}
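poll drains the queue through q.TryDequeue, whose contract, as used here, appears to be: pass up to len(buf) records to the callback, remove them from the queue only if the callback returns true, and report how many records remain. The queue type itself is defined later in this file, so the following is a sketch of that assumed contract only:

package main

import (
    "fmt"
    "sync"
)

type queue[T any] struct {
    mu    sync.Mutex
    items []T
}

func (q *queue[T]) enqueue(v T) int {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.items = append(q.items, v)
    return len(q.items)
}

// tryDequeue copies up to len(buf) items into buf and passes them to fn.
// Items are dropped from the queue only if fn reports success. The number
// of items still queued is returned.
func (q *queue[T]) tryDequeue(buf []T, fn func([]T) bool) int {
    q.mu.Lock()
    defer q.mu.Unlock()
    n := copy(buf, q.items)
    if fn(buf[:n]) {
        q.items = q.items[n:]
    }
    return len(q.items)
}

func main() {
    q := &queue[int]{}
    for i := 0; i < 5; i++ {
        q.enqueue(i)
    }
    buf := make([]int, 3)
    left := q.tryDequeue(buf, func(batch []int) bool {
        fmt.Println("flushing", batch) // pretend the export was enqueued
        return true
    })
    fmt.Println("remaining:", left) // 2
}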
// OnEmit batches the provided log record.
func (b *BatchingProcessor) OnEmit(_ context.Context, r Record) error {
if b.stopped.Load() {
return nil
}
if n := b.q.Enqueue(r); n >= b.batchSize {
select {
case b.pollTrigger <- struct{}{}:
default:
// Flush chan full. The poll goroutine will handle this by
// re-sending any trigger until the queue has less than batchSize
// records.
}
}
return nil
}
// Enabled reports whether b is enabled.
func (b *BatchingProcessor) Enabled(context.Context, Record) bool {
return !b.stopped.Load()
}
// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchingProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) {
return nil
}
// Stop the poll goroutine.
close(b.pollKill)
select {
case <-b.pollDone:
case <-ctx.Done():
// Out of time.
return errors.Join(ctx.Err(), b.exporter.Shutdown(ctx))
}
// Flush remaining queued before exporter shutdown.
err := b.exporter.Export(ctx, b.q.Flush())
return errors.Join(err, b.exporter.Shutdown(ctx))
}
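Shutdown combines two stock Go patterns: racing a done channel against ctx.Done, and errors.Join to report the deadline error alongside the exporter's shutdown error. The same shape in isolation, with illustrative names:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// stopWorker closes kill, then waits for the worker to finish or for ctx
// to expire, whichever comes first.
func stopWorker(ctx context.Context, kill chan<- struct{}, done <-chan struct{}) error {
    close(kill)
    select {
    case <-done:
        return nil
    case <-ctx.Done():
        // Out of time: surface both the cause and the consequence.
        return errors.Join(ctx.Err(), errors.New("worker still running"))
    }
}

func main() {
    kill := make(chan struct{})
    done := make(chan struct{})
    go func() {
        defer close(done)
        <-kill
        time.Sleep(50 * time.Millisecond) // simulate slow cleanup
    }()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
    defer cancel()
    fmt.Println(stopWorker(ctx, kill, done))
}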
var errPartialFlush = errors.New("partial flush: export buffer full")
// Used for testing.
var ctxErr = func(ctx context.Context) error {
return ctx.Err()
}
// ForceFlush flushes queued log records and flushes the decorated exporter.
func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
if b.stopped.Load() {
return nil
}
buf := make([]Record, b.q.cap)
notFlushed := func() bool {
var flushed bool
_ = b.q.TryDequeue(buf, func(r []Record) bool {
flushed = b.exporter.EnqueueExport(r)
return flushed
})
return !flushed
}
var err error
// For as long as ctx allows, try to make a single flush of the queue.
for notFlushed() {
// Use ctxErr instead of calling ctx.Err directly so we can test
// the partial error return.
if e := ctxErr(ctx); e != nil {
err = errors.Join(e, errPartialFlush)
break
}
}
return errors.Join(err, b.exporter.ForceFlush(ctx))
}
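Because the partial-flush error is joined with the context error, a caller can test for each condition independently with errors.Is. A short sketch; errPartialFlush is redeclared locally so the example runs on its own:

package main

import (
    "context"
    "errors"
    "fmt"
)

var errPartialFlush = errors.New("partial flush: export buffer full")

func main() {
    // The shape of error ForceFlush can return.
    err := errors.Join(context.Canceled, errPartialFlush)
    fmt.Println(errors.Is(err, errPartialFlush))  // true: records remain queued
    fmt.Println(errors.Is(err, context.Canceled)) // true: the caller's ctx expired
}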
// queue holds a queue of logging records.
//
// When the queue becomes full, the oldest records in the queue are

sdk/log/batch_test.go

@ -4,6 +4,7 @@
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"slices"
"strconv"
"sync"
@ -144,6 +145,301 @@ func TestNewBatchingConfig(t *testing.T) {
}
}
func TestBatchingProcessor(t *testing.T) {
ctx := context.Background()
t.Run("NilExporter", func(t *testing.T) {
assert.NotPanics(t, func() { NewBatchingProcessor(nil) })
})
t.Run("Polling", func(t *testing.T) {
e := newTestExporter(nil)
const size = 15
b := NewBatchingProcessor(
e,
WithMaxQueueSize(2*size),
WithExportMaxBatchSize(2*size),
WithExportInterval(time.Nanosecond),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, size) {
assert.NoError(t, b.OnEmit(ctx, r))
}
var got []Record
assert.Eventually(t, func() bool {
for _, r := range e.Records() {
got = append(got, r...)
}
return len(got) == size
}, 2*time.Second, time.Microsecond)
_ = b.Shutdown(ctx)
})
t.Run("OnEmit", func(t *testing.T) {
const batch = 10
e := newTestExporter(nil)
b := NewBatchingProcessor(
e,
WithMaxQueueSize(10*batch),
WithExportMaxBatchSize(batch),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 10*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
}, 2*time.Second, time.Microsecond, "multi-batch flush")
assert.NoError(t, b.Shutdown(ctx))
assert.GreaterOrEqual(t, e.ExportN(), 10)
})
t.Run("RetriggerFlushNonBlocking", func(t *testing.T) {
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
const batch = 10
b := NewBatchingProcessor(
e,
WithMaxQueueSize(3*batch),
WithExportMaxBatchSize(batch),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for _, r := range make([]Record, 2*batch) {
assert.NoError(t, b.OnEmit(ctx, r))
}
var n int
require.Eventually(t, func() bool {
n = e.ExportN()
return n > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")
var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, Record{})
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
e.ExportTrigger <- struct{}{}
assert.Eventually(t, func() bool {
return e.ExportN() > n
}, 2*time.Second, time.Microsecond, "flush not retriggered")
close(e.ExportTrigger)
assert.NoError(t, b.Shutdown(ctx))
assert.Equal(t, 3, e.ExportN())
})
t.Run("Enabled", func(t *testing.T) {
b := NewBatchingProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))
_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
})
t.Run("Shutdown", func(t *testing.T) {
t.Run("Error", func(t *testing.T) {
e := newTestExporter(assert.AnError)
b := NewBatchingProcessor(e)
assert.ErrorIs(t, b.Shutdown(ctx), assert.AnError, "exporter error not returned")
assert.NoError(t, b.Shutdown(ctx))
})
t.Run("Multiple", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchingProcessor(e)
const shutdowns = 3
for i := 0; i < shutdowns; i++ {
assert.NoError(t, b.Shutdown(ctx))
}
assert.Equal(t, 1, e.ShutdownN(), "exporter Shutdown calls")
})
t.Run("OnEmit", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchingProcessor(e)
assert.NoError(t, b.Shutdown(ctx))
want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})
t.Run("ForceFlush", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchingProcessor(e)
assert.NoError(t, b.OnEmit(ctx, Record{}))
assert.NoError(t, b.Shutdown(ctx))
assert.NoError(t, b.ForceFlush(ctx))
assert.Equal(t, 0, e.ForceFlushN(), "ForceFlush called after shutdown")
})
t.Run("CanceledContext", func(t *testing.T) {
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
t.Cleanup(func() { close(e.ExportTrigger) })
b := NewBatchingProcessor(e)
ctx := context.Background()
c, cancel := context.WithCancel(ctx)
cancel()
assert.ErrorIs(t, b.Shutdown(c), context.Canceled)
})
})
t.Run("ForceFlush", func(t *testing.T) {
t.Run("Flush", func(t *testing.T) {
e := newTestExporter(assert.AnError)
b := NewBatchingProcessor(
e,
WithMaxQueueSize(100),
WithExportMaxBatchSize(10),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
var r Record
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))
assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned")
assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls")
if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
got := e.Records()
if assert.Len(t, got[0], 1, "records received") {
assert.Equal(t, r, got[0][0])
}
}
})
t.Run("ErrorPartialFlush", func(t *testing.T) {
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
ctxErrCalled := make(chan struct{})
orig := ctxErr
ctxErr = func(ctx context.Context) error {
close(ctxErrCalled)
return orig(ctx)
}
t.Cleanup(func() { ctxErr = orig })
const batch = 1
b := NewBatchingProcessor(
e,
WithMaxQueueSize(10*batch),
WithExportMaxBatchSize(batch),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, Record{}))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0
}, 2*time.Second, time.Microsecond)
// 1 export being performed, 1 export in buffer chan, >1 batch
// still in queue that an attempt to flush will be made on.
//
// Stop the poll routine to prevent contention with the queue lock.
// This is outside of "normal" operations, but we are testing if
// ForceFlush will return the correct error when an EnqueueExport
// fails and not if ForceFlush will ever get the queue lock in high
// throughput situations.
close(b.pollDone)
<-b.pollDone
// Cancel the flush ctx from the start so errPartialFlush is
// returned right away.
fCtx, cancel := context.WithCancel(ctx)
cancel()
errCh := make(chan error, 1)
go func() {
errCh <- b.ForceFlush(fCtx)
close(errCh)
}()
// Wait for ctxErrCalled to close before closing ExportTrigger so
// we know the errPartialFlush will be returned in ForceFlush.
<-ctxErrCalled
close(e.ExportTrigger)
err := <-errCh
assert.ErrorIs(t, err, errPartialFlush, "partial flush error")
assert.ErrorIs(t, err, context.Canceled, "ctx canceled error")
})
t.Run("CanceledContext", func(t *testing.T) {
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
b := NewBatchingProcessor(e)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
var r Record
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
t.Cleanup(func() { close(e.ExportTrigger) })
c, cancel := context.WithCancel(ctx)
cancel()
assert.ErrorIs(t, b.ForceFlush(c), context.Canceled)
})
})
t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10
e := newTestExporter(nil)
b := NewBatchingProcessor(e)
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for i := 0; i < goRoutines-1; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, Record{}))
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
}
}()
}
require.Eventually(t, func() bool {
return e.ExportN() > 0
}, 2*time.Second, time.Microsecond, "export before shutdown")
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, b.Shutdown(ctx))
cancel()
}()
wg.Wait()
})
}
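These tests lean on a testExporter helper defined elsewhere in the package. From its use above, it records exported batches (Records), counts calls (ExportN, ShutdownN, ForceFlushN), and can hold each Export call on an ExportTrigger channel. A rough sketch of such a test double, under those assumptions:

package main

import (
    "context"
    "fmt"
    "sync"
)

type record struct{}

// fakeExporter blocks each Export on trigger (when set), then records the
// batch; exportN reports how many batches were exported.
type fakeExporter struct {
    mu      sync.Mutex
    err     error // returned from every Export call
    batches [][]record
    trigger chan struct{} // when non-nil, Export waits for a receive
}

func (e *fakeExporter) Export(ctx context.Context, recs []record) error {
    if e.trigger != nil {
        select {
        case <-e.trigger:
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    e.mu.Lock()
    defer e.mu.Unlock()
    e.batches = append(e.batches, append([]record(nil), recs...))
    return e.err
}

func (e *fakeExporter) exportN() int {
    e.mu.Lock()
    defer e.mu.Unlock()
    return len(e.batches)
}

func main() {
    e := &fakeExporter{}
    _ = e.Export(context.Background(), make([]record, 3))
    fmt.Println(e.exportN()) // 1
}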
func TestQueue(t *testing.T) {
var r Record
r.SetBody(log.BoolValue(true))

sdk/log/exporter.go

@ -219,14 +219,33 @@ func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan
}
// EnqueueExport enqueues an export of records in the context of ctx to be
// performed asynchronously. This will return true if the records are
// successfully enqueued (or the bufferExporter is shut down), false otherwise.
//
// The passed records are held after this call returns.
func (e *bufferExporter) EnqueueExport(records []Record) bool {
if len(records) == 0 {
// Nothing to enqueue, do not waste input space.
return true
}
data := exportData{ctx: context.Background(), records: records}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return true
}
select {
case e.input <- data:
return true
default:
return false
}
}
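The stopped re-check while inputMu is held is what makes the non-blocking send safe against a concurrent Shutdown closing the input channel: assuming the closer takes the same mutex (as the comment implies), a sender that observes stopped == false inside the lock cannot race the close. A standalone sketch of the pattern, mirroring the return-true-when-stopped behavior:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type guardedChan struct {
    mu      sync.Mutex
    stopped atomic.Bool
    ch      chan int
}

func (g *guardedChan) trySend(v int) bool {
    g.mu.Lock()
    defer g.mu.Unlock()
    if g.stopped.Load() {
        // Report success after shutdown so callers do not retry
        // against a closed pipeline.
        return true
    }
    select {
    case g.ch <- v:
        return true
    default:
        return false // buffer full; never block the caller
    }
}

func (g *guardedChan) close() {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.stopped.Store(true)
    close(g.ch) // safe: no sender can be inside the select now
}

func main() {
    g := &guardedChan{ch: make(chan int, 1)}
    fmt.Println(g.trySend(1)) // true: enqueued
    fmt.Println(g.trySend(2)) // false: buffer full
    g.close()
    fmt.Println(g.trySend(3)) // true: stopped, treated as handled
}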
// Export synchronously exports records in the context of ctx. This will not

sdk/log/exporter_test.go

@ -583,7 +583,7 @@ func TestBufferExporter(t *testing.T) {
e := newBufferExporter(exp, 1)
_ = e.Shutdown(context.Background())
assert.True(t, e.EnqueueExport(make([]Record, 1)))
})
})
}