From 9be18c14cb7876e22f81b5d4eae95188e6e37304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Thu, 27 Feb 2025 11:28:31 +0100 Subject: [PATCH] sdk/trace: Fix goroutine leak in batchSpanProcessor.ForceFlush (#6369) Fixes https://github.com/open-telemetry/opentelemetry-go/issues/6360 Per https://github.com/open-telemetry/opentelemetry-go/issues/6360#issuecomment-2678080742: > So I'd fix this issue with the first proposed solution, and open an issue to change the behavior in a separate PR. ``` $ go test -run=TestBatchSpanProcessorForceFlushTimeout -count=1000 PASS ok go.opentelemetry.io/otel/sdk/trace 1.701s $ go test -run=TestBatchSpanProcessorForceFlushTimeout -count=1000 -race PASS ok go.opentelemetry.io/otel/sdk/trace 4.056s ``` @peterbourgon, thank you for your contribution :medal_sports: --- CHANGELOG.md | 1 + sdk/trace/batch_span_processor.go | 3 +- sdk/trace/batch_span_processor_test.go | 43 ++++++++++++++++++-------- 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b4eef8be..f781fc0ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,7 @@ The next release will require at least [Go 1.23]. ### Fixes - Eliminate goroutine leak for the processor returned by `NewSimpleSpanProcessor` when `Shutdown` is called and the passed `ctx` is canceled and `SpanExporter.Shutdown` has not returned. (#6368) +- Eliminate goroutine leak for the processor returned by `NewBatchSpanProcessor` when `ForceFlush` is called and the passed `ctx` is canceled and `SpanExporter.Export` has not returned. 
(#6369) diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index ccc97e1b6..6872cbb4e 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -201,10 +201,9 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { } } - wait := make(chan error) + wait := make(chan error, 1) go func() { wait <- bsp.exportSpans(ctx) - close(wait) }() // Wait until the export is finished or the context is cancelled/timed out select { diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 49575e841..2259fac45 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -529,11 +529,24 @@ func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) { } } -type indefiniteExporter struct{} +type indefiniteExporter struct { + stop chan (struct{}) +} -func (indefiniteExporter) Shutdown(context.Context) error { return nil } -func (indefiniteExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error { - <-ctx.Done() +func newIndefiniteExporter(t *testing.T) indefiniteExporter { + e := indefiniteExporter{stop: make(chan struct{})} + t.Cleanup(func() { + go close(e.stop) + }) + return e +} + +func (e indefiniteExporter) Shutdown(context.Context) error { + return nil +} + +func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error { + <-e.stop return ctx.Err() } @@ -542,25 +555,29 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) { // Cancel the context cancel() - bsp := sdktrace.NewBatchSpanProcessor(indefiniteExporter{}) + bsp := sdktrace.NewBatchSpanProcessor(newIndefiniteExporter(t)) t.Cleanup(func() { assert.NoError(t, bsp.Shutdown(context.Background())) }) + if got, want := bsp.ForceFlush(ctx), context.Canceled; !errors.Is(got, want) { t.Errorf("expected %q error, got %v", want, got) } } func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) { - // Add 
timeout to context to test deadline - ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) - defer cancel() - <-ctx.Done() + tp := basicTracerProvider(t) + exp := newIndefiniteExporter(t) + bsp := sdktrace.NewBatchSpanProcessor(exp) + tp.RegisterSpanProcessor(bsp) + tr := tp.Tracer(t.Name()) + _, span := tr.Start(context.Background(), "foo") + span.End() + + // Add timeout to context to test deadline + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() - bsp := sdktrace.NewBatchSpanProcessor(indefiniteExporter{}) - t.Cleanup(func() { - assert.NoError(t, bsp.Shutdown(context.Background())) - }) if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) { t.Errorf("expected %q error, got %v", want, got) }