mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-16 02:47:20 +02:00
600 lines
14 KiB
Go
600 lines
14 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package log
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
stdlog "log"
|
|
"slices"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/log"
|
|
)
|
|
|
|
type instruction struct {
|
|
Record *[]Record
|
|
Flush chan [][]Record
|
|
}
|
|
|
|
type testExporter struct {
|
|
// Err is the error returned by all methods of the testExporter.
|
|
Err error
|
|
// ExportTrigger is read from prior to returning from the Export method if
|
|
// non-nil.
|
|
ExportTrigger chan struct{}
|
|
|
|
// Counts of method calls.
|
|
exportN, shutdownN, forceFlushN *int32
|
|
|
|
stopped atomic.Bool
|
|
inputMu sync.Mutex
|
|
input chan instruction
|
|
done chan struct{}
|
|
}
|
|
|
|
func newTestExporter(err error) *testExporter {
|
|
e := &testExporter{
|
|
Err: err,
|
|
exportN: new(int32),
|
|
shutdownN: new(int32),
|
|
forceFlushN: new(int32),
|
|
input: make(chan instruction),
|
|
}
|
|
e.done = run(e.input)
|
|
|
|
return e
|
|
}
|
|
|
|
func run(input chan instruction) chan struct{} {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
|
|
var records [][]Record
|
|
for in := range input {
|
|
if in.Record != nil {
|
|
records = append(records, *in.Record)
|
|
}
|
|
if in.Flush != nil {
|
|
cp := slices.Clone(records)
|
|
records = records[:0]
|
|
in.Flush <- cp
|
|
}
|
|
}
|
|
}()
|
|
return done
|
|
}
|
|
|
|
func (e *testExporter) Records() [][]Record {
|
|
out := make(chan [][]Record, 1)
|
|
e.input <- instruction{Flush: out}
|
|
return <-out
|
|
}
|
|
|
|
func (e *testExporter) Export(ctx context.Context, r []Record) error {
|
|
atomic.AddInt32(e.exportN, 1)
|
|
if e.ExportTrigger != nil {
|
|
select {
|
|
case <-e.ExportTrigger:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
e.inputMu.Lock()
|
|
defer e.inputMu.Unlock()
|
|
if !e.stopped.Load() {
|
|
e.input <- instruction{Record: &r}
|
|
}
|
|
return e.Err
|
|
}
|
|
|
|
func (e *testExporter) ExportN() int {
|
|
return int(atomic.LoadInt32(e.exportN))
|
|
}
|
|
|
|
func (e *testExporter) Stop() {
|
|
if e.stopped.Swap(true) {
|
|
return
|
|
}
|
|
e.inputMu.Lock()
|
|
defer e.inputMu.Unlock()
|
|
|
|
close(e.input)
|
|
<-e.done
|
|
}
|
|
|
|
func (e *testExporter) Shutdown(ctx context.Context) error {
|
|
atomic.AddInt32(e.shutdownN, 1)
|
|
return e.Err
|
|
}
|
|
|
|
func (e *testExporter) ShutdownN() int {
|
|
return int(atomic.LoadInt32(e.shutdownN))
|
|
}
|
|
|
|
func (e *testExporter) ForceFlush(ctx context.Context) error {
|
|
atomic.AddInt32(e.forceFlushN, 1)
|
|
return e.Err
|
|
}
|
|
|
|
func (e *testExporter) ForceFlushN() int {
|
|
return int(atomic.LoadInt32(e.forceFlushN))
|
|
}
|
|
|
|
func TestChunker(t *testing.T) {
|
|
t.Run("ZeroSize", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
c := newChunkExporter(exp, 0)
|
|
const size = 100
|
|
_ = c.Export(context.Background(), make([]Record, size))
|
|
|
|
assert.Equal(t, 1, exp.ExportN())
|
|
records := exp.Records()
|
|
assert.Len(t, records, 1)
|
|
assert.Len(t, records[0], size)
|
|
})
|
|
|
|
t.Run("ForceFlush", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
c := newChunkExporter(exp, 0)
|
|
_ = c.ForceFlush(context.Background())
|
|
assert.Equal(t, 1, exp.ForceFlushN(), "ForceFlush not passed through")
|
|
})
|
|
|
|
t.Run("Shutdown", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
c := newChunkExporter(exp, 0)
|
|
_ = c.Shutdown(context.Background())
|
|
assert.Equal(t, 1, exp.ShutdownN(), "Shutdown not passed through")
|
|
})
|
|
|
|
t.Run("Chunk", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
c := newChunkExporter(exp, 10)
|
|
assert.NoError(t, c.Export(context.Background(), make([]Record, 5)))
|
|
assert.NoError(t, c.Export(context.Background(), make([]Record, 25)))
|
|
|
|
wantLens := []int{5, 10, 10, 5}
|
|
records := exp.Records()
|
|
require.Len(t, records, len(wantLens), "chunks")
|
|
for i, n := range wantLens {
|
|
assert.Lenf(t, records[i], n, "chunk %d", i)
|
|
}
|
|
})
|
|
|
|
t.Run("ExportError", func(t *testing.T) {
|
|
exp := newTestExporter(assert.AnError)
|
|
t.Cleanup(exp.Stop)
|
|
c := newChunkExporter(exp, 0)
|
|
ctx := context.Background()
|
|
records := make([]Record, 25)
|
|
err := c.Export(ctx, records)
|
|
assert.ErrorIs(t, err, assert.AnError, "no chunking")
|
|
|
|
c = newChunkExporter(exp, 10)
|
|
err = c.Export(ctx, records)
|
|
assert.ErrorIs(t, err, assert.AnError, "with chunking")
|
|
})
|
|
}
|
|
|
|
func TestExportSync(t *testing.T) {
|
|
eventuallyDone := func(t *testing.T, done chan struct{}) {
|
|
assert.Eventually(t, func() bool {
|
|
select {
|
|
case <-done:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}, 2*time.Second, time.Microsecond)
|
|
}
|
|
|
|
t.Run("ErrorHandler", func(t *testing.T) {
|
|
var got error
|
|
handler := otel.ErrorHandlerFunc(func(err error) { got = err })
|
|
otel.SetErrorHandler(handler)
|
|
t.Cleanup(func() {
|
|
l := stdlog.New(io.Discard, "", stdlog.LstdFlags)
|
|
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
|
|
l.Print(err)
|
|
}))
|
|
})
|
|
|
|
in := make(chan exportData, 1)
|
|
exp := newTestExporter(assert.AnError)
|
|
t.Cleanup(exp.Stop)
|
|
done := exportSync(in, exp)
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
in <- exportData{
|
|
ctx: context.Background(),
|
|
records: make([]Record, 1),
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
close(in)
|
|
eventuallyDone(t, done)
|
|
|
|
assert.ErrorIs(t, got, assert.AnError, "error not passed to ErrorHandler")
|
|
})
|
|
|
|
t.Run("ConcurrentSafe", func(t *testing.T) {
|
|
in := make(chan exportData, 1)
|
|
exp := newTestExporter(assert.AnError)
|
|
t.Cleanup(exp.Stop)
|
|
done := exportSync(in, exp)
|
|
|
|
const goRoutines = 10
|
|
var wg sync.WaitGroup
|
|
wg.Add(goRoutines)
|
|
for i := 0; i < goRoutines; i++ {
|
|
go func(n int) {
|
|
defer wg.Done()
|
|
|
|
var r Record
|
|
r.SetBody(log.IntValue(n))
|
|
|
|
resp := make(chan error, 1)
|
|
in <- exportData{
|
|
ctx: context.Background(),
|
|
records: []Record{r},
|
|
respCh: resp,
|
|
}
|
|
|
|
assert.ErrorIs(t, <-resp, assert.AnError)
|
|
}(i)
|
|
}
|
|
|
|
// Empty records should be ignored.
|
|
in <- exportData{ctx: context.Background()}
|
|
|
|
wg.Wait()
|
|
|
|
close(in)
|
|
eventuallyDone(t, done)
|
|
|
|
assert.Equal(t, goRoutines, exp.ExportN(), "Export calls")
|
|
|
|
want := make([]log.Value, goRoutines)
|
|
for i := range want {
|
|
want[i] = log.IntValue(i)
|
|
}
|
|
records := exp.Records()
|
|
got := make([]log.Value, len(records))
|
|
for i := range got {
|
|
if assert.Len(t, records[i], 1, "number of records exported") {
|
|
got[i] = records[i][0].Body()
|
|
}
|
|
}
|
|
assert.ElementsMatch(t, want, got, "record bodies")
|
|
})
|
|
}
|
|
|
|
func TestTimeoutExporter(t *testing.T) {
|
|
t.Run("ZeroTimeout", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newTimeoutExporter(exp, 0)
|
|
assert.Same(t, exp, e)
|
|
})
|
|
|
|
t.Run("Timeout", func(t *testing.T) {
|
|
trigger := make(chan struct{})
|
|
t.Cleanup(func() { close(trigger) })
|
|
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
exp.ExportTrigger = trigger
|
|
e := newTimeoutExporter(exp, time.Nanosecond)
|
|
|
|
out := make(chan error, 1)
|
|
go func() {
|
|
out <- e.Export(context.Background(), make([]Record, 1))
|
|
}()
|
|
|
|
var err error
|
|
assert.Eventually(t, func() bool {
|
|
select {
|
|
case err = <-out:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}, 2*time.Second, time.Microsecond)
|
|
|
|
assert.ErrorIs(t, err, context.DeadlineExceeded)
|
|
close(out)
|
|
})
|
|
}
|
|
|
|
func TestBufferExporter(t *testing.T) {
|
|
t.Run("ConcurrentSafe", func(t *testing.T) {
|
|
const goRoutines = 10
|
|
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, goRoutines)
|
|
|
|
ctx := context.Background()
|
|
records := make([]Record, 10)
|
|
|
|
stop := make(chan struct{})
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < goRoutines; i++ {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
for {
|
|
select {
|
|
case <-stop:
|
|
return
|
|
default:
|
|
_ = e.EnqueueExport(records)
|
|
_ = e.Export(ctx, records)
|
|
_ = e.ForceFlush(ctx)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
assert.Eventually(t, func() bool {
|
|
return exp.ExportN() > 0
|
|
}, 2*time.Second, time.Microsecond)
|
|
|
|
assert.NoError(t, e.Shutdown(ctx))
|
|
close(stop)
|
|
wg.Wait()
|
|
})
|
|
|
|
t.Run("Shutdown", func(t *testing.T) {
|
|
t.Run("Multiple", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
assert.NoError(t, e.Shutdown(context.Background()))
|
|
assert.Equal(t, 1, exp.ShutdownN(), "first Shutdown")
|
|
|
|
assert.NoError(t, e.Shutdown(context.Background()))
|
|
assert.Equal(t, 1, exp.ShutdownN(), "second Shutdown")
|
|
})
|
|
|
|
t.Run("ContextCancelled", func(t *testing.T) {
|
|
// Discard error logs.
|
|
defer func(orig otel.ErrorHandler) {
|
|
otel.SetErrorHandler(orig)
|
|
}(otel.GetErrorHandler())
|
|
handler := otel.ErrorHandlerFunc(func(err error) {})
|
|
otel.SetErrorHandler(handler)
|
|
|
|
exp := newTestExporter(assert.AnError)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
trigger := make(chan struct{})
|
|
exp.ExportTrigger = trigger
|
|
t.Cleanup(func() { close(trigger) })
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
// Make sure there is something to flush.
|
|
require.True(t, e.EnqueueExport(make([]Record, 1)))
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
|
|
err := e.Shutdown(ctx)
|
|
assert.ErrorIs(t, err, context.Canceled)
|
|
assert.ErrorIs(t, err, assert.AnError)
|
|
})
|
|
|
|
t.Run("Error", func(t *testing.T) {
|
|
exp := newTestExporter(assert.AnError)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
e := newBufferExporter(exp, 1)
|
|
assert.ErrorIs(t, e.Shutdown(context.Background()), assert.AnError)
|
|
})
|
|
})
|
|
|
|
t.Run("ForceFlush", func(t *testing.T) {
|
|
t.Run("Multiple", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, 2)
|
|
|
|
ctx := context.Background()
|
|
records := make([]Record, 1)
|
|
require.NoError(t, e.enqueue(ctx, records, nil), "enqueue")
|
|
|
|
assert.NoError(t, e.ForceFlush(ctx), "ForceFlush records")
|
|
assert.Equal(t, 1, exp.ExportN(), "Export number incremented")
|
|
assert.Len(t, exp.Records(), 1, "exported Record batches")
|
|
|
|
// Nothing to flush.
|
|
assert.NoError(t, e.ForceFlush(ctx), "ForceFlush empty")
|
|
assert.Equal(t, 1, exp.ExportN(), "Export number changed")
|
|
assert.Len(t, exp.Records(), 0, "exported non-zero Records")
|
|
})
|
|
|
|
t.Run("ContextCancelled", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
trigger := make(chan struct{})
|
|
exp.ExportTrigger = trigger
|
|
t.Cleanup(func() { close(trigger) })
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
require.True(t, e.EnqueueExport(make([]Record, 1)))
|
|
|
|
got := make(chan error, 1)
|
|
go func() { got <- e.ForceFlush(ctx) }()
|
|
require.Eventually(t, func() bool {
|
|
return exp.ExportN() > 0
|
|
}, 2*time.Second, time.Microsecond)
|
|
cancel() // Canceled before export response.
|
|
err := <-got
|
|
assert.ErrorIs(t, err, context.Canceled, "enqueued")
|
|
_ = e.Shutdown(ctx)
|
|
|
|
// Zero length buffer
|
|
e = newBufferExporter(exp, 0)
|
|
assert.ErrorIs(t, e.ForceFlush(ctx), context.Canceled, "not enqueued")
|
|
})
|
|
|
|
t.Run("Error", func(t *testing.T) {
|
|
exp := newTestExporter(assert.AnError)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
e := newBufferExporter(exp, 1)
|
|
assert.ErrorIs(t, e.ForceFlush(context.Background()), assert.AnError)
|
|
})
|
|
|
|
t.Run("Stopped", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
ctx := context.Background()
|
|
_ = e.Shutdown(ctx)
|
|
assert.NoError(t, e.ForceFlush(ctx))
|
|
})
|
|
})
|
|
|
|
t.Run("Export", func(t *testing.T) {
|
|
t.Run("ZeroRecords", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
assert.NoError(t, e.Export(context.Background(), nil))
|
|
assert.Equal(t, 0, exp.ExportN())
|
|
})
|
|
|
|
t.Run("Multiple", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
ctx := context.Background()
|
|
records := make([]Record, 1)
|
|
records[0].SetBody(log.BoolValue(true))
|
|
|
|
assert.NoError(t, e.Export(ctx, records))
|
|
|
|
n := exp.ExportN()
|
|
assert.Equal(t, 1, n, "first Export number")
|
|
assert.Equal(t, [][]Record{records}, exp.Records())
|
|
|
|
assert.NoError(t, e.Export(ctx, records))
|
|
assert.Equal(t, n+1, exp.ExportN(), "second Export number")
|
|
assert.Equal(t, [][]Record{records}, exp.Records())
|
|
})
|
|
|
|
t.Run("ContextCancelled", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
trigger := make(chan struct{})
|
|
exp.ExportTrigger = trigger
|
|
t.Cleanup(func() { close(trigger) })
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
records := make([]Record, 1)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
got := make(chan error, 1)
|
|
go func() { got <- e.Export(ctx, records) }()
|
|
require.Eventually(t, func() bool {
|
|
return exp.ExportN() > 0
|
|
}, 2*time.Second, time.Microsecond)
|
|
cancel() // Canceled before export response.
|
|
err := <-got
|
|
assert.ErrorIs(t, err, context.Canceled, "enqueued")
|
|
_ = e.Shutdown(ctx)
|
|
|
|
// Zero length buffer
|
|
e = newBufferExporter(exp, 0)
|
|
assert.ErrorIs(t, e.Export(ctx, records), context.Canceled, "not enqueued")
|
|
})
|
|
|
|
t.Run("Error", func(t *testing.T) {
|
|
exp := newTestExporter(assert.AnError)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
e := newBufferExporter(exp, 1)
|
|
ctx, records := context.Background(), make([]Record, 1)
|
|
assert.ErrorIs(t, e.Export(ctx, records), assert.AnError)
|
|
})
|
|
|
|
t.Run("Stopped", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
ctx := context.Background()
|
|
_ = e.Shutdown(ctx)
|
|
assert.NoError(t, e.Export(ctx, make([]Record, 1)))
|
|
assert.Equal(t, 0, exp.ExportN(), "Export called")
|
|
})
|
|
})
|
|
|
|
t.Run("EnqueueExport", func(t *testing.T) {
|
|
t.Run("ZeroRecords", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
assert.True(t, e.EnqueueExport(nil))
|
|
e.ForceFlush(context.Background())
|
|
assert.Equal(t, 0, exp.ExportN(), "empty batch enqueued")
|
|
})
|
|
|
|
t.Run("Multiple", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, 2)
|
|
|
|
records := make([]Record, 1)
|
|
records[0].SetBody(log.BoolValue(true))
|
|
|
|
assert.True(t, e.EnqueueExport(records))
|
|
assert.True(t, e.EnqueueExport(records))
|
|
e.ForceFlush(context.Background())
|
|
|
|
n := exp.ExportN()
|
|
assert.Equal(t, 2, n, "Export number")
|
|
assert.Equal(t, [][]Record{records, records}, exp.Records())
|
|
})
|
|
|
|
t.Run("Stopped", func(t *testing.T) {
|
|
exp := newTestExporter(nil)
|
|
t.Cleanup(exp.Stop)
|
|
e := newBufferExporter(exp, 1)
|
|
|
|
_ = e.Shutdown(context.Background())
|
|
assert.True(t, e.EnqueueExport(make([]Record, 1)))
|
|
})
|
|
})
|
|
}
|