1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-12-07 23:32:49 +02:00
Files
opentelemetry-go/sdk/log/exporter.go

327 lines
9.1 KiB
Go
Raw Normal View History

2024-03-13 17:47:07 +01:00
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package log // import "go.opentelemetry.io/otel/sdk/log"
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
2024-04-01 09:17:07 -07:00
"time"
"go.opentelemetry.io/otel"
2024-03-13 17:47:07 +01:00
)
// Exporter handles the delivery of log records to external receivers.
type Exporter interface {
// Export transmits log records to a receiver.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// All retry logic must be contained in this function. The SDK does not
// implement any retry logic. All errors returned by this function are
// considered unrecoverable and will be reported to a configured error
// Handler.
//
// Implementations must not retain the records slice.
//
// Before modifying a Record, the implementation must use Record.Clone
// to create a copy that shares no state with the original.
2024-08-09 08:40:39 +02:00
//
// Export should never be called concurrently with other Export calls.
// However, it may be called concurrently with other methods.
2024-03-13 17:47:07 +01:00
Export(ctx context.Context, records []Record) error
2024-03-13 17:47:07 +01:00
// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// After Shutdown is called, calls to Export, Shutdown, or ForceFlush
// should perform no operation and return nil error.
2024-08-09 08:40:39 +02:00
//
// Shutdown may be called concurrently with itself or with other methods.
2024-03-13 17:47:07 +01:00
Shutdown(ctx context.Context) error
2024-03-13 17:47:07 +01:00
// ForceFlush exports log records to the configured Exporter that have not yet
// been exported.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
2024-08-09 08:40:39 +02:00
//
// ForceFlush may be called concurrently with itself or with other methods.
2024-03-13 17:47:07 +01:00
ForceFlush(ctx context.Context) error
}
var defaultNoopExporter = &noopExporter{}
type noopExporter struct{}
func (noopExporter) Export(context.Context, []Record) error { return nil }
func (noopExporter) Shutdown(context.Context) error { return nil }
func (noopExporter) ForceFlush(context.Context) error { return nil }
// chunkExporter wraps an Exporter's Export method so it is called with
// appropriately sized export payloads. Any payload larger than a defined size
// is chunked into smaller payloads and exported sequentially.
type chunkExporter struct {
Exporter
// size is the maximum batch size exported.
size int
}
// newChunkExporter wraps exporter. Calls to the Export will have their records
// payload chunked so they do not exceed size. If size is less than or equal
// to 0, exporter is returned directly.
func newChunkExporter(exporter Exporter, size int) Exporter {
if size <= 0 {
return exporter
}
return &chunkExporter{Exporter: exporter, size: size}
}
// Export exports records in chunks no larger than c.size.
func (c chunkExporter) Export(ctx context.Context, records []Record) error {
n := len(records)
for i, j := 0, min(c.size, n); i < n; i, j = i+c.size, min(j+c.size, n) {
if err := c.Exporter.Export(ctx, records[i:j]); err != nil {
return err
}
}
return nil
}
2024-04-01 09:17:07 -07:00
// timeoutExporter wraps an Exporter and ensures any call to Export will have a
// timeout for the context.
type timeoutExporter struct {
Exporter
// timeout is the maximum time an export is attempted.
timeout time.Duration
}
// newTimeoutExporter wraps exporter with an Exporter that limits the context
// lifetime passed to Export to be timeout. If timeout is less than or equal to
// zero, exporter will be returned directly.
func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter {
if timeout <= 0 {
return exp
}
return &timeoutExporter{Exporter: exp, timeout: timeout}
}
// Export sets the timeout of ctx before calling the Exporter e wraps.
func (e *timeoutExporter) Export(ctx context.Context, records []Record) error {
Use the cause of the context error in OTLP retry (#6898) Part of #6588 For a demo code like this ```go package main import ( "context" "fmt" "log" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) func main() { ctx := context.Background() exp, err := newExporter(ctx) if err != nil { log.Fatalf("failed to initialize trace exporter: %v", err) } tp, err := newTracerProvider(exp) if err != nil { log.Fatalf("failed to initialize trace provider: %v", err) } defer func() { _ = tp.Shutdown(ctx) }() otel.SetTracerProvider(tp) generateSpan() select {} } func generateSpan() { log.Println("Generating a dummy span") _, span := otel.Tracer("").Start(context.Background(), "dummy") defer span.End() } func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) { return sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), ), nil } func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { traceExporter, err := otlptrace.New( ctx, otlptracegrpc.NewClient( otlptracegrpc.WithEndpoint("127.0.0.1:4317"), otlptracegrpc.WithInsecure(), otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{ Enabled: true, InitialInterval: 1 * time.Second, MaxInterval: 30 * time.Second, MaxElapsedTime: time.Minute, }), ), ) if err != nil { return nil, fmt.Errorf("failed to create trace exporter: %w", err) } return traceExporter, nil } ``` the error result from ``` traces export: context deadline exceeded: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused" ``` become ``` traces export: exporter export timeout: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused" ```
2025-06-12 10:02:35 -07:00
// This only used by the batch processor, and it takes processor timeout config.
// Thus, the error message points to the processor. So users know they should adjust the processor timeout.
ctx, cancel := context.WithTimeoutCause(ctx, e.timeout, errors.New("processor export timeout"))
2024-04-01 09:17:07 -07:00
defer cancel()
return e.Exporter.Export(ctx, records)
}
// exportSync exports all data from input using exporter in a spawned
// goroutine. The returned chan will be closed when the spawned goroutine
// completes.
func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) {
done = make(chan struct{})
go func() {
defer close(done)
for data := range input {
data.DoExport(exporter.Export)
}
}()
return done
}
// exportData is data related to an export.
type exportData struct {
ctx context.Context
records []Record
// respCh is the channel any error returned from the export will be sent
// on. If this is nil, and the export error is non-nil, the error will
// passed to the OTel error handler.
respCh chan<- error
}
// DoExport calls exportFn with the data contained in e. The error response
// will be returned on e's respCh if not nil. The error will be handled by the
// default OTel error handle if it is not nil and respCh is nil or full.
func (e exportData) DoExport(exportFn func(context.Context, []Record) error) {
if len(e.records) == 0 {
e.respond(nil)
return
}
e.respond(exportFn(e.ctx, e.records))
}
func (e exportData) respond(err error) {
select {
case e.respCh <- err:
default:
// e.respCh is nil or busy, default to otel.Handler.
if err != nil {
otel.Handle(err)
}
}
}
// bufferExporter provides asynchronous and synchronous export functionality by
// buffering export requests.
type bufferExporter struct {
Exporter
input chan exportData
inputMu sync.Mutex
done chan struct{}
stopped atomic.Bool
}
// newBufferExporter returns a new bufferExporter that wraps exporter. The
// returned bufferExporter will buffer at most size number of export requests.
Optimize Logs SDK BatchProcessor (#6569) Towards: https://github.com/open-telemetry/opentelemetry-go/issues/6382 This PR optimizes Logs SDK BatchProcessor. ## Why? <!-- was i so blind --> Batch processor does not check if buffer exporter is full, instead it clones records to buffer, then tries to push them to buffer exporter, and then buffer exporter will return false from EnqueueExport if it is full. It does not cause any performance issues when using NOOP exporter, because it "exports" instantly. -> Buffer exporter input channel will almost never be full. But actual implementations of exporters are not going to act that way, they need some time for export operation. And so buffer exporter will get full fairly frequently. Each export attempt to buffer exporter costs us copying entire queue inside of batch processor. In order to catch this or similar performance bottleneck in the future, new benchmark (`BatchSimulateExport`) was added which will use exporter that will simulate non-instant export by sleeping for 5ms. ## Benchmarks ```sh goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/log cpu: AMD Ryzen 9 7900X 12-Core Processor │ base.txt │ new.txt │ │ sec/op │ sec/op vs base │ Processor/Simple-24 340.9n ± 2% 343.5n ± 5% ~ (p=0.971 n=10) Processor/Batch-24 667.6n ± 2% 666.3n ± 4% ~ (p=0.896 n=10) Processor/BatchSimulateExport-24 4441.5n ± 5% 526.4n ± 4% -88.15% (p=0.000 n=10) Processor/SetTimestampSimple-24 360.0n ± 4% 351.0n ± 1% ~ (p=0.247 n=10) Processor/SetTimestampBatch-24 669.8n ± 3% 647.0n ± 5% ~ (p=0.052 n=10) Processor/AddAttributesSimple-24 383.9n ± 1% 386.6n ± 5% ~ (p=0.138 n=10) Processor/AddAttributesBatch-24 759.9n ± 6% 734.5n ± 5% ~ (p=0.481 n=10) Processor/SetAttributesSimple-24 369.9n ± 2% 372.4n ± 3% ~ (p=0.643 n=10) Processor/SetAttributesBatch-24 684.2n ± 4% 674.0n ± 8% ~ (p=0.529 n=10) geomean 639.9n 499.9n -21.88% │ base.txt │ new.txt │ │ B/op │ B/op vs base │ Processor/Simple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.153Ki ± 2% 1.125Ki ± 1% -2.41% (p=0.001 n=10) Processor/BatchSimulateExport-24 649.0 ± 1% 473.0 ± 0% -27.12% (p=0.000 n=10) Processor/SetTimestampSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.145Ki ± 2% 1.122Ki ± 2% -2.01% (p=0.003 n=10) Processor/AddAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.169Ki ± 2% 1.148Ki ± 1% -1.80% (p=0.000 n=10) Processor/SetAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.141Ki ± 2% 1.128Ki ± 2% -1.11% (p=0.030 n=10) geomean 719.3 688.7 -4.24% ¹ all samples are equal │ base.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ Processor/Simple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/BatchSimulateExport-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 1.000 1.000 +0.00% ¹ all samples are equal ``` ## Benchmarks from issue (external) ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ sec/op vs base │ /OTLP-24 41.40µ ± 3% 2935.15µ ± 9% +6989.05% (p=0.000 n=10) 84.19µ ± 4% +103.34% (p=0.000 n=10) /STDOUT-24 39.55µ ± 4% 623.45µ ± 1% +1476.21% (p=0.000 n=10) 89.51µ ± 3% +126.29% (p=0.000 n=10) /NOOP-24 13.179µ ± 2% 4.950µ ± 1% -62.44% (p=0.000 n=10) 4.932µ ± 1% -62.58% (p=0.000 n=10) geomean 27.84µ 208.4µ +648.68% 33.37µ +19.86% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ B/op vs base │ /OTLP-24 98.90Ki ± 8% 210.44Ki ± 0% +112.79% (p=0.000 n=10) 52.01Ki ± 0% -47.41% (p=0.000 n=10) /STDOUT-24 91.01Ki ± 0% 286.62Ki ± 0% +214.93% (p=0.000 n=10) 81.67Ki ± 1% -10.27% (p=0.000 n=10) /NOOP-24 29840.0 ± 0% 240.0 ± 0% -99.20% (p=0.000 n=10) 240.0 ± 0% -99.20% (p=0.000 n=10) geomean 64.01Ki 24.18Ki -62.23% 9.985Ki -84.40% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ allocs/op vs base │ /OTLP-24 699.0 ± 20% 2174.5 ± 1% +211.09% (p=0.000 n=10) 249.5 ± 1% -64.31% (p=0.000 n=10) /STDOUT-24 646.5 ± 1% 4912.5 ± 0% +659.86% (p=0.000 n=10) 872.5 ± 2% +34.96% (p=0.000 n=10) /NOOP-24 303.000 ± 0% 3.000 ± 0% -99.01% (p=0.000 n=10) 3.000 ± 0% -99.01% (p=0.000 n=10) geomean 515.4 317.6 -38.37% 86.76 -83.17% ``` ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ /OTLP-24 2935.15µ ± 9% 84.19µ ± 4% -97.13% (p=0.000 n=10) /STDOUT-24 623.45µ ± 1% 89.51µ ± 3% -85.64% (p=0.000 n=10) /NOOP-24 4.950µ ± 1% 4.932µ ± 1% ~ (p=0.342 n=10) geomean 208.4µ 33.37µ -83.99% │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ /OTLP-24 210.44Ki ± 0% 52.01Ki ± 0% -75.28% (p=0.000 n=10) /STDOUT-24 286.62Ki ± 0% 81.67Ki ± 1% -71.51% (p=0.000 n=10) /NOOP-24 240.0 ± 0% 240.0 ± 0% ~ (p=1.000 n=10) ¹ geomean 24.18Ki 9.985Ki -58.70% ¹ all samples are equal │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ /OTLP-24 2174.5 ± 1% 249.5 ± 1% -88.53% (p=0.000 n=10) /STDOUT-24 4912.5 ± 0% 872.5 ± 2% -82.24% (p=0.000 n=10) /NOOP-24 3.000 ± 0% 3.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 317.6 86.76 -72.69% ¹ all samples are equal ``` --------- Co-authored-by: Robert Pająk <pellared@hotmail.com> Co-authored-by: Damien Mathieu <42@dmathieu.com>
2025-04-09 13:36:25 +01:00
// If size is less than 1, 1 will be used.
func newBufferExporter(exporter Exporter, size int) *bufferExporter {
Optimize Logs SDK BatchProcessor (#6569) Towards: https://github.com/open-telemetry/opentelemetry-go/issues/6382 This PR optimizes Logs SDK BatchProcessor. ## Why? <!-- was i so blind --> Batch processor does not check if buffer exporter is full, instead it clones records to buffer, then tries to push them to buffer exporter, and then buffer exporter will return false from EnqueueExport if it is full. It does not cause any performance issues when using NOOP exporter, because it "exports" instantly. -> Buffer exporter input channel will almost never be full. But actual implementations of exporters are not going to act that way, they need some time for export operation. And so buffer exporter will get full fairly frequently. Each export attempt to buffer exporter costs us copying entire queue inside of batch processor. In order to catch this or similar performance bottleneck in the future, new benchmark (`BatchSimulateExport`) was added which will use exporter that will simulate non-instant export by sleeping for 5ms. ## Benchmarks ```sh goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/log cpu: AMD Ryzen 9 7900X 12-Core Processor │ base.txt │ new.txt │ │ sec/op │ sec/op vs base │ Processor/Simple-24 340.9n ± 2% 343.5n ± 5% ~ (p=0.971 n=10) Processor/Batch-24 667.6n ± 2% 666.3n ± 4% ~ (p=0.896 n=10) Processor/BatchSimulateExport-24 4441.5n ± 5% 526.4n ± 4% -88.15% (p=0.000 n=10) Processor/SetTimestampSimple-24 360.0n ± 4% 351.0n ± 1% ~ (p=0.247 n=10) Processor/SetTimestampBatch-24 669.8n ± 3% 647.0n ± 5% ~ (p=0.052 n=10) Processor/AddAttributesSimple-24 383.9n ± 1% 386.6n ± 5% ~ (p=0.138 n=10) Processor/AddAttributesBatch-24 759.9n ± 6% 734.5n ± 5% ~ (p=0.481 n=10) Processor/SetAttributesSimple-24 369.9n ± 2% 372.4n ± 3% ~ (p=0.643 n=10) Processor/SetAttributesBatch-24 684.2n ± 4% 674.0n ± 8% ~ (p=0.529 n=10) geomean 639.9n 499.9n -21.88% │ base.txt │ new.txt │ │ B/op │ B/op vs base │ Processor/Simple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.153Ki ± 2% 1.125Ki ± 1% -2.41% (p=0.001 n=10) Processor/BatchSimulateExport-24 649.0 ± 1% 473.0 ± 0% -27.12% (p=0.000 n=10) Processor/SetTimestampSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.145Ki ± 2% 1.122Ki ± 2% -2.01% (p=0.003 n=10) Processor/AddAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.169Ki ± 2% 1.148Ki ± 1% -1.80% (p=0.000 n=10) Processor/SetAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.141Ki ± 2% 1.128Ki ± 2% -1.11% (p=0.030 n=10) geomean 719.3 688.7 -4.24% ¹ all samples are equal │ base.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ Processor/Simple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/BatchSimulateExport-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 1.000 1.000 +0.00% ¹ all samples are equal ``` ## Benchmarks from issue (external) ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ sec/op vs base │ /OTLP-24 41.40µ ± 3% 2935.15µ ± 9% +6989.05% (p=0.000 n=10) 84.19µ ± 4% +103.34% (p=0.000 n=10) /STDOUT-24 39.55µ ± 4% 623.45µ ± 1% +1476.21% (p=0.000 n=10) 89.51µ ± 3% +126.29% (p=0.000 n=10) /NOOP-24 13.179µ ± 2% 4.950µ ± 1% -62.44% (p=0.000 n=10) 4.932µ ± 1% -62.58% (p=0.000 n=10) geomean 27.84µ 208.4µ +648.68% 33.37µ +19.86% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ B/op vs base │ /OTLP-24 98.90Ki ± 8% 210.44Ki ± 0% +112.79% (p=0.000 n=10) 52.01Ki ± 0% -47.41% (p=0.000 n=10) /STDOUT-24 91.01Ki ± 0% 286.62Ki ± 0% +214.93% (p=0.000 n=10) 81.67Ki ± 1% -10.27% (p=0.000 n=10) /NOOP-24 29840.0 ± 0% 240.0 ± 0% -99.20% (p=0.000 n=10) 240.0 ± 0% -99.20% (p=0.000 n=10) geomean 64.01Ki 24.18Ki -62.23% 9.985Ki -84.40% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ allocs/op vs base │ /OTLP-24 699.0 ± 20% 2174.5 ± 1% +211.09% (p=0.000 n=10) 249.5 ± 1% -64.31% (p=0.000 n=10) /STDOUT-24 646.5 ± 1% 4912.5 ± 0% +659.86% (p=0.000 n=10) 872.5 ± 2% +34.96% (p=0.000 n=10) /NOOP-24 303.000 ± 0% 3.000 ± 0% -99.01% (p=0.000 n=10) 3.000 ± 0% -99.01% (p=0.000 n=10) geomean 515.4 317.6 -38.37% 86.76 -83.17% ``` ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ /OTLP-24 2935.15µ ± 9% 84.19µ ± 4% -97.13% (p=0.000 n=10) /STDOUT-24 623.45µ ± 1% 89.51µ ± 3% -85.64% (p=0.000 n=10) /NOOP-24 4.950µ ± 1% 4.932µ ± 1% ~ (p=0.342 n=10) geomean 208.4µ 33.37µ -83.99% │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ /OTLP-24 210.44Ki ± 0% 52.01Ki ± 0% -75.28% (p=0.000 n=10) /STDOUT-24 286.62Ki ± 0% 81.67Ki ± 1% -71.51% (p=0.000 n=10) /NOOP-24 240.0 ± 0% 240.0 ± 0% ~ (p=1.000 n=10) ¹ geomean 24.18Ki 9.985Ki -58.70% ¹ all samples are equal │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ /OTLP-24 2174.5 ± 1% 249.5 ± 1% -88.53% (p=0.000 n=10) /STDOUT-24 4912.5 ± 0% 872.5 ± 2% -82.24% (p=0.000 n=10) /NOOP-24 3.000 ± 0% 3.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 317.6 86.76 -72.69% ¹ all samples are equal ``` --------- Co-authored-by: Robert Pająk <pellared@hotmail.com> Co-authored-by: Damien Mathieu <42@dmathieu.com>
2025-04-09 13:36:25 +01:00
if size < 1 {
size = 1
}
input := make(chan exportData, size)
return &bufferExporter{
Exporter: exporter,
input: input,
done: exportSync(input, exporter),
}
}
Optimize Logs SDK BatchProcessor (#6569) Towards: https://github.com/open-telemetry/opentelemetry-go/issues/6382 This PR optimizes Logs SDK BatchProcessor. ## Why? <!-- was i so blind --> Batch processor does not check if buffer exporter is full, instead it clones records to buffer, then tries to push them to buffer exporter, and then buffer exporter will return false from EnqueueExport if it is full. It does not cause any performance issues when using NOOP exporter, because it "exports" instantly. -> Buffer exporter input channel will almost never be full. But actual implementations of exporters are not going to act that way, they need some time for export operation. And so buffer exporter will get full fairly frequently. Each export attempt to buffer exporter costs us copying entire queue inside of batch processor. In order to catch this or similar performance bottleneck in the future, new benchmark (`BatchSimulateExport`) was added which will use exporter that will simulate non-instant export by sleeping for 5ms. ## Benchmarks ```sh goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/log cpu: AMD Ryzen 9 7900X 12-Core Processor │ base.txt │ new.txt │ │ sec/op │ sec/op vs base │ Processor/Simple-24 340.9n ± 2% 343.5n ± 5% ~ (p=0.971 n=10) Processor/Batch-24 667.6n ± 2% 666.3n ± 4% ~ (p=0.896 n=10) Processor/BatchSimulateExport-24 4441.5n ± 5% 526.4n ± 4% -88.15% (p=0.000 n=10) Processor/SetTimestampSimple-24 360.0n ± 4% 351.0n ± 1% ~ (p=0.247 n=10) Processor/SetTimestampBatch-24 669.8n ± 3% 647.0n ± 5% ~ (p=0.052 n=10) Processor/AddAttributesSimple-24 383.9n ± 1% 386.6n ± 5% ~ (p=0.138 n=10) Processor/AddAttributesBatch-24 759.9n ± 6% 734.5n ± 5% ~ (p=0.481 n=10) Processor/SetAttributesSimple-24 369.9n ± 2% 372.4n ± 3% ~ (p=0.643 n=10) Processor/SetAttributesBatch-24 684.2n ± 4% 674.0n ± 8% ~ (p=0.529 n=10) geomean 639.9n 499.9n -21.88% │ base.txt │ new.txt │ │ B/op │ B/op vs base │ Processor/Simple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.153Ki ± 2% 1.125Ki ± 1% -2.41% (p=0.001 n=10) Processor/BatchSimulateExport-24 649.0 ± 1% 473.0 ± 0% -27.12% (p=0.000 n=10) Processor/SetTimestampSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.145Ki ± 2% 1.122Ki ± 2% -2.01% (p=0.003 n=10) Processor/AddAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.169Ki ± 2% 1.148Ki ± 1% -1.80% (p=0.000 n=10) Processor/SetAttributesSimple-24 450.0 ± 0% 450.0 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.141Ki ± 2% 1.128Ki ± 2% -1.11% (p=0.030 n=10) geomean 719.3 688.7 -4.24% ¹ all samples are equal │ base.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ Processor/Simple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/Batch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/BatchSimulateExport-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetTimestampBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/AddAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesSimple-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ Processor/SetAttributesBatch-24 1.000 ± 0% 1.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 1.000 1.000 +0.00% ¹ all samples are equal ``` ## Benchmarks from issue (external) ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ sec/op vs base │ /OTLP-24 41.40µ ± 3% 2935.15µ ± 9% +6989.05% (p=0.000 n=10) 84.19µ ± 4% +103.34% (p=0.000 n=10) /STDOUT-24 39.55µ ± 4% 623.45µ ± 1% +1476.21% (p=0.000 n=10) 89.51µ ± 3% +126.29% (p=0.000 n=10) /NOOP-24 13.179µ ± 2% 4.950µ ± 1% -62.44% (p=0.000 n=10) 4.932µ ± 1% -62.58% (p=0.000 n=10) geomean 27.84µ 208.4µ +648.68% 33.37µ +19.86% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ B/op vs base │ /OTLP-24 98.90Ki ± 8% 210.44Ki ± 0% +112.79% (p=0.000 n=10) 52.01Ki ± 0% -47.41% (p=0.000 n=10) /STDOUT-24 91.01Ki ± 0% 286.62Ki ± 0% +214.93% (p=0.000 n=10) 81.67Ki ± 1% -10.27% (p=0.000 n=10) /NOOP-24 29840.0 ± 0% 240.0 ± 0% -99.20% (p=0.000 n=10) 240.0 ± 0% -99.20% (p=0.000 n=10) geomean 64.01Ki 24.18Ki -62.23% 9.985Ki -84.40% │ spanevents.txt │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ allocs/op vs base │ /OTLP-24 699.0 ± 20% 2174.5 ± 1% +211.09% (p=0.000 n=10) 249.5 ± 1% -64.31% (p=0.000 n=10) /STDOUT-24 646.5 ± 1% 4912.5 ± 0% +659.86% (p=0.000 n=10) 872.5 ± 2% +34.96% (p=0.000 n=10) /NOOP-24 303.000 ± 0% 3.000 ± 0% -99.01% (p=0.000 n=10) 3.000 ± 0% -99.01% (p=0.000 n=10) geomean 515.4 317.6 -38.37% 86.76 -83.17% ``` ```sh goos: linux goarch: amd64 pkg: github.com/pellared/spanevents-vs-logs cpu: AMD Ryzen 9 7900X 12-Core Processor │ logs.base.txt │ logs.new.txt │ │ sec/op │ sec/op vs base │ /OTLP-24 2935.15µ ± 9% 84.19µ ± 4% -97.13% (p=0.000 n=10) /STDOUT-24 623.45µ ± 1% 89.51µ ± 3% -85.64% (p=0.000 n=10) /NOOP-24 4.950µ ± 1% 4.932µ ± 1% ~ (p=0.342 n=10) geomean 208.4µ 33.37µ -83.99% │ logs.base.txt │ logs.new.txt │ │ B/op │ B/op vs base │ /OTLP-24 210.44Ki ± 0% 52.01Ki ± 0% -75.28% (p=0.000 n=10) /STDOUT-24 286.62Ki ± 0% 81.67Ki ± 1% -71.51% (p=0.000 n=10) /NOOP-24 240.0 ± 0% 240.0 ± 0% ~ (p=1.000 n=10) ¹ geomean 24.18Ki 9.985Ki -58.70% ¹ all samples are equal │ logs.base.txt │ logs.new.txt │ │ allocs/op │ allocs/op vs base │ /OTLP-24 2174.5 ± 1% 249.5 ± 1% -88.53% (p=0.000 n=10) /STDOUT-24 4912.5 ± 0% 872.5 ± 2% -82.24% (p=0.000 n=10) /NOOP-24 3.000 ± 0% 3.000 ± 0% ~ (p=1.000 n=10) ¹ geomean 317.6 86.76 -72.69% ¹ all samples are equal ``` --------- Co-authored-by: Robert Pająk <pellared@hotmail.com> Co-authored-by: Damien Mathieu <42@dmathieu.com>
2025-04-09 13:36:25 +01:00
func (e *bufferExporter) Ready() bool {
return len(e.input) != cap(e.input)
}
var errStopped = errors.New("exporter stopped")
func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {
data := exportData{ctx, records, rCh}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return errStopped
}
select {
case e.input <- data:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// EnqueueExport enqueues an export of records in the context of ctx to be
Implement the BatchingProcessor (#5093) * [WIP] Implement the BatchingProcessor * Add TestExportSync * Add TestChunker * Test export error default to ErrorHandler * Fix lint * Fix chunk smaller than size error * Add batch tests * Fix lint * Update OnEmit test Check the len of records in eventually assertion given that is what we are going to measure. * Revert unneeded change to BatchingProcessor doc * Add batch type * Refactor testing of batching config The BatchingProcessor is not expected to ultimately contain configuration fields for queue size or export parameters (see #5093). This will break TestNewBatchingProcessorConfiguration which tests the configuration by evaluating the BatchingProcessor directly. Instead, test the batchingConfig and rename the test to TestNewBatchingConfig to match what is being tested. * Implement the BatchingProcessor without polling * Add TestBatchingProcessor * Add ConcurrentSafe test * Expand Shutdown tests * Test context canceled for ForceFlush * Refactor batch to queue * Use exportSync * Update docs and naming * Split buffered export to its own type * Update comments and naming * Fix lint * Remove redundant triggered type * Add interval polling * Refactor test structure * Add custom ring implimementation * Add BenchmarkBatchingProcessor * Fix merge * Remove custom ring impl * Remove BenchmarkBatchingProcessor * Update dev docs * Test nil exporter * Update OnEmit test Ensure the poll goroutine will completely flush the queue of batches. * Test RetriggerFlushNonBlocking * Update ascii diagram * Fix flaky OnEmit * Revert unnecessary change to test pkg name * Use batching term in docs * Document EnqueueExport * Return from EnqueueExport if blocked Do not wait for the enqueue to succeed. * Do not drop failed flush log records * Use cancelable ctx in concurrency test * Fix comments * Apply feedback Do not spawn a goroutine for the flush operation. * Return true from EnqueueExport when stopped * Update sdk/log/batch.go Co-authored-by: Robert Pająk <pellared@hotmail.com> * Remove TODO * Comment re-trigger in poll --------- Co-authored-by: Robert Pająk <pellared@hotmail.com>
2024-04-18 07:48:19 -07:00
// performed asynchronously. This will return true if the records are
// successfully enqueued (or the bufferExporter is shut down), false otherwise.
//
// The passed records are held after this call returns.
func (e *bufferExporter) EnqueueExport(records []Record) bool {
if len(records) == 0 {
// Nothing to enqueue, do not waste input space.
return true
}
Implement the BatchingProcessor (#5093) * [WIP] Implement the BatchingProcessor * Add TestExportSync * Add TestChunker * Test export error default to ErrorHandler * Fix lint * Fix chunk smaller than size error * Add batch tests * Fix lint * Update OnEmit test Check the len of records in eventually assertion given that is what we are going to measure. * Revert unneeded change to BatchingProcessor doc * Add batch type * Refactor testing of batching config The BatchingProcessor is not expected to ultimately contain configuration fields for queue size or export parameters (see #5093). This will break TestNewBatchingProcessorConfiguration which tests the configuration by evaluating the BatchingProcessor directly. Instead, test the batchingConfig and rename the test to TestNewBatchingConfig to match what is being tested. * Implement the BatchingProcessor without polling * Add TestBatchingProcessor * Add ConcurrentSafe test * Expand Shutdown tests * Test context canceled for ForceFlush * Refactor batch to queue * Use exportSync * Update docs and naming * Split buffered export to its own type * Update comments and naming * Fix lint * Remove redundant triggered type * Add interval polling * Refactor test structure * Add custom ring implimementation * Add BenchmarkBatchingProcessor * Fix merge * Remove custom ring impl * Remove BenchmarkBatchingProcessor * Update dev docs * Test nil exporter * Update OnEmit test Ensure the poll goroutine will completely flush the queue of batches. * Test RetriggerFlushNonBlocking * Update ascii diagram * Fix flaky OnEmit * Revert unnecessary change to test pkg name * Use batching term in docs * Document EnqueueExport * Return from EnqueueExport if blocked Do not wait for the enqueue to succeed. * Do not drop failed flush log records * Use cancelable ctx in concurrency test * Fix comments * Apply feedback Do not spawn a goroutine for the flush operation. * Return true from EnqueueExport when stopped * Update sdk/log/batch.go Co-authored-by: Robert Pająk <pellared@hotmail.com> * Remove TODO * Comment re-trigger in poll --------- Co-authored-by: Robert Pająk <pellared@hotmail.com>
2024-04-18 07:48:19 -07:00
data := exportData{ctx: context.Background(), records: records}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return true
}
select {
case e.input <- data:
return true
default:
return false
}
}
// Export synchronously exports records in the context of ctx. This will not
// return until the export has been completed.
func (e *bufferExporter) Export(ctx context.Context, records []Record) error {
if len(records) == 0 {
return nil
}
resp := make(chan error, 1)
err := e.enqueue(ctx, records, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return fmt.Errorf("%w: dropping %d records", err, len(records))
}
select {
case err := <-resp:
return err
case <-ctx.Done():
return ctx.Err()
}
}
// ForceFlush flushes buffered exports. Any existing exports that is buffered
// is flushed before this returns.
func (e *bufferExporter) ForceFlush(ctx context.Context) error {
resp := make(chan error, 1)
err := e.enqueue(ctx, nil, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return err
}
select {
case <-resp:
case <-ctx.Done():
return ctx.Err()
}
return e.Exporter.ForceFlush(ctx)
}
// Shutdown shuts down e.
//
// Any buffered exports are flushed before this returns.
//
// All calls to EnqueueExport or Exporter will return nil without any export
// after this is called.
func (e *bufferExporter) Shutdown(ctx context.Context) error {
if e.stopped.Swap(true) {
return nil
}
e.inputMu.Lock()
defer e.inputMu.Unlock()
// No more sends will be made.
close(e.input)
select {
case <-e.done:
case <-ctx.Done():
return errors.Join(ctx.Err(), e.Exporter.Shutdown(ctx))
}
return e.Exporter.Shutdown(ctx)
}