You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
TracerProvider ForceFlush() Error Fix (#7856)
Previously upon a SpanProcessor's ForceFlush returning an error, it would return that error and not attempt to flush subsequent SpanProcessors. Now when an error is encountered, it will Join the new error with the existing errors and continue iterating through the SpanProcessors and return the consolidated error at the end of iteration. This is in line with the workflow found in LoggerProvider's ForceFlush. --------- Co-authored-by: Robert Pająk <pellared@hotmail.com>
This commit is contained in:
@@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
The package contains semantic conventions from the `v1.40.0` version of the OpenTelemetry Semantic Conventions.
|
||||
See the [migration documentation](./semconv/v1.40.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.39.0`. (#7985)
|
||||
|
||||
### Changed
|
||||
|
||||
- `TracerProvider.ForceFlush` in `go.opentelemetry.io/otel/sdk/trace` joins errors together and continues iteration through SpanProcessors as opposed to returning the first encountered error without attempting exports on subsequent SpanProcessors. (#7856)
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fix missing `request.GetBody` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` to correctly handle HTTP2 GOAWAY frame. (#7931)
|
||||
|
||||
@@ -5,6 +5,7 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
@@ -262,6 +263,7 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, sps := range spss {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -269,11 +271,9 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error {
|
||||
default:
|
||||
}
|
||||
|
||||
if err := sps.sp.ForceFlush(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
err = errors.Join(err, sps.sp.ForceFlush(ctx))
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// Shutdown shuts down TracerProvider. All registered span processors are shut down
|
||||
|
||||
@@ -28,6 +28,7 @@ type basicSpanProcessor struct {
|
||||
flushed bool
|
||||
closed bool
|
||||
injectShutdownError error
|
||||
injectExportError error
|
||||
}
|
||||
|
||||
func (t *basicSpanProcessor) Shutdown(context.Context) error {
|
||||
@@ -39,7 +40,7 @@ func (*basicSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
|
||||
func (*basicSpanProcessor) OnEnd(ReadOnlySpan) {}
|
||||
func (t *basicSpanProcessor) ForceFlush(context.Context) error {
|
||||
t.flushed = true
|
||||
return nil
|
||||
return t.injectExportError
|
||||
}
|
||||
|
||||
type shutdownSpanProcessor struct {
|
||||
@@ -227,6 +228,59 @@ func TestRegisterAfterShutdownWithProcessors(t *testing.T) {
|
||||
assert.Empty(t, stp.getSpanProcessors())
|
||||
}
|
||||
|
||||
func TestTracerProviderForceFlush(t *testing.T) {
|
||||
t.Run("AfterShutdown", func(t *testing.T) {
|
||||
stp := NewTracerProvider()
|
||||
sp1 := &basicSpanProcessor{}
|
||||
stp.RegisterSpanProcessor(sp1)
|
||||
ctx := t.Context()
|
||||
|
||||
require.NoError(t, stp.ForceFlush(ctx))
|
||||
require.True(t, sp1.flushed, "SpanProcessor ForceFlush not called")
|
||||
|
||||
sp1.flushed = false
|
||||
require.NoError(t, stp.Shutdown(ctx))
|
||||
|
||||
require.NoError(t, stp.ForceFlush(ctx))
|
||||
assert.False(t, sp1.flushed, "SpanProcessor ForceFlush called after Shutdown")
|
||||
})
|
||||
|
||||
t.Run("Multi", func(t *testing.T) {
|
||||
stp := NewTracerProvider()
|
||||
sp1 := &basicSpanProcessor{}
|
||||
sp2 := &basicSpanProcessor{}
|
||||
stp.RegisterSpanProcessor(sp1)
|
||||
stp.RegisterSpanProcessor(sp2)
|
||||
ctx := t.Context()
|
||||
|
||||
require.NoError(t, stp.ForceFlush(ctx))
|
||||
require.True(t, sp1.flushed, "SpanProcessor ForceFlush not called")
|
||||
require.True(t, sp2.flushed, "SpanProcessor ForceFlush not called")
|
||||
})
|
||||
|
||||
t.Run("Error", func(t *testing.T) {
|
||||
stp := NewTracerProvider()
|
||||
sp1 := &basicSpanProcessor{injectExportError: assert.AnError}
|
||||
sp2 := &basicSpanProcessor{}
|
||||
stp.RegisterSpanProcessor(sp1)
|
||||
stp.RegisterSpanProcessor(sp2)
|
||||
ctx := t.Context()
|
||||
|
||||
assert.ErrorIs(t, stp.ForceFlush(ctx), assert.AnError, "span processor error not returned")
|
||||
require.True(t, sp1.flushed, "SpanProcessor ForceFlush not called")
|
||||
require.True(t, sp2.flushed, "SpanProcessor ForceFlush not called")
|
||||
})
|
||||
|
||||
t.Run("WithCancel", func(t *testing.T) {
|
||||
stp := NewTracerProvider()
|
||||
sp1 := &basicSpanProcessor{}
|
||||
stp.RegisterSpanProcessor(sp1)
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
cancel()
|
||||
assert.ErrorIs(t, stp.ForceFlush(ctx), context.Canceled)
|
||||
})
|
||||
}
|
||||
|
||||
func TestTracerProviderSamplerConfigFromEnv(t *testing.T) {
|
||||
type testCase struct {
|
||||
sampler string
|
||||
|
||||
Reference in New Issue
Block a user