mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-03-29 21:47:00 +02:00
Add chunkExporter (#5104)
* Add chunker exporter The batching log processor needs to be able to export payloads in chuncks. This adds a chunker type that will forward all Shutdown and ForceFlush calls to the embedded exporter and chunk data passed to Export. * Concurrent safe testExporter * Add test for zero size * Fix lint * Refactor chunker into chunkExporter * Remove ExportTrigger
This commit is contained in:
parent
e6e44dee90
commit
c4dffbf888
@ -54,6 +54,37 @@ func (noopExporter) Shutdown(context.Context) error { return nil }
|
||||
|
||||
func (noopExporter) ForceFlush(context.Context) error { return nil }
|
||||
|
||||
// chunkExporter wraps an Exporter's Export method so it is called with
|
||||
// appropriately sized export payloads. Any payload larger than a defined size
|
||||
// is chunked into smaller payloads and exported sequentially.
|
||||
type chunkExporter struct {
|
||||
Exporter
|
||||
|
||||
// size is the maximum batch size exported.
|
||||
size int
|
||||
}
|
||||
|
||||
// newChunkExporter wraps exporter. Calls to the Export will have their records
|
||||
// payload chuncked so they do not exceed size. If size is less than or equal
|
||||
// to 0, exporter is returned directly.
|
||||
func newChunkExporter(exporter Exporter, size int) Exporter {
|
||||
if size <= 0 {
|
||||
return exporter
|
||||
}
|
||||
return &chunkExporter{Exporter: exporter, size: size}
|
||||
}
|
||||
|
||||
// Export exports records in chuncks no larger than c.size.
|
||||
func (c chunkExporter) Export(ctx context.Context, records []Record) error {
|
||||
n := len(records)
|
||||
for i, j := 0, min(c.size, n); i < n; i, j = i+c.size, min(j+c.size, n) {
|
||||
if err := c.Exporter.Export(ctx, records[i:j]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// timeoutExporter wraps an Exporter and ensures any call to Export will have a
|
||||
// timeout for the context.
|
||||
type timeoutExporter struct {
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/log"
|
||||
@ -115,6 +116,66 @@ func (e *testExporter) ForceFlushN() int {
|
||||
return int(atomic.LoadInt32(e.forceFlushN))
|
||||
}
|
||||
|
||||
func TestChunker(t *testing.T) {
|
||||
t.Run("ZeroSize", func(t *testing.T) {
|
||||
exp := newTestExporter(nil)
|
||||
t.Cleanup(exp.Stop)
|
||||
c := newChunkExporter(exp, 0)
|
||||
const size = 100
|
||||
_ = c.Export(context.Background(), make([]Record, size))
|
||||
|
||||
assert.Equal(t, 1, exp.ExportN())
|
||||
records := exp.Records()
|
||||
assert.Len(t, records, 1)
|
||||
assert.Len(t, records[0], size)
|
||||
})
|
||||
|
||||
t.Run("ForceFlush", func(t *testing.T) {
|
||||
exp := newTestExporter(nil)
|
||||
t.Cleanup(exp.Stop)
|
||||
c := newChunkExporter(exp, 0)
|
||||
_ = c.ForceFlush(context.Background())
|
||||
assert.Equal(t, 1, exp.ForceFlushN(), "ForceFlush not passed through")
|
||||
})
|
||||
|
||||
t.Run("Shutdown", func(t *testing.T) {
|
||||
exp := newTestExporter(nil)
|
||||
t.Cleanup(exp.Stop)
|
||||
c := newChunkExporter(exp, 0)
|
||||
_ = c.Shutdown(context.Background())
|
||||
assert.Equal(t, 1, exp.ShutdownN(), "Shutdown not passed through")
|
||||
})
|
||||
|
||||
t.Run("Chunk", func(t *testing.T) {
|
||||
exp := newTestExporter(nil)
|
||||
t.Cleanup(exp.Stop)
|
||||
c := newChunkExporter(exp, 10)
|
||||
assert.NoError(t, c.Export(context.Background(), make([]Record, 5)))
|
||||
assert.NoError(t, c.Export(context.Background(), make([]Record, 25)))
|
||||
|
||||
wantLens := []int{5, 10, 10, 5}
|
||||
records := exp.Records()
|
||||
require.Len(t, records, len(wantLens), "chunks")
|
||||
for i, n := range wantLens {
|
||||
assert.Lenf(t, records[i], n, "chunk %d", i)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ExportError", func(t *testing.T) {
|
||||
exp := newTestExporter(assert.AnError)
|
||||
t.Cleanup(exp.Stop)
|
||||
c := newChunkExporter(exp, 0)
|
||||
ctx := context.Background()
|
||||
records := make([]Record, 25)
|
||||
err := c.Export(ctx, records)
|
||||
assert.ErrorIs(t, err, assert.AnError, "no chunking")
|
||||
|
||||
c = newChunkExporter(exp, 10)
|
||||
err = c.Export(ctx, records)
|
||||
assert.ErrorIs(t, err, assert.AnError, "with chunking")
|
||||
})
|
||||
}
|
||||
|
||||
func TestExportSync(t *testing.T) {
|
||||
eventuallyDone := func(t *testing.T, done chan struct{}) {
|
||||
assert.Eventually(t, func() bool {
|
||||
|
Loading…
x
Reference in New Issue
Block a user