1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-02-07 13:31:42 +02:00

Fix sdktrace.TraceProvider Shutdown/ForceFlush when no processor register (#3268)

* Fix sdktrace.TraceProvider Shutdown/ForceFlush when no processor register

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Update CHANGELOG.md

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Bogdan Drutu 2022-10-12 13:44:18 -07:00 committed by GitHub
parent 4a3adaafd4
commit e4bdfe7b56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 60 additions and 58 deletions

View File

@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased] ## [Unreleased]
### Changed
- `sdktrace.TraceProvider.Shutdown` and `sdktrace.TraceProvider.ForceFlush` to not return error when no processor register. (#3268)
## [1.11.0/0.32.3] 2022-10-12 ## [1.11.0/0.32.3] 2022-10-12
### Added ### Added

View File

@ -116,12 +116,13 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
spanLimits: o.spanLimits, spanLimits: o.spanLimits,
resource: o.resource, resource: o.resource,
} }
global.Info("TracerProvider created", "config", o) global.Info("TracerProvider created", "config", o)
spss := spanProcessorStates{}
for _, sp := range o.processors { for _, sp := range o.processors {
tp.RegisterSpanProcessor(sp) spss = append(spss, newSpanProcessorState(sp))
} }
tp.spanProcessors.Store(spss)
return tp return tp
} }
@ -159,44 +160,38 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T
} }
// RegisterSpanProcessor adds the given SpanProcessor to the list of SpanProcessors. // RegisterSpanProcessor adds the given SpanProcessor to the list of SpanProcessors.
func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) { func (p *TracerProvider) RegisterSpanProcessor(sp SpanProcessor) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
newSPS := spanProcessorStates{} newSPS := spanProcessorStates{}
if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok { newSPS = append(newSPS, p.spanProcessors.Load().(spanProcessorStates)...)
newSPS = append(newSPS, old...) newSPS = append(newSPS, newSpanProcessorState(sp))
}
newSpanSync := &spanProcessorState{
sp: s,
state: &sync.Once{},
}
newSPS = append(newSPS, newSpanSync)
p.spanProcessors.Store(newSPS) p.spanProcessors.Store(newSPS)
} }
// UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors. // UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors.
func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) { func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
spss := spanProcessorStates{} old := p.spanProcessors.Load().(spanProcessorStates)
old, ok := p.spanProcessors.Load().(spanProcessorStates) if len(old) == 0 {
if !ok || len(old) == 0 {
return return
} }
spss := spanProcessorStates{}
spss = append(spss, old...) spss = append(spss, old...)
// stop the span processor if it is started and remove it from the list // stop the span processor if it is started and remove it from the list
var stopOnce *spanProcessorState var stopOnce *spanProcessorState
var idx int var idx int
for i, sps := range spss { for i, sps := range spss {
if sps.sp == s { if sps.sp == sp {
stopOnce = sps stopOnce = sps
idx = i idx = i
} }
} }
if stopOnce != nil { if stopOnce != nil {
stopOnce.state.Do(func() { stopOnce.state.Do(func() {
if err := s.Shutdown(context.Background()); err != nil { if err := sp.Shutdown(context.Background()); err != nil {
otel.Handle(err) otel.Handle(err)
} }
}) })
@ -213,10 +208,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
// ForceFlush immediately exports all spans that have not yet been exported for // ForceFlush immediately exports all spans that have not yet been exported for
// all the registered span processors. // all the registered span processors.
func (p *TracerProvider) ForceFlush(ctx context.Context) error { func (p *TracerProvider) ForceFlush(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates) spss := p.spanProcessors.Load().(spanProcessorStates)
if !ok {
return fmt.Errorf("failed to load span processors")
}
if len(spss) == 0 { if len(spss) == 0 {
return nil return nil
} }
@ -237,10 +229,11 @@ func (p *TracerProvider) ForceFlush(ctx context.Context) error {
// Shutdown shuts down the span processors in the order they were registered. // Shutdown shuts down the span processors in the order they were registered.
func (p *TracerProvider) Shutdown(ctx context.Context) error { func (p *TracerProvider) Shutdown(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates) spss := p.spanProcessors.Load().(spanProcessorStates)
if !ok { if len(spss) == 0 {
return fmt.Errorf("failed to load span processors") return nil
} }
var retErr error var retErr error
for _, sps := range spss { for _, sps := range spss {
select { select {

View File

@ -28,41 +28,45 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
) )
type basicSpanProcesor struct { type basicSpanProcessor struct {
running bool flushed bool
closed bool
injectShutdownError error injectShutdownError error
} }
func (t *basicSpanProcesor) Shutdown(context.Context) error { func (t *basicSpanProcessor) Shutdown(context.Context) error {
t.running = false t.closed = true
return t.injectShutdownError return t.injectShutdownError
} }
func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {} func (t *basicSpanProcessor) OnStart(context.Context, ReadWriteSpan) {}
func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {} func (t *basicSpanProcessor) OnEnd(ReadOnlySpan) {}
func (t *basicSpanProcesor) ForceFlush(context.Context) error { func (t *basicSpanProcessor) ForceFlush(context.Context) error {
t.flushed = true
return nil return nil
} }
func TestForceFlushAndShutdownTraceProviderWithoutProcessor(t *testing.T) {
stp := NewTracerProvider()
assert.NoError(t, stp.ForceFlush(context.Background()))
assert.NoError(t, stp.Shutdown(context.Background()))
}
func TestShutdownTraceProvider(t *testing.T) { func TestShutdownTraceProvider(t *testing.T) {
stp := NewTracerProvider() stp := NewTracerProvider()
sp := &basicSpanProcesor{} sp := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp) stp.RegisterSpanProcessor(sp)
sp.running = true assert.NoError(t, stp.ForceFlush(context.Background()))
assert.True(t, sp.flushed, "error ForceFlush basicSpanProcessor")
_ = stp.Shutdown(context.Background()) assert.NoError(t, stp.Shutdown(context.Background()))
assert.True(t, sp.closed, "error Shutdown basicSpanProcessor")
if sp.running {
t.Errorf("Error shutdown basicSpanProcesor\n")
}
} }
func TestFailedProcessorShutdown(t *testing.T) { func TestFailedProcessorShutdown(t *testing.T) {
stp := NewTracerProvider() stp := NewTracerProvider()
spErr := errors.New("basic span processor shutdown failure") spErr := errors.New("basic span processor shutdown failure")
sp := &basicSpanProcesor{ sp := &basicSpanProcessor{
running: true,
injectShutdownError: spErr, injectShutdownError: spErr,
} }
stp.RegisterSpanProcessor(sp) stp.RegisterSpanProcessor(sp)
@ -76,12 +80,10 @@ func TestFailedProcessorsShutdown(t *testing.T) {
stp := NewTracerProvider() stp := NewTracerProvider()
spErr1 := errors.New("basic span processor shutdown failure1") spErr1 := errors.New("basic span processor shutdown failure1")
spErr2 := errors.New("basic span processor shutdown failure2") spErr2 := errors.New("basic span processor shutdown failure2")
sp1 := &basicSpanProcesor{ sp1 := &basicSpanProcessor{
running: true,
injectShutdownError: spErr1, injectShutdownError: spErr1,
} }
sp2 := &basicSpanProcesor{ sp2 := &basicSpanProcessor{
running: true,
injectShutdownError: spErr2, injectShutdownError: spErr2,
} }
stp.RegisterSpanProcessor(sp1) stp.RegisterSpanProcessor(sp1)
@ -90,16 +92,15 @@ func TestFailedProcessorsShutdown(t *testing.T) {
err := stp.Shutdown(context.Background()) err := stp.Shutdown(context.Background())
assert.Error(t, err) assert.Error(t, err)
assert.EqualError(t, err, "basic span processor shutdown failure1; basic span processor shutdown failure2") assert.EqualError(t, err, "basic span processor shutdown failure1; basic span processor shutdown failure2")
assert.False(t, sp1.running) assert.True(t, sp1.closed)
assert.False(t, sp2.running) assert.True(t, sp2.closed)
} }
func TestFailedProcessorShutdownInUnregister(t *testing.T) { func TestFailedProcessorShutdownInUnregister(t *testing.T) {
handler.Reset() handler.Reset()
stp := NewTracerProvider() stp := NewTracerProvider()
spErr := errors.New("basic span processor shutdown failure") spErr := errors.New("basic span processor shutdown failure")
sp := &basicSpanProcesor{ sp := &basicSpanProcessor{
running: true,
injectShutdownError: spErr, injectShutdownError: spErr,
} }
stp.RegisterSpanProcessor(sp) stp.RegisterSpanProcessor(sp)

View File

@ -423,14 +423,13 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
} }
s.mu.Unlock() s.mu.Unlock()
if sps, ok := s.tracer.provider.spanProcessors.Load().(spanProcessorStates); ok { sps := s.tracer.provider.spanProcessors.Load().(spanProcessorStates)
if len(sps) == 0 { if len(sps) == 0 {
return return
} }
snap := s.snapshot() snap := s.snapshot()
for _, sp := range sps { for _, sp := range sps {
sp.sp.OnEnd(snap) sp.sp.OnEnd(snap)
}
} }
} }

View File

@ -64,4 +64,9 @@ type spanProcessorState struct {
sp SpanProcessor sp SpanProcessor
state *sync.Once state *sync.Once
} }
func newSpanProcessorState(sp SpanProcessor) *spanProcessorState {
return &spanProcessorState{sp: sp, state: &sync.Once{}}
}
type spanProcessorStates []*spanProcessorState type spanProcessorStates []*spanProcessorState

View File

@ -51,7 +51,7 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanS
s := tr.newSpan(ctx, name, &config) s := tr.newSpan(ctx, name, &config)
if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() { if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() {
sps, _ := tr.provider.spanProcessors.Load().(spanProcessorStates) sps := tr.provider.spanProcessors.Load().(spanProcessorStates)
for _, sp := range sps { for _, sp := range sps {
sp.sp.OnStart(ctx, rw) sp.sp.OnStart(ctx, rw)
} }