You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-08-10 22:31:50 +02:00
Optimize Logs SDK BatchProcessor (#6569)
Towards: https://github.com/open-telemetry/opentelemetry-go/issues/6382 This PR optimizes Logs SDK BatchProcessor. ## Why? <!-- was i so blind --> Batch processor does not check if buffer exporter is full, instead it clones records to buffer, then tries to push them to buffer exporter, and then buffer exporter will return false from EnqueueExport if it is full. It does not cause any performance issues when using NOOP exporter, because it "exports" instantly. -> Buffer exporter input channel will almost never be full. But actual implementations of exporters are not going to act that way, they need some time for export operation. And so buffer exporter will get full fairly frequently. Each export attempt to buffer exporter costs us copying entire queue inside of batch processor. In order to catch this or similar performance bottleneck in the future, new benchmark (`BatchSimulateExport`) was added which will use exporter that will simulate non-instant export by sleeping for 5ms. ## Benchmarks ```sh goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/log cpu: AMD Ryzen 9 7900X 12-Core Processor │ base.txt │ new.txt │ │ sec/op │ sec/op vs base │ Processor/Simple-24 340.9n ± 2% 343.5n ± 5% ~ (p=0.971 n=10) Processor/Batch-24 667.6n ± 2% 666.3n ± 4% ~ (p=0.896 n=10) Processor/BatchSimulateExport-24 4441.5n ± 5% 526.4n ± 4% -88.15% (p=0.000 n=10) Processor/SetTimestampSimple-24 360.0n ± 4% 351.0n ± 1% ~ (p=0.247 n=10) Processor/SetTimestampBatch-24 669.8n ± 3% 647.0n ± 5% ~ (p=0.052 n=10) Processor/AddAttributesSimple-24 383.9n ± 1% 386.6n ± 5% ~ (p=0.138 n=10) Processor/AddAttributesBatch-24 759.9n ± 6% 734.5n ± 5% ~ (p=0.481 n=10) Processor/SetAttributesSimple-24 369.9n ± 2% 372.4n ± 3% ~ (p=0.643 n=10) Processor/SetAttributesBatch-24 684.2n ± 4% 674.0n ± 8% ~ (p=0.529 n=10) geomean 639.9n 499.9n -21.88% │ base.txt │ new.txt │ │ B/op │ B/op vs base │ Processor/Simple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.153Ki ± 2% 1.125Ki ± 1% -2.41% (p=0.001 n=10) Processor/BatchSimulateExport-24 649.0 ± 1% 473.0 ± 0% -27.12% (p=0.000 n=10) Processor/SetTimestampSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.145Ki ± 2% 1.122Ki ± 2% -2.01% (p=0.003 n=10) Processor/AddAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.169Ki ± 2% 1.148Ki ± 1% -1.80% (p=0.000 n=10) Processor/SetAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.141Ki ± 2% 1.128Ki ± 2% -1.11% (p=0.030 n=10) geomean 719.3 688.7 -4.24% ¹ all samples are equal │ base.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ Processor/Simple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/BatchSimulateExport-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 1.000 1.000 +0.00% ¹ all samples are equal ``` ## Benchmarks from issue (external) ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ sec/op vs base │ /OTLP-24 41.40µ ± 3% 2935.15µ ± 9% +6989.05% (p=0.000 n=10) 84.19µ ± 4% +103.34% (p=0.000 n=10) /STDOUT-24 39.55µ ± 4% 623.45µ ± 1% +1476.21% (p=0.000 n=10) 89.51µ ± 3% +126.29% (p=0.000 n=10) /NOOP-24 13.179µ ± 2% 4.950µ ± 1% -62.44% (p=0.000 n=10) 4.932µ ± 1% -62.58% (p=0.000 n=10) geomean 27.84µ 208.4µ +648.68% 33.37µ +19.86% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ B/op vs base │ /OTLP-24 98.90Ki ± 8% 210.44Ki ± 0% +112.79% (p=0.000 n=10) 52.01Ki ± 0% -47.41% (p=0.000 n=10) /STDOUT-24 91.01Ki ± 0% 286.62Ki ± 0% +214.93% (p=0.000 n=10) 81.67Ki ± 1% -10.27% (p=0.000 n=10) /NOOP-24 29840.0 ± 0% 240.0 ± 0% -99.20% (p=0.000 n=10) 240.0 ± 0% -99.20% (p=0.000 n=10) geomean 64.01Ki 24.18Ki -62.23% 9.985Ki -84.40% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ allocs/op vs base │ /OTLP-24 699.0 ± 20% 2174.5 ± 1% +211.09% (p=0.000 n=10) 249.5 ± 1% -64.31% (p=0.000 n=10) /STDOUT-24 646.5 ± 1% 4912.5 ± 0% +659.86% (p=0.000 n=10) 872.5 ± 2% +34.96% (p=0.000 n=10) /NOOP-24 303.000 ± 0% 3.000 ± 0% -99.01% (p=0.000 n=10) 3.000 ± 0% -99.01% (p=0.000 n=10) geomean 515.4 317.6 -38.37% 86.76 -83.17% ``` ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ /OTLP-24 2935.15µ ± 9% 84.19µ ± 4% -97.13% (p=0.000 n=10) /STDOUT-24 623.45µ ± 1% 89.51µ ± 3% -85.64% (p=0.000 n=10) /NOOP-24 4.950µ ± 1% 4.932µ ± 1% ~ (p=0.342 n=10) geomean 208.4µ 33.37µ -83.99% │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ /OTLP-24 210.44Ki ± 0% 52.01Ki ± 0% -75.28% (p=0.000 n=10) /STDOUT-24 286.62Ki ± 0% 81.67Ki ± 1% -71.51% (p=0.000 n=10) /NOOP-24 240.0 ± 0% 240.0 ± 0% ~ (p=1.000 n=10) ¹ geomean 24.18Ki 9.985Ki -58.70% ¹ all samples are equal │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ /OTLP-24 2174.5 ± 1% 249.5 ± 1% -88.53% (p=0.000 n=10) /STDOUT-24 4912.5 ± 0% 872.5 ± 2% -82.24% (p=0.000 n=10) /NOOP-24 3.000 ± 0% 3.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 317.6 86.76 -72.69% ¹ all samples are equal ``` --------- Co-authored-by: Robert Pająk <pellared@hotmail.com> Co-authored-by: Damien Mathieu <42@dmathieu.com>
This commit is contained in:
@@ -29,6 +29,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Initialize map with `len(keys)` in `NewAllowKeysFilter` and `NewDenyKeysFilter` to avoid unnecessary allocations in `go.opentelemetry.io/otel/attribute`. (#6455)
|
||||
- `go.opentelemetry.io/otel/log/logtest` is now a separate Go module. (#6465)
|
||||
- `go.opentelemetry.io/otel/sdk/log/logtest` is now a separate Go module. (#6466)
|
||||
- Improve performance of `BatchProcessor` in `go.opentelemetry.io/otel/sdk/log` by not exporting when exporter cannot accept more. (#6569)
|
||||
|
||||
### Deprecated
|
||||
|
||||
|
@@ -156,13 +156,18 @@ func (b *BatchProcessor) poll(interval time.Duration) (done chan struct{}) {
|
||||
global.Warn("dropped log records", "dropped", d)
|
||||
}
|
||||
|
||||
qLen := b.q.TryDequeue(buf, func(r []Record) bool {
|
||||
ok := b.exporter.EnqueueExport(r)
|
||||
if ok {
|
||||
buf = slices.Clone(buf)
|
||||
}
|
||||
return ok
|
||||
})
|
||||
qLen := b.q.Len()
|
||||
// Don't copy data from queue unless exporter can accept more, it is very expensive.
|
||||
if b.exporter.Ready() {
|
||||
qLen = b.q.TryDequeue(buf, func(r []Record) bool {
|
||||
ok := b.exporter.EnqueueExport(r)
|
||||
if ok {
|
||||
buf = slices.Clone(buf)
|
||||
}
|
||||
return ok
|
||||
})
|
||||
}
|
||||
|
||||
if qLen >= b.batchSize {
|
||||
// There is another full batch ready. Immediately trigger
|
||||
// another export attempt.
|
||||
@@ -272,6 +277,13 @@ func newQueue(size int) *queue {
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) Len() int {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
return q.len
|
||||
}
|
||||
|
||||
// Dropped returns the number of Records dropped during enqueueing since the
|
||||
// last time Dropped was called.
|
||||
func (q *queue) Dropped() uint64 {
|
||||
|
@@ -13,6 +13,17 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type mockDelayExporter struct{}
|
||||
|
||||
func (mockDelayExporter) Export(context.Context, []Record) error {
|
||||
time.Sleep(time.Millisecond * 5)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mockDelayExporter) Shutdown(context.Context) error { return nil }
|
||||
|
||||
func (mockDelayExporter) ForceFlush(context.Context) error { return nil }
|
||||
|
||||
func BenchmarkProcessor(b *testing.B) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
@@ -30,6 +41,12 @@ func BenchmarkProcessor(b *testing.B) {
|
||||
return []LoggerProviderOption{WithProcessor(NewBatchProcessor(noopExporter{}))}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "BatchSimulateExport",
|
||||
f: func() []LoggerProviderOption {
|
||||
return []LoggerProviderOption{WithProcessor(NewBatchProcessor(mockDelayExporter{}))}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SetTimestampSimple",
|
||||
f: func() []LoggerProviderOption {
|
||||
|
@@ -186,11 +186,10 @@ type bufferExporter struct {
|
||||
|
||||
// newBufferExporter returns a new bufferExporter that wraps exporter. The
|
||||
// returned bufferExporter will buffer at most size number of export requests.
|
||||
// If size is less than zero, zero will be used (i.e. only synchronous
|
||||
// exporting will be supported).
|
||||
// If size is less than 1, 1 will be used.
|
||||
func newBufferExporter(exporter Exporter, size int) *bufferExporter {
|
||||
if size < 0 {
|
||||
size = 0
|
||||
if size < 1 {
|
||||
size = 1
|
||||
}
|
||||
input := make(chan exportData, size)
|
||||
return &bufferExporter{
|
||||
@@ -201,6 +200,10 @@ func newBufferExporter(exporter Exporter, size int) *bufferExporter {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *bufferExporter) Ready() bool {
|
||||
return len(e.input) != cap(e.input)
|
||||
}
|
||||
|
||||
var errStopped = errors.New("exporter stopped")
|
||||
|
||||
func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {
|
||||
|
Reference in New Issue
Block a user