1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-08-10 22:31:50 +02:00

sdk/trace: Fix gorountine leak in batchSpanProcessor.ForceFlush (#6369)

Fixes https://github.com/open-telemetry/opentelemetry-go/issues/6360

Per
https://github.com/open-telemetry/opentelemetry-go/issues/6360#issuecomment-2678080742:

> So I'd fix this issue with the first proposed solution, and open an
issue to change the behavior in a separate PR.

```
$ go test -run=TestBatchSpanProcessorForceFlushTimeout -count=1000
PASS
ok      go.opentelemetry.io/otel/sdk/trace      1.701s
$ go test -run=TestBatchSpanProcessorForceFlushTimeout -count=1000 -race
PASS
ok      go.opentelemetry.io/otel/sdk/trace      4.056s
```

@peterbourgon, thank you for your contribution 🏅
This commit is contained in:
Robert Pająk
2025-02-27 11:28:31 +01:00
committed by GitHub
parent 23c76d3849
commit 9be18c14cb
3 changed files with 32 additions and 15 deletions

View File

@@ -42,6 +42,7 @@ The next release will require at least [Go 1.23].
### Fixes ### Fixes
- Eliminate goroutine leak for the processor returned by `NewSimpleSpanProcessor` when `Shutdown` is called and the passed `ctx` is canceled and `SpanExporter.Shutdown` has not returned. (#6368) - Eliminate goroutine leak for the processor returned by `NewSimpleSpanProcessor` when `Shutdown` is called and the passed `ctx` is canceled and `SpanExporter.Shutdown` has not returned. (#6368)
- Eliminate goroutine leak for the processor returned by `NewBatchSpanProcessor` when `ForceFlush` is called and the passed `ctx` is canceled and `SpanExporter.Export` has not returned. (#6369)
<!-- Released section --> <!-- Released section -->
<!-- Don't change this section unless doing release --> <!-- Don't change this section unless doing release -->

View File

@@ -201,10 +201,9 @@ func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
} }
} }
wait := make(chan error) wait := make(chan error, 1)
go func() { go func() {
wait <- bsp.exportSpans(ctx) wait <- bsp.exportSpans(ctx)
close(wait)
}() }()
// Wait until the export is finished or the context is cancelled/timed out // Wait until the export is finished or the context is cancelled/timed out
select { select {

View File

@@ -529,11 +529,24 @@ func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) {
} }
} }
type indefiniteExporter struct{} type indefiniteExporter struct {
stop chan (struct{})
}
func (indefiniteExporter) Shutdown(context.Context) error { return nil } func newIndefiniteExporter(t *testing.T) indefiniteExporter {
func (indefiniteExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error { e := indefiniteExporter{stop: make(chan struct{})}
<-ctx.Done() t.Cleanup(func() {
go close(e.stop)
})
return e
}
func (e indefiniteExporter) Shutdown(context.Context) error {
return nil
}
func (e indefiniteExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
<-e.stop
return ctx.Err() return ctx.Err()
} }
@@ -542,25 +555,29 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
// Cancel the context // Cancel the context
cancel() cancel()
bsp := sdktrace.NewBatchSpanProcessor(indefiniteExporter{}) bsp := sdktrace.NewBatchSpanProcessor(newIndefiniteExporter(t))
t.Cleanup(func() { t.Cleanup(func() {
assert.NoError(t, bsp.Shutdown(context.Background())) assert.NoError(t, bsp.Shutdown(context.Background()))
}) })
if got, want := bsp.ForceFlush(ctx), context.Canceled; !errors.Is(got, want) { if got, want := bsp.ForceFlush(ctx), context.Canceled; !errors.Is(got, want) {
t.Errorf("expected %q error, got %v", want, got) t.Errorf("expected %q error, got %v", want, got)
} }
} }
func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) { func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
// Add timeout to context to test deadline tp := basicTracerProvider(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) exp := newIndefiniteExporter(t)
defer cancel() bsp := sdktrace.NewBatchSpanProcessor(exp)
<-ctx.Done() tp.RegisterSpanProcessor(bsp)
tr := tp.Tracer(t.Name())
_, span := tr.Start(context.Background(), "foo")
span.End()
// Add timeout to context to test deadline
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
bsp := sdktrace.NewBatchSpanProcessor(indefiniteExporter{})
t.Cleanup(func() {
assert.NoError(t, bsp.Shutdown(context.Background()))
})
if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) { if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) {
t.Errorf("expected %q error, got %v", want, got) t.Errorf("expected %q error, got %v", want, got)
} }