1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-27 22:49:15 +02:00

Unify TracerProvider span processor lookups (#3942)

* Pre-allocate spanProcessorStates slice

* Make sync.Once a non-pointer

It doesn't need to be a pointer, can be part of the struct to avoid allocating a separate object for it

* getSpanProcessors() helper

* Add tests for UnregisterSpanProcessor()
This commit is contained in:
Mikhail Mazurskiy
2023-04-02 01:57:35 +11:00
committed by GitHub
parent 271df1dc01
commit 22fd10447d
5 changed files with 72 additions and 16 deletions

View File

@@ -120,7 +120,7 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
} }
global.Info("TracerProvider created", "config", o) global.Info("TracerProvider created", "config", o)
spss := spanProcessorStates{} spss := make(spanProcessorStates, 0, len(o.processors))
for _, sp := range o.processors { for _, sp := range o.processors {
spss = append(spss, newSpanProcessorState(sp)) spss = append(spss, newSpanProcessorState(sp))
} }
@@ -192,8 +192,10 @@ func (p *TracerProvider) RegisterSpanProcessor(sp SpanProcessor) {
if p.isShutdown.Load() { if p.isShutdown.Load() {
return return
} }
newSPS := spanProcessorStates{}
newSPS = append(newSPS, *(p.spanProcessors.Load())...) current := p.getSpanProcessors()
newSPS := make(spanProcessorStates, 0, len(current)+1)
newSPS = append(newSPS, current...)
newSPS = append(newSPS, newSpanProcessorState(sp)) newSPS = append(newSPS, newSpanProcessorState(sp))
p.spanProcessors.Store(&newSPS) p.spanProcessors.Store(&newSPS)
} }
@@ -210,12 +212,12 @@ func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
if p.isShutdown.Load() { if p.isShutdown.Load() {
return return
} }
old := *(p.spanProcessors.Load()) old := p.getSpanProcessors()
if len(old) == 0 { if len(old) == 0 {
return return
} }
spss := spanProcessorStates{} spss := make(spanProcessorStates, len(old))
spss = append(spss, old...) copy(spss, old)
// stop the span processor if it is started and remove it from the list // stop the span processor if it is started and remove it from the list
var stopOnce *spanProcessorState var stopOnce *spanProcessorState
@@ -245,7 +247,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
// ForceFlush immediately exports all spans that have not yet been exported for // ForceFlush immediately exports all spans that have not yet been exported for
// all the registered span processors. // all the registered span processors.
func (p *TracerProvider) ForceFlush(ctx context.Context) error { func (p *TracerProvider) ForceFlush(ctx context.Context) error {
spss := *(p.spanProcessors.Load()) spss := p.getSpanProcessors()
if len(spss) == 0 { if len(spss) == 0 {
return nil return nil
} }
@@ -278,10 +280,9 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {
if !p.isShutdown.CompareAndSwap(false, true) { // did toggle? if !p.isShutdown.CompareAndSwap(false, true) { // did toggle?
return nil return nil
} }
spss := *(p.spanProcessors.Load())
var retErr error var retErr error
for _, sps := range spss { for _, sps := range p.getSpanProcessors() {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
@@ -305,6 +306,10 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {
return retErr return retErr
} }
func (p *TracerProvider) getSpanProcessors() spanProcessorStates {
return *(p.spanProcessors.Load())
}
// TracerProviderOption configures a TracerProvider. // TracerProviderOption configures a TracerProvider.
type TracerProviderOption interface { type TracerProviderOption interface {
apply(tracerProviderConfig) tracerProviderConfig apply(tracerProviderConfig) tracerProviderConfig

View File

@@ -80,6 +80,57 @@ func TestForceFlushAndShutdownTraceProviderWithoutProcessor(t *testing.T) {
assert.True(t, stp.isShutdown.Load()) assert.True(t, stp.isShutdown.Load())
} }
func TestUnregisterFirst(t *testing.T) {
stp := NewTracerProvider()
sp1 := &basicSpanProcessor{}
sp2 := &basicSpanProcessor{}
sp3 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp1)
stp.RegisterSpanProcessor(sp2)
stp.RegisterSpanProcessor(sp3)
stp.UnregisterSpanProcessor(sp1)
sps := stp.getSpanProcessors()
require.Len(t, sps, 2)
assert.Same(t, sp2, sps[0].sp)
assert.Same(t, sp3, sps[1].sp)
}
func TestUnregisterMiddle(t *testing.T) {
stp := NewTracerProvider()
sp1 := &basicSpanProcessor{}
sp2 := &basicSpanProcessor{}
sp3 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp1)
stp.RegisterSpanProcessor(sp2)
stp.RegisterSpanProcessor(sp3)
stp.UnregisterSpanProcessor(sp2)
sps := stp.getSpanProcessors()
require.Len(t, sps, 2)
assert.Same(t, sp1, sps[0].sp)
assert.Same(t, sp3, sps[1].sp)
}
func TestUnregisterLast(t *testing.T) {
stp := NewTracerProvider()
sp1 := &basicSpanProcessor{}
sp2 := &basicSpanProcessor{}
sp3 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp1)
stp.RegisterSpanProcessor(sp2)
stp.RegisterSpanProcessor(sp3)
stp.UnregisterSpanProcessor(sp3)
sps := stp.getSpanProcessors()
require.Len(t, sps, 2)
assert.Same(t, sp1, sps[0].sp)
assert.Same(t, sp2, sps[1].sp)
}
func TestShutdownTraceProvider(t *testing.T) { func TestShutdownTraceProvider(t *testing.T) {
stp := NewTracerProvider() stp := NewTracerProvider()
sp := &basicSpanProcessor{} sp := &basicSpanProcessor{}
@@ -162,7 +213,7 @@ func TestRegisterAfterShutdownWithoutProcessors(t *testing.T) {
sp := &basicSpanProcessor{} sp := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp) // no-op stp.RegisterSpanProcessor(sp) // no-op
assert.Empty(t, stp.spanProcessors.Load()) assert.Empty(t, stp.getSpanProcessors())
} }
func TestRegisterAfterShutdownWithProcessors(t *testing.T) { func TestRegisterAfterShutdownWithProcessors(t *testing.T) {
@@ -173,11 +224,11 @@ func TestRegisterAfterShutdownWithProcessors(t *testing.T) {
err := stp.Shutdown(context.Background()) err := stp.Shutdown(context.Background())
assert.NoError(t, err) assert.NoError(t, err)
assert.True(t, stp.isShutdown.Load()) assert.True(t, stp.isShutdown.Load())
assert.Empty(t, stp.spanProcessors.Load()) assert.Empty(t, stp.getSpanProcessors())
sp2 := &basicSpanProcessor{} sp2 := &basicSpanProcessor{}
stp.RegisterSpanProcessor(sp2) // no-op stp.RegisterSpanProcessor(sp2) // no-op
assert.Empty(t, stp.spanProcessors.Load()) assert.Empty(t, stp.getSpanProcessors())
} }
func TestTracerProviderSamplerConfigFromEnv(t *testing.T) { func TestTracerProviderSamplerConfigFromEnv(t *testing.T) {

View File

@@ -410,7 +410,7 @@ func (s *recordingSpan) End(options ...trace.SpanEndOption) {
} }
s.mu.Unlock() s.mu.Unlock()
sps := *(s.tracer.provider.spanProcessors.Load()) sps := s.tracer.provider.getSpanProcessors()
if len(sps) == 0 { if len(sps) == 0 {
return return
} }

View File

@@ -62,11 +62,11 @@ type SpanProcessor interface {
type spanProcessorState struct { type spanProcessorState struct {
sp SpanProcessor sp SpanProcessor
state *sync.Once state sync.Once
} }
func newSpanProcessorState(sp SpanProcessor) *spanProcessorState { func newSpanProcessorState(sp SpanProcessor) *spanProcessorState {
return &spanProcessorState{sp: sp, state: &sync.Once{}} return &spanProcessorState{sp: sp}
} }
type spanProcessorStates []*spanProcessorState type spanProcessorStates []*spanProcessorState

View File

@@ -51,7 +51,7 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanS
s := tr.newSpan(ctx, name, &config) s := tr.newSpan(ctx, name, &config)
if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() { if rw, ok := s.(ReadWriteSpan); ok && s.IsRecording() {
sps := *(tr.provider.spanProcessors.Load()) sps := tr.provider.getSpanProcessors()
for _, sp := range sps { for _, sp := range sps {
sp.sp.OnStart(ctx, rw) sp.sp.OnStart(ctx, rw)
} }