1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-03-31 21:55:32 +02:00

Add the bufferExporter (#5119)

* Add the bufferExporter

* Fix TestExportSync

Reset default ErrorHandler

* Comment

* Clean up tests

* Remove context arg from EnqueueExport

* Join wrapped exporter error
This commit is contained in:
Tyler Yahn 2024-04-02 08:36:18 -07:00 committed by GitHub
parent c4dffbf888
commit 5449f083aa
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 419 additions and 3 deletions

View File

@ -5,6 +5,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
@ -158,3 +162,132 @@ func (e exportData) respond(err error) {
}
}
}
// bufferExporter provides asynchronous and synchronous export functionality by
// buffering export requests.
type bufferExporter struct {
Exporter
input chan exportData
inputMu sync.Mutex
done chan struct{}
stopped atomic.Bool
}
// newBufferExporter returns a new bufferExporter that wraps exporter. The
// returned bufferExporter will buffer at most size number of export requests.
// If size is less than zero, zero will be used (i.e. only synchronous
// exporting will be supported).
func newBufferExporter(exporter Exporter, size int) *bufferExporter {
if size < 0 {
size = 0
}
input := make(chan exportData, size)
return &bufferExporter{
Exporter: exporter,
input: input,
done: exportSync(input, exporter),
}
}
var errStopped = errors.New("exporter stopped")
func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {
data := exportData{ctx, records, rCh}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return errStopped
}
select {
case e.input <- data:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// EnqueueExport enqueues an export of records in the context of ctx to be
// performed asynchronously. This will return true if the exported is
// successfully enqueued, false otherwise.
func (e *bufferExporter) EnqueueExport(records []Record) bool {
if len(records) == 0 {
// Nothing to enqueue, do not waste input space.
return true
}
return e.enqueue(context.Background(), records, nil) == nil
}
// Export synchronously exports records in the context of ctx. This will not
// return until the export has been completed.
func (e *bufferExporter) Export(ctx context.Context, records []Record) error {
if len(records) == 0 {
return nil
}
resp := make(chan error, 1)
err := e.enqueue(ctx, records, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return fmt.Errorf("%w: dropping %d records", err, len(records))
}
select {
case err := <-resp:
return err
case <-ctx.Done():
return ctx.Err()
}
}
// ForceFlush flushes buffered exports. Any existing exports that is buffered
// is flushed before this returns.
func (e *bufferExporter) ForceFlush(ctx context.Context) error {
resp := make(chan error, 1)
err := e.enqueue(ctx, nil, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return err
}
select {
case <-resp:
case <-ctx.Done():
return ctx.Err()
}
return e.Exporter.ForceFlush(ctx)
}
// Shutdown shuts down e.
//
// Any buffered exports are flushed before this returns.
//
// All calls to EnqueueExport or Exporter will return nil without any export
// after this is called.
func (e *bufferExporter) Shutdown(ctx context.Context) error {
if e.stopped.Swap(true) {
return nil
}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// No more sends will be made.
close(e.input)
select {
case <-e.done:
case <-ctx.Done():
return errors.Join(ctx.Err(), e.Exporter.Shutdown(ctx))
}
return e.Exporter.Shutdown(ctx)
}

View File

@ -5,6 +5,8 @@ package log
import (
"context"
"io"
stdlog "log"
"slices"
"sync"
"sync/atomic"
@ -33,8 +35,10 @@ type testExporter struct {
// Counts of method calls.
exportN, shutdownN, forceFlushN *int32
input chan instruction
done chan struct{}
stopped atomic.Bool
inputMu sync.Mutex
input chan instruction
done chan struct{}
}
func newTestExporter(err error) *testExporter {
@ -85,7 +89,11 @@ func (e *testExporter) Export(ctx context.Context, r []Record) error {
return ctx.Err()
}
}
e.input <- instruction{Record: &r}
e.inputMu.Lock()
defer e.inputMu.Unlock()
if !e.stopped.Load() {
e.input <- instruction{Record: &r}
}
return e.Err
}
@ -94,6 +102,12 @@ func (e *testExporter) ExportN() int {
}
func (e *testExporter) Stop() {
if e.stopped.Swap(true) {
return
}
e.inputMu.Lock()
defer e.inputMu.Unlock()
close(e.input)
<-e.done
}
@ -192,6 +206,12 @@ func TestExportSync(t *testing.T) {
var got error
handler := otel.ErrorHandlerFunc(func(err error) { got = err })
otel.SetErrorHandler(handler)
t.Cleanup(func() {
l := stdlog.New(io.Discard, "", stdlog.LstdFlags)
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
l.Print(err)
}))
})
in := make(chan exportData, 1)
exp := newTestExporter(assert.AnError)
@ -304,3 +324,266 @@ func TestTimeoutExporter(t *testing.T) {
close(out)
})
}
func TestBufferExporter(t *testing.T) {
t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, goRoutines)
ctx := context.Background()
records := make([]Record, 10)
stop := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < goRoutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
_ = e.EnqueueExport(records)
_ = e.Export(ctx, records)
_ = e.ForceFlush(ctx)
}
}
}()
}
assert.Eventually(t, func() bool {
return exp.ExportN() > 0
}, 2*time.Second, time.Microsecond)
assert.NoError(t, e.Shutdown(ctx))
close(stop)
wg.Wait()
})
t.Run("Shutdown", func(t *testing.T) {
t.Run("Multiple", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
assert.NoError(t, e.Shutdown(context.Background()))
assert.Equal(t, 1, exp.ShutdownN(), "first Shutdown")
assert.NoError(t, e.Shutdown(context.Background()))
assert.Equal(t, 1, exp.ShutdownN(), "second Shutdown")
})
t.Run("ContextCancelled", func(t *testing.T) {
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
trigger := make(chan struct{})
exp.ExportTrigger = trigger
t.Cleanup(func() { close(trigger) })
e := newBufferExporter(exp, 1)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := e.Shutdown(ctx)
assert.ErrorIs(t, err, context.Canceled)
assert.ErrorIs(t, err, assert.AnError)
})
t.Run("Error", func(t *testing.T) {
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
assert.ErrorIs(t, e.Shutdown(context.Background()), assert.AnError)
})
})
t.Run("ForceFlush", func(t *testing.T) {
t.Run("Multiple", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 2)
ctx := context.Background()
records := make([]Record, 1)
require.NoError(t, e.enqueue(ctx, records, nil), "enqueue")
assert.NoError(t, e.ForceFlush(ctx), "ForceFlush records")
assert.Equal(t, 1, exp.ExportN(), "Export number incremented")
assert.Len(t, exp.Records(), 1, "exported Record batches")
// Nothing to flush.
assert.NoError(t, e.ForceFlush(ctx), "ForceFlush empty")
assert.Equal(t, 1, exp.ExportN(), "Export number changed")
assert.Len(t, exp.Records(), 0, "exported non-zero Records")
})
t.Run("ContextCancelled", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
trigger := make(chan struct{})
exp.ExportTrigger = trigger
t.Cleanup(func() { close(trigger) })
e := newBufferExporter(exp, 1)
ctx, cancel := context.WithCancel(context.Background())
require.True(t, e.EnqueueExport(make([]Record, 1)))
got := make(chan error, 1)
go func() { got <- e.ForceFlush(ctx) }()
require.Eventually(t, func() bool {
return exp.ExportN() > 0
}, 2*time.Second, time.Microsecond)
cancel() // Canceled before export response.
err := <-got
assert.ErrorIs(t, err, context.Canceled, "enqueued")
_ = e.Shutdown(ctx)
// Zero length buffer
e = newBufferExporter(exp, 0)
assert.ErrorIs(t, e.ForceFlush(ctx), context.Canceled, "not enqueued")
})
t.Run("Error", func(t *testing.T) {
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
assert.ErrorIs(t, e.ForceFlush(context.Background()), assert.AnError)
})
t.Run("Stopped", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
ctx := context.Background()
_ = e.Shutdown(ctx)
assert.NoError(t, e.ForceFlush(ctx))
})
})
t.Run("Export", func(t *testing.T) {
t.Run("ZeroRecords", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
assert.NoError(t, e.Export(context.Background(), nil))
assert.Equal(t, 0, exp.ExportN())
})
t.Run("Multiple", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
ctx := context.Background()
records := make([]Record, 1)
records[0].SetBody(log.BoolValue(true))
assert.NoError(t, e.Export(ctx, records))
n := exp.ExportN()
assert.Equal(t, 1, n, "first Export number")
assert.Equal(t, [][]Record{records}, exp.Records())
assert.NoError(t, e.Export(ctx, records))
assert.Equal(t, n+1, exp.ExportN(), "second Export number")
assert.Equal(t, [][]Record{records}, exp.Records())
})
t.Run("ContextCancelled", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
trigger := make(chan struct{})
exp.ExportTrigger = trigger
t.Cleanup(func() { close(trigger) })
e := newBufferExporter(exp, 1)
records := make([]Record, 1)
ctx, cancel := context.WithCancel(context.Background())
got := make(chan error, 1)
go func() { got <- e.Export(ctx, records) }()
require.Eventually(t, func() bool {
return exp.ExportN() > 0
}, 2*time.Second, time.Microsecond)
cancel() // Canceled before export response.
err := <-got
assert.ErrorIs(t, err, context.Canceled, "enqueued")
_ = e.Shutdown(ctx)
// Zero length buffer
e = newBufferExporter(exp, 0)
assert.ErrorIs(t, e.Export(ctx, records), context.Canceled, "not enqueued")
})
t.Run("Error", func(t *testing.T) {
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
ctx, records := context.Background(), make([]Record, 1)
assert.ErrorIs(t, e.Export(ctx, records), assert.AnError)
})
t.Run("Stopped", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
ctx := context.Background()
_ = e.Shutdown(ctx)
assert.NoError(t, e.Export(ctx, make([]Record, 1)))
assert.Equal(t, 0, exp.ExportN(), "Export called")
})
})
t.Run("EnqueueExport", func(t *testing.T) {
t.Run("ZeroRecords", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
assert.True(t, e.EnqueueExport(nil))
e.ForceFlush(context.Background())
assert.Equal(t, 0, exp.ExportN(), "empty batch enqueued")
})
t.Run("Multiple", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 2)
records := make([]Record, 1)
records[0].SetBody(log.BoolValue(true))
assert.True(t, e.EnqueueExport(records))
assert.True(t, e.EnqueueExport(records))
e.ForceFlush(context.Background())
n := exp.ExportN()
assert.Equal(t, 2, n, "Export number")
assert.Equal(t, [][]Record{records, records}, exp.Records())
})
t.Run("Stopped", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 1)
_ = e.Shutdown(context.Background())
assert.False(t, e.EnqueueExport(make([]Record, 1)))
})
})
}