1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-16 02:47:20 +02:00
opentelemetry-go/sdk/log/batch_test.go
Robert Pająk 8e8ad092cc
sdk/log: Processor.OnEmit accepts a Record pointer (#5636)
## What

Change `Processor.OnEmit` methods to accept a record pointer.

## Why

Fixes https://github.com/open-telemetry/opentelemetry-go/issues/5219

This would be specification compliant according to discussions around
https://github.com/open-telemetry/opentelemetry-specification/pull/4067

This is in line with how Go span processors work and how log
processors work in other languages.

If the performance cost (an additional heap allocation during log processing)
turns out to be a significant problem for some users, we have a few
possible solutions:

1. Utilize PGO which may also lead to decreasing heap allocations
(sources:
https://landontclipp.github.io/blog/2023/08/25/profile-guided-optimizations-in-go/#devirtualization,
https://andrewwphillips.github.io/blog/pgo.html). Currently it does not
but I expect it may change in future.
2. Improve the Go compiler's escape analysis (related to the previous point)
3. introduce new "chaining processor" which can be also desirable in
other languages

## Benchstat

`old` is from `main`.
`new` is from current branch.

`new-pgo` is from current branch with PGO optimization. I first run
benchmarks to generate a CPU profile using `go test -run=^$ -bench=.
-count=10 -cpuprofile default.pgo` and then I rerun the tests with PGO.
Currently, the number of heap allocations is the same.

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/log
cpu: Intel(R) Core(TM) i9-10885H CPU @ 2.40GHz
                                  │    old.txt    │                new.txt                │              new-pgo.txt              │
                                  │    sec/op     │    sec/op     vs base                 │    sec/op     vs base                 │
BatchProcessorOnEmit-16              402.7n ± 18%   382.0n ±  7%         ~ (p=0.247 n=10)   376.7n ± 14%         ~ (p=0.210 n=10)
Processor/Simple-16                  350.9n ±  9%   782.5n ±  6%  +123.00% (p=0.000 n=10)   755.1n ±  5%  +115.19% (p=0.000 n=10)
Processor/Batch-16                   1.333µ ± 15%   1.497µ ± 11%   +12.27% (p=0.000 n=10)   1.528µ ±  8%   +14.63% (p=0.000 n=10)
Processor/SetTimestampSimple-16      329.5n ± 15%   711.6n ±  4%  +115.93% (p=0.000 n=10)   721.9n ±  5%  +119.04% (p=0.000 n=10)
Processor/SetTimestampBatch-16       1.163µ ±  2%   1.524µ ±  3%   +31.03% (p=0.000 n=10)   1.461µ ±  5%   +25.57% (p=0.000 n=10)
Processor/AddAttributesSimple-16     408.7n ±  3%   810.1n ±  4%   +98.21% (p=0.000 n=10)   830.1n ±  4%  +103.11% (p=0.000 n=10)
Processor/AddAttributesBatch-16      1.270µ ±  2%   1.623µ ±  4%   +27.71% (p=0.000 n=10)   1.597µ ±  7%   +25.66% (p=0.000 n=10)
Processor/SetAttributesSimple-16     436.2n ± 10%   796.1n ±  3%   +82.50% (p=0.000 n=10)   817.6n ±  4%   +87.43% (p=0.000 n=10)
Processor/SetAttributesBatch-16      1.202µ ±  2%   1.552µ ±  2%   +29.06% (p=0.000 n=10)   1.659µ ± 11%   +37.96% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16      366.6n ±  3%   363.7n ±  7%         ~ (p=0.952 n=10)   426.2n ±  7%   +16.27% (p=0.000 n=10)
LoggerNewRecord/10_attributes-16     1.711µ ±  2%   1.909µ ± 18%   +11.57% (p=0.000 n=10)   2.077µ ± 10%   +21.39% (p=0.000 n=10)
LoggerProviderLogger-16              650.1n ±  4%   690.1n ±  8%    +6.15% (p=0.019 n=10)   737.6n ± 13%   +13.47% (p=0.004 n=10)
WalkAttributes/1_attributes-16       5.264n ± 12%   5.510n ±  8%         ~ (p=0.812 n=10)   5.865n ±  5%   +11.41% (p=0.011 n=10)
WalkAttributes/10_attributes-16      5.440n ±  8%   5.881n ±  7%    +8.12% (p=0.004 n=10)   6.104n ±  7%   +12.21% (p=0.005 n=10)
WalkAttributes/100_attributes-16     5.403n ±  9%   5.894n ±  9%    +9.10% (p=0.029 n=10)   5.783n ±  6%         ~ (p=0.052 n=10)
WalkAttributes/1000_attributes-16    5.196n ±  4%   5.860n ±  8%   +12.79% (p=0.000 n=10)   5.981n ± 13%   +15.13% (p=0.002 n=10)
SetAddAttributes/SetAttributes-16    181.2n ± 14%   208.1n ± 12%   +14.85% (p=0.005 n=10)   209.9n ± 11%   +15.87% (p=0.007 n=10)
SetAddAttributes/AddAttributes-16    156.7n ± 14%   161.1n ± 16%         ~ (p=0.190 n=10)   165.5n ± 15%         ~ (p=0.315 n=10)
SimpleProcessorOnEmit-16            11.775n ± 10%   9.027n ± 17%   -23.33% (p=0.000 n=10)   9.389n ± 18%   -20.26% (p=0.002 n=10)
geomean                              169.1n         209.6n         +23.98%                  215.5n         +27.48%

                        │     old.txt     │               new.txt                │              new-pgo.txt              │
                        │       B/s       │     B/s       vs base                │      B/s       vs base                │
BatchProcessorOnEmit-16   1004.39Mi ± 15%   79.88Mi ± 7%  -92.05% (p=0.000 n=10)   81.06Mi ± 12%  -91.93% (p=0.000 n=10)

                                  │   old.txt    │                new.txt                 │               new-pgo.txt               │
                                  │     B/op     │    B/op      vs base                   │     B/op      vs base                   │
BatchProcessorOnEmit-16             0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
Processor/Simple-16                   0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      417.0 ±  0%         ? (p=0.000 n=10)
Processor/Batch-16                  621.5 ± 2%     1057.5 ± 1%   +70.15% (p=0.000 n=10)     1064.5 ±  1%   +71.28% (p=0.000 n=10)
Processor/SetTimestampSimple-16       0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      418.0 ±  0%         ? (p=0.000 n=10)
Processor/SetTimestampBatch-16      626.5 ± 3%     1049.5 ± 1%   +67.52% (p=0.000 n=10)     1057.5 ±  2%   +68.79% (p=0.000 n=10)
Processor/AddAttributesSimple-16      0.0 ± 0%      417.0 ± 0%         ? (p=0.000 n=10)      417.0 ±  0%         ? (p=0.000 n=10)
Processor/AddAttributesBatch-16     616.5 ± 3%     1053.0 ± 2%   +70.80% (p=0.000 n=10)     1048.5 ±  2%   +70.07% (p=0.000 n=10)
Processor/SetAttributesSimple-16    48.00 ± 0%     466.00 ± 0%  +870.83% (p=0.000 n=10)     466.00 ±  0%  +870.83% (p=0.000 n=10)
Processor/SetAttributesBatch-16     648.0 ± 3%     1089.5 ± 1%   +68.13% (p=0.000 n=10)     1087.5 ±  4%   +67.82% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16     0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
LoggerNewRecord/10_attributes-16    610.0 ± 0%      610.0 ± 0%         ~ (p=1.000 n=10) ¹    610.0 ±  0%         ~ (p=1.000 n=10) ¹
LoggerProviderLogger-16             354.5 ± 6%      368.0 ± 7%         ~ (p=0.288 n=10)      391.0 ± 29%         ~ (p=0.239 n=10)
WalkAttributes/1_attributes-16      0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/10_attributes-16     0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/100_attributes-16    0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1000_attributes-16   0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/SetAttributes-16   48.00 ± 0%      48.00 ± 0%         ~ (p=1.000 n=10) ¹    48.00 ±  0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-16   0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
SimpleProcessorOnEmit-16            0.000 ± 0%      0.000 ± 0%         ~ (p=1.000 n=10) ¹    0.000 ±  0%         ~ (p=1.000 n=10) ¹
geomean                                        ²                ?                       ²                 ?                       ²
¹ all samples are equal
² summaries must be >0 to compute geomean

                                  │   old.txt    │                new.txt                │              new-pgo.txt              │
                                  │  allocs/op   │ allocs/op   vs base                   │ allocs/op   vs base                   │
BatchProcessorOnEmit-16             0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
Processor/Simple-16                 0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/Batch-16                  0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetTimestampSimple-16     0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetTimestampBatch-16      0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/AddAttributesSimple-16    0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/AddAttributesBatch-16     0.000 ± 0%     1.000 ± 0%         ? (p=0.000 n=10)     1.000 ± 0%         ? (p=0.000 n=10)
Processor/SetAttributesSimple-16    1.000 ± 0%     2.000 ± 0%  +100.00% (p=0.000 n=10)     2.000 ± 0%  +100.00% (p=0.000 n=10)
Processor/SetAttributesBatch-16     1.000 ± 0%     2.000 ± 0%  +100.00% (p=0.000 n=10)     2.000 ± 0%  +100.00% (p=0.000 n=10)
LoggerNewRecord/5_attributes-16     0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
LoggerNewRecord/10_attributes-16    4.000 ± 0%     4.000 ± 0%         ~ (p=1.000 n=10) ¹   4.000 ± 0%         ~ (p=1.000 n=10) ¹
LoggerProviderLogger-16             1.000 ± 0%     1.000 ± 0%         ~ (p=1.000 n=10) ¹   1.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1_attributes-16      0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/10_attributes-16     0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/100_attributes-16    0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
WalkAttributes/1000_attributes-16   0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/SetAttributes-16   1.000 ± 0%     1.000 ± 0%         ~ (p=1.000 n=10) ¹   1.000 ± 0%         ~ (p=1.000 n=10) ¹
SetAddAttributes/AddAttributes-16   0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
SimpleProcessorOnEmit-16            0.000 ± 0%     0.000 ± 0%         ~ (p=1.000 n=10) ¹   0.000 ± 0%         ~ (p=1.000 n=10) ¹
geomean                                        ²               ?                       ²               ?                       ²
¹ all samples are equal
² summaries must be >0 to compute geomean
```
2024-08-01 10:13:43 +02:00

671 lines
17 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"bytes"
"context"
stdlog "log"
"slices"
"strconv"
"strings"
"sync"
"testing"
"time"
"unsafe"
"github.com/go-logr/stdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/log"
)
// concurrentBuffer pairs a bytes.Buffer with a mutex. Both of its methods
// take the mutex before touching the buffer, so a writer and a reader can be
// used from different goroutines.
type concurrentBuffer struct {
	b bytes.Buffer
	m sync.Mutex
}

// Write appends p to the underlying buffer while holding the mutex and
// returns whatever the buffer's own Write reports.
func (cb *concurrentBuffer) Write(p []byte) (int, error) {
	cb.m.Lock()
	defer cb.m.Unlock()

	written, err := cb.b.Write(p)
	return written, err
}

// String returns the buffer's current contents while holding the mutex.
func (cb *concurrentBuffer) String() string {
	cb.m.Lock()
	defer cb.m.Unlock()

	contents := cb.b.String()
	return contents
}
func TestEmptyBatchConfig(t *testing.T) {
assert.NotPanics(t, func() {
var bp BatchProcessor
ctx := context.Background()
record := new(Record)
assert.NoError(t, bp.OnEmit(ctx, record), "OnEmit")
assert.False(t, bp.Enabled(ctx, *record), "Enabled")
assert.NoError(t, bp.ForceFlush(ctx), "ForceFlush")
assert.NoError(t, bp.Shutdown(ctx), "Shutdown")
})
}
// TestNewBatchConfig is a table-driven check of newBatchConfig. Each case
// optionally sets environment variables and/or passes options and compares
// the resulting batchConfig with the expected one.
func TestNewBatchConfig(t *testing.T) {
	// Send anything reported to the global error handler to the test log.
	otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
		t.Log(err)
	}))

	// defaults is the configuration expected when nothing valid overrides
	// the built-in values.
	defaults := batchConfig{
		maxQSize:        newSetting(dfltMaxQSize),
		expInterval:     newSetting(dfltExpInterval),
		expTimeout:      newSetting(dfltExpTimeout),
		expMaxBatchSize: newSetting(dfltExpMaxBatchSize),
	}

	type testcase struct {
		name    string
		envars  map[string]string
		options []BatchProcessorOption
		want    batchConfig
	}

	tests := []testcase{
		{
			name: "Defaults",
			want: defaults,
		},
		{
			name: "Options",
			options: []BatchProcessorOption{
				WithMaxQueueSize(10),
				WithExportInterval(time.Microsecond),
				WithExportTimeout(time.Hour),
				WithExportMaxBatchSize(2),
			},
			want: batchConfig{
				maxQSize:        newSetting(10),
				expInterval:     newSetting(time.Microsecond),
				expTimeout:      newSetting(time.Hour),
				expMaxBatchSize: newSetting(2),
			},
		},
		{
			// Interval and timeout values from the environment are expected
			// to be read as milliseconds.
			name: "Environment",
			envars: map[string]string{
				envarMaxQSize:        strconv.Itoa(10),
				envarExpInterval:     strconv.Itoa(100),
				envarExpTimeout:      strconv.Itoa(1000),
				envarExpMaxBatchSize: strconv.Itoa(1),
			},
			want: batchConfig{
				maxQSize:        newSetting(10),
				expInterval:     newSetting(100 * time.Millisecond),
				expTimeout:      newSetting(1000 * time.Millisecond),
				expMaxBatchSize: newSetting(1),
			},
		},
		{
			// Negative option values are expected to fall back to defaults.
			name: "InvalidOptions",
			options: []BatchProcessorOption{
				WithMaxQueueSize(-11),
				WithExportInterval(-1 * time.Microsecond),
				WithExportTimeout(-1 * time.Hour),
				WithExportMaxBatchSize(-2),
			},
			want: defaults,
		},
		{
			// Negative environment values are expected to fall back to
			// defaults.
			name: "InvalidEnvironment",
			envars: map[string]string{
				envarMaxQSize:        "-1",
				envarExpInterval:     "-1",
				envarExpTimeout:      "-1",
				envarExpMaxBatchSize: "-1",
			},
			want: defaults,
		},
		{
			name: "Precedence",
			envars: map[string]string{
				envarMaxQSize:        strconv.Itoa(1),
				envarExpInterval:     strconv.Itoa(100),
				envarExpTimeout:      strconv.Itoa(1000),
				envarExpMaxBatchSize: strconv.Itoa(10),
			},
			options: []BatchProcessorOption{
				// These override the environment variables.
				WithMaxQueueSize(3),
				WithExportInterval(time.Microsecond),
				WithExportTimeout(time.Hour),
				WithExportMaxBatchSize(2),
			},
			want: batchConfig{
				maxQSize:        newSetting(3),
				expInterval:     newSetting(time.Microsecond),
				expTimeout:      newSetting(time.Hour),
				expMaxBatchSize: newSetting(2),
			},
		},
		{
			// A batch size larger than the queue size is expected to be
			// reduced to the queue size.
			name: "BatchLessThanOrEqualToQSize",
			options: []BatchProcessorOption{
				WithMaxQueueSize(1),
				WithExportMaxBatchSize(10),
			},
			want: batchConfig{
				maxQSize:        newSetting(1),
				expInterval:     newSetting(dfltExpInterval),
				expTimeout:      newSetting(dfltExpTimeout),
				expMaxBatchSize: newSetting(1),
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// t.Setenv scopes each variable to this subtest.
			for k, v := range tt.envars {
				t.Setenv(k, v)
			}

			got := newBatchConfig(tt.options)
			assert.Equal(t, tt.want, got)
		})
	}
}
// TestBatchProcessor exercises BatchProcessor through a set of subtests:
// construction with a nil exporter, export with a tiny export interval,
// export driven by OnEmit, OnEmit while an export is blocked, Enabled,
// Shutdown, ForceFlush, logging of dropped records, and concurrent use.
func TestBatchProcessor(t *testing.T) {
ctx := context.Background()
// Constructing a processor with a nil exporter must not panic.
t.Run("NilExporter", func(t *testing.T) {
assert.NotPanics(t, func() { NewBatchProcessor(nil) })
})
// The export interval is 1ns and the batch size (2*size) is larger than the
// number of emitted records (size); every record is still expected to reach
// the exporter within two seconds.
t.Run("Polling", func(t *testing.T) {
e := newTestExporter(nil)
const size = 15
b := NewBatchProcessor(
e,
WithMaxQueueSize(2*size),
WithExportMaxBatchSize(2*size),
WithExportInterval(time.Nanosecond),
WithExportTimeout(time.Hour),
)
for i := 0; i < size; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
// got accumulates across polls of the condition.
// NOTE(review): this relies on e.Records() not returning the same records
// twice — confirm against the testExporter helper.
var got []Record
assert.Eventually(t, func() bool {
for _, r := range e.Records() {
got = append(got, r...)
}
return len(got) == size
}, 2*time.Second, time.Microsecond)
_ = b.Shutdown(ctx)
})
// The export interval is an hour, so exports observed here are not expected
// to come from the interval timer. Ten batches worth of records are emitted
// and at least ten Export calls are expected once Shutdown has returned.
t.Run("OnEmit", func(t *testing.T) {
const batch = 10
e := newTestExporter(nil)
b := NewBatchProcessor(
e,
WithMaxQueueSize(10*batch),
WithExportMaxBatchSize(batch),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for i := 0; i < 10*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 1
}, 2*time.Second, time.Microsecond, "multi-batch flush")
assert.NoError(t, b.Shutdown(ctx))
assert.GreaterOrEqual(t, e.ExportN(), 10)
})
// Checks that OnEmit returns while an export is held up on ExportTrigger and
// that a further export happens once the trigger is released.
t.Run("RetriggerFlushNonBlocking", func(t *testing.T) {
e := newTestExporter(nil)
// NOTE(review): testExporter.Export presumably waits on ExportTrigger when
// it is non-nil — confirm against the testExporter helper.
e.ExportTrigger = make(chan struct{})
const batch = 10
b := NewBatchProcessor(
e,
WithMaxQueueSize(3*batch),
WithExportMaxBatchSize(batch),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
for i := 0; i < 2*batch; i++ {
assert.NoError(t, b.OnEmit(ctx, new(Record)))
}
// Record the export count once at least one export has been attempted.
var n int
require.Eventually(t, func() bool {
n = e.ExportN()
return n > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")
// The condition always returns true, so this can only fail if the OnEmit
// call does not return before the one second timeout.
var err error
require.Eventually(t, func() bool {
err = b.OnEmit(ctx, new(Record))
return true
}, time.Second, time.Microsecond, "OnEmit blocked")
assert.NoError(t, err)
// Release one pending export and expect the export count to grow.
e.ExportTrigger <- struct{}{}
assert.Eventually(t, func() bool {
return e.ExportN() > n
}, 2*time.Second, time.Microsecond, "flush not retriggered")
close(e.ExportTrigger)
assert.NoError(t, b.Shutdown(ctx))
assert.Equal(t, 3, e.ExportN())
})
// Enabled reports true before Shutdown and false afterwards.
t.Run("Enabled", func(t *testing.T) {
b := NewBatchProcessor(defaultNoopExporter)
assert.True(t, b.Enabled(ctx, Record{}))
_ = b.Shutdown(ctx)
assert.False(t, b.Enabled(ctx, Record{}))
})
t.Run("Shutdown", func(t *testing.T) {
// The first Shutdown returns the exporter's error; a second Shutdown
// returns nil.
t.Run("Error", func(t *testing.T) {
e := newTestExporter(assert.AnError)
b := NewBatchProcessor(e)
assert.ErrorIs(t, b.Shutdown(ctx), assert.AnError, "exporter error not returned")
assert.NoError(t, b.Shutdown(ctx))
})
// Repeated processor Shutdown calls result in a single exporter Shutdown.
t.Run("Multiple", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)
const shutdowns = 3
for i := 0; i < shutdowns; i++ {
assert.NoError(t, b.Shutdown(ctx))
}
assert.Equal(t, 1, e.ShutdownN(), "exporter Shutdown calls")
})
// OnEmit after Shutdown returns no error and does not cause another Export.
t.Run("OnEmit", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)
assert.NoError(t, b.Shutdown(ctx))
want := e.ExportN()
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.Equal(t, want, e.ExportN(), "Export called after shutdown")
})
// ForceFlush after Shutdown returns no error and never reaches the exporter.
t.Run("ForceFlush", func(t *testing.T) {
e := newTestExporter(nil)
b := NewBatchProcessor(e)
assert.NoError(t, b.OnEmit(ctx, new(Record)))
assert.NoError(t, b.Shutdown(ctx))
assert.NoError(t, b.ForceFlush(ctx))
assert.Equal(t, 0, e.ForceFlushN(), "ForceFlush called after shutdown")
})
// Shutdown with an already canceled context returns context.Canceled.
t.Run("CanceledContext", func(t *testing.T) {
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
t.Cleanup(func() { close(e.ExportTrigger) })
b := NewBatchProcessor(e)
ctx := context.Background()
c, cancel := context.WithCancel(ctx)
cancel()
assert.ErrorIs(t, b.Shutdown(c), context.Canceled)
})
})
t.Run("ForceFlush", func(t *testing.T) {
// ForceFlush exports the single queued record, calls the exporter's
// ForceFlush once, and returns the exporter's error.
t.Run("Flush", func(t *testing.T) {
e := newTestExporter(assert.AnError)
b := NewBatchProcessor(
e,
WithMaxQueueSize(100),
WithExportMaxBatchSize(10),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
r := new(Record)
r.SetBody(log.BoolValue(true))
require.NoError(t, b.OnEmit(ctx, r))
assert.ErrorIs(t, b.ForceFlush(ctx), assert.AnError, "exporter error not returned")
assert.Equal(t, 1, e.ForceFlushN(), "exporter ForceFlush calls")
if assert.Equal(t, 1, e.ExportN(), "exporter Export calls") {
got := e.Records()
if assert.Len(t, got[0], 1, "records received") {
assert.Equal(t, *r, got[0][0])
}
}
})
// ForceFlush with a canceled context while exports are blocked returns an
// error matching both errPartialFlush and context.Canceled.
t.Run("ErrorPartialFlush", func(t *testing.T) {
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
// Wrap the package-level ctxErr hook so the test is signalled when it is
// called; the original is restored on cleanup.
// NOTE(review): a second call to the wrapper would close an already closed
// channel and panic — confirm ctxErr is called only once on this path.
ctxErrCalled := make(chan struct{})
orig := ctxErr
ctxErr = func(ctx context.Context) error {
close(ctxErrCalled)
return orig(ctx)
}
t.Cleanup(func() { ctxErr = orig })
const batch = 1
b := NewBatchProcessor(
e,
WithMaxQueueSize(10*batch),
WithExportMaxBatchSize(batch),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
// Enqueue 10 x "batch size" amount of records.
for i := 0; i < 10*batch; i++ {
require.NoError(t, b.OnEmit(ctx, new(Record)))
}
assert.Eventually(t, func() bool {
return e.ExportN() > 0 && len(b.exporter.input) == cap(b.exporter.input)
}, 2*time.Second, time.Microsecond)
// 1 export being performed, 1 export in buffer chan, >1 batch
// still in queue that an attempt to flush will be made on.
//
// Stop the poll routine to prevent contention with the queue lock.
// This is outside of "normal" operations, but we are testing if
// ForceFlush will return the correct error when an EnqueueExport
// fails and not if ForceFlush will ever get the queue lock in high
// throughput situations.
close(b.pollDone)
<-b.pollDone
// Cancel the flush ctx from the start so errPartialFlush is
// returned right away.
fCtx, cancel := context.WithCancel(ctx)
cancel()
errCh := make(chan error, 1)
go func() {
errCh <- b.ForceFlush(fCtx)
close(errCh)
}()
// Wait for ctxErrCalled to close before closing ExportTrigger so
// we know the errPartialFlush will be returned in ForceFlush.
<-ctxErrCalled
close(e.ExportTrigger)
err := <-errCh
assert.ErrorIs(t, err, errPartialFlush, "partial flush error")
assert.ErrorIs(t, err, context.Canceled, "ctx canceled error")
})
// ForceFlush with an already canceled context returns context.Canceled.
t.Run("CanceledContext", func(t *testing.T) {
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
b := NewBatchProcessor(e)
t.Cleanup(func() { _ = b.Shutdown(ctx) })
r := new(Record)
r.SetBody(log.BoolValue(true))
_ = b.OnEmit(ctx, r)
// NOTE(review): this Shutdown cleanup duplicates the one registered above.
// Cleanups run in reverse order of registration, so ExportTrigger is closed
// before either Shutdown runs.
t.Cleanup(func() { _ = b.Shutdown(ctx) })
t.Cleanup(func() { close(e.ExportTrigger) })
c, cancel := context.WithCancel(ctx)
cancel()
assert.ErrorIs(t, b.ForceFlush(c), context.Canceled)
})
})
// With a queue size and batch size of 1 and a blocked exporter, the fourth
// emitted record is expected to make the processor log one dropped record
// through the global logger.
t.Run("DroppedLogs", func(t *testing.T) {
// Swap the global logger for one writing to buf; restore it on cleanup.
orig := global.GetLogger()
t.Cleanup(func() { global.SetLogger(orig) })
// Use concurrentBuffer for concurrent-safe reading.
buf := new(concurrentBuffer)
stdr.SetVerbosity(1)
global.SetLogger(stdr.New(stdlog.New(buf, "", 0)))
e := newTestExporter(nil)
e.ExportTrigger = make(chan struct{})
b := NewBatchProcessor(
e,
WithMaxQueueSize(1),
WithExportMaxBatchSize(1),
WithExportInterval(time.Hour),
WithExportTimeout(time.Hour),
)
r := new(Record)
// First record will be blocked by testExporter.Export
assert.NoError(t, b.OnEmit(ctx, r), "exported record")
require.Eventually(t, func() bool {
return e.ExportN() > 0
}, 2*time.Second, time.Microsecond, "blocked export not attempted")
// Second record will be written to export queue
assert.NoError(t, b.OnEmit(ctx, r), "export queue record")
require.Eventually(t, func() bool {
return len(b.exporter.input) == cap(b.exporter.input)
}, 2*time.Second, time.Microsecond, "blocked queue read not attempted")
// Third record will be written to BatchProcessor.q
assert.NoError(t, b.OnEmit(ctx, r), "first queued")
// The previous record will be dropped, as the new one will be written to BatchProcessor.q
assert.NoError(t, b.OnEmit(ctx, r), "second queued")
wantMsg := `"level"=1 "msg"="dropped log records" "dropped"=1`
assert.Eventually(t, func() bool {
return strings.Contains(buf.String(), wantMsg)
}, 2*time.Second, time.Microsecond)
close(e.ExportTrigger)
_ = b.Shutdown(ctx)
})
// Runs goRoutines-1 goroutines that emit and flush in a loop, plus one
// goroutine that shuts the processor down and then cancels the emitters.
t.Run("ConcurrentSafe", func(t *testing.T) {
const goRoutines = 10
e := newTestExporter(nil)
b := NewBatchProcessor(e)
// Shadow ctx with a cancelable context used to stop the emitter goroutines.
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for i := 0; i < goRoutines-1; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
assert.NoError(t, b.OnEmit(ctx, new(Record)))
// Ignore partial flush errors.
_ = b.ForceFlush(ctx)
}
}
}()
}
// Make sure at least one export happened before shutting down.
require.Eventually(t, func() bool {
return e.ExportN() > 0
}, 2*time.Second, time.Microsecond, "export before shutdown")
wg.Add(1)
go func() {
defer wg.Done()
assert.NoError(t, b.Shutdown(ctx))
cancel()
}()
wg.Wait()
})
}
// TestQueue exercises the queue type: construction, Enqueue (including
// overflow), the Dropped counter, Flush, TryDequeue and concurrent use.
func TestQueue(t *testing.T) {
	// r is the reference record enqueued by most subtests.
	var r Record
	r.SetBody(log.BoolValue(true))

	// A new queue is empty, has the requested capacity, and its read and
	// write positions refer to the same ring element.
	t.Run("newQueue", func(t *testing.T) {
		const size = 1
		q := newQueue(size)
		// assert.Equal takes (expected, actual); keeping that order makes
		// the failure output label the two values correctly.
		assert.Equal(t, 0, q.len, "length")
		assert.Equal(t, size, q.cap, "capacity")
		assert.Equal(t, size, q.read.Len(), "read ring")
		assert.Same(t, q.read, q.write, "different rings")
	})

	// Enqueue returns the queue length after the insert. Once the queue is
	// full, a further Enqueue keeps the length at capacity and the oldest
	// record (notR) is no longer returned by Flush.
	t.Run("Enqueue", func(t *testing.T) {
		const size = 2
		q := newQueue(size)

		var notR Record
		notR.SetBody(log.IntValue(10))

		assert.Equal(t, 1, q.Enqueue(notR), "incomplete batch")
		assert.Equal(t, 1, q.len, "length")
		assert.Equal(t, size, q.cap, "capacity")

		assert.Equal(t, 2, q.Enqueue(r), "complete batch")
		assert.Equal(t, 2, q.len, "length")
		assert.Equal(t, size, q.cap, "capacity")

		assert.Equal(t, 2, q.Enqueue(r), "overflow batch")
		assert.Equal(t, 2, q.len, "length")
		assert.Equal(t, size, q.cap, "capacity")

		assert.Equal(t, []Record{r, r}, q.Flush(), "flushed Records")
	})

	// With a capacity of one, every second Enqueue overwrites a record.
	// The expected values show that Dropped reports the drops since its
	// previous call: two drops happen after the first check, and 2 is
	// expected rather than a running total of 3.
	t.Run("Dropped", func(t *testing.T) {
		q := newQueue(1)

		_ = q.Enqueue(r)
		_ = q.Enqueue(r)
		assert.Equal(t, uint64(1), q.Dropped(), "first")

		_ = q.Enqueue(r)
		_ = q.Enqueue(r)
		assert.Equal(t, uint64(2), q.Dropped(), "second")
	})

	// Flush returns only the records that were written, not the capacity.
	// The queue state is set up by hand rather than through Enqueue.
	t.Run("Flush", func(t *testing.T) {
		const size = 2
		q := newQueue(size)
		q.write.Value = r
		q.write = q.write.Next()
		q.len = 1

		assert.Equal(t, []Record{r}, q.Flush(), "flushed")
	})

	// Despite the subtest name, this exercises TryDequeue. The values
	// compared against its result are the queue length afterwards.
	t.Run("TryFlush", func(t *testing.T) {
		const size = 3
		q := newQueue(size)
		for i := 0; i < size-1; i++ {
			q.write.Value = r
			q.write = q.write.Next()
			q.len++
		}

		buf := make([]Record, 1)

		// A callback that returns false must leave the queue untouched.
		f := func([]Record) bool { return false }
		assert.Equal(t, size-1, q.TryDequeue(buf, f), "not flushed")
		require.Equal(t, size-1, q.len, "length")
		require.NotSame(t, q.read, q.write, "read ring advanced")

		// A callback that returns true consumes up to len(buf) records.
		var flushed []Record
		f = func(r []Record) bool {
			flushed = append(flushed, r...)
			return true
		}
		if assert.Equal(t, size-2, q.TryDequeue(buf, f), "did not flush len(buf)") {
			assert.Equal(t, []Record{r}, flushed, "Records")
		}

		// slices.Grow only raises the capacity of buf; its length stays 1.
		buf = slices.Grow(buf, size)
		flushed = flushed[:0]
		if assert.Equal(t, 0, q.TryDequeue(buf, f), "did not flush len(queue)") {
			assert.Equal(t, []Record{r}, flushed, "Records")
		}
	})

	// Concurrent Enqueue and Flush calls must not lose records: all
	// goRoutines records are expected across the collected flushes.
	t.Run("ConcurrentSafe", func(t *testing.T) {
		const goRoutines = 10

		flushed := make(chan []Record, goRoutines)
		out := make([]Record, 0, goRoutines)
		done := make(chan struct{})
		// A single collector goroutine owns out until done is closed.
		go func() {
			defer close(done)
			for recs := range flushed {
				out = append(out, recs...)
			}
		}()

		var wg sync.WaitGroup
		wg.Add(goRoutines)

		b := newQueue(goRoutines)
		for i := 0; i < goRoutines; i++ {
			go func() {
				defer wg.Done()
				b.Enqueue(Record{})
				flushed <- b.Flush()
			}()
		}

		wg.Wait()
		close(flushed)
		<-done

		assert.Len(t, out, goRoutines, "flushed Records")
	})
}
// BenchmarkBatchProcessorOnEmit measures parallel calls to
// BatchProcessor.OnEmit with a no-op exporter. The queue and batch sizes are
// set to b.N+1 and the export interval to an hour.
func BenchmarkBatchProcessorOnEmit(b *testing.B) {
	r := new(Record)
	body := log.BoolValue(true)
	r.SetBody(body)

	// r is a pointer, so unsafe.Sizeof(r) would only measure the pointer
	// itself. Dereference it so the throughput reported through SetBytes is
	// based on the size of the Record struct.
	rSize := unsafe.Sizeof(*r) + unsafe.Sizeof(body)
	ctx := context.Background()
	bp := NewBatchProcessor(
		defaultNoopExporter,
		WithMaxQueueSize(b.N+1),
		WithExportMaxBatchSize(b.N+1),
		WithExportInterval(time.Hour),
		WithExportTimeout(time.Hour),
	)
	b.Cleanup(func() { _ = bp.Shutdown(ctx) })

	b.SetBytes(int64(rSize))
	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		var err error
		for pb.Next() {
			err = bp.OnEmit(ctx, r)
		}
		// Assign the result to a sink; the error itself is not inspected
		// inside the timed loop.
		_ = err
	})
}