From b97533a74bd0c436d527d4630680b7f1891a18e6 Mon Sep 17 00:00:00 2001 From: huikang Date: Thu, 24 Sep 2020 14:43:23 -0400 Subject: [PATCH] Register/unregister in the fixed order (#1198) - change the processors' map to array - increase test coverage Signed-off-by: Hui Kang Co-authored-by: Tyler Yahn Co-authored-by: Tyler Yahn --- CHANGELOG.md | 1 + sdk/trace/provider.go | 43 ++++++++++++++++------- sdk/trace/span.go | 8 ++--- sdk/trace/span_processor.go | 6 +++- sdk/trace/span_processor_test.go | 59 +++++++++++++++++++++----------- sdk/trace/tracer.go | 6 ++-- 6 files changed, 82 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 435e0fcb2..81b7cfc43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Rename `NewProvider` to `NewTracerProvider` in the `go.opentelemetry.io/otel/sdk/trace` package. (#1190) - Renamed `SamplingDecision` values to comply with OpenTelemetry specification change. (#1192) - Renamed Zipkin attribute names from `ot.status_code & ot.status_description` to `otel.status_code & otel.status_description`. (#1201) +- The default SDK now invokes registered `SpanProcessor`s in the order they were registered with the `TracerProvider`. (#1195) ### Removed diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index e1ba09297..cc4cc81b0 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -109,13 +109,15 @@ func (p *TracerProvider) Tracer(name string, opts ...apitrace.TracerOption) apit func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) { p.mu.Lock() defer p.mu.Unlock() - new := make(spanProcessorMap) - if old, ok := p.spanProcessors.Load().(spanProcessorMap); ok { - for k, v := range old { - new[k] = v - } + new := spanProcessorStates{} + if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok { + new = append(new, old...) } - new[s] = &sync.Once{} + newSpanSync := &spanProcessorState{ + sp: s, + state: &sync.Once{}, + } + new = append(new, newSpanSync) p.spanProcessors.Store(new) } @@ -123,18 +125,33 @@ func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) { func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) { p.mu.Lock() defer p.mu.Unlock() - new := make(spanProcessorMap) - if old, ok := p.spanProcessors.Load().(spanProcessorMap); ok { - for k, v := range old { - new[k] = v + new := spanProcessorStates{} + old, ok := p.spanProcessors.Load().(spanProcessorStates) + if !ok || len(old) == 0 { + return + } + new = append(new, old...) + + // stop the span processor if it is started and remove it from the list + var stopOnce *spanProcessorState + var idx int + for i, sps := range new { + if sps.sp == s { + stopOnce = sps + idx = i } } - if stopOnce, ok := new[s]; ok && stopOnce != nil { - stopOnce.Do(func() { + if stopOnce != nil { + stopOnce.state.Do(func() { s.Shutdown() }) } - delete(new, s) + if len(new) > 1 { + copy(new[idx:], new[idx+1:]) + } + new[len(new)-1] = nil + new = new[:len(new)-1] + p.spanProcessors.Store(new) } diff --git a/sdk/trace/span.go b/sdk/trace/span.go index a388c6741..b975c1382 100644 --- a/sdk/trace/span.go +++ b/sdk/trace/span.go @@ -139,8 +139,8 @@ func (s *span) End(options ...apitrace.SpanOption) { } config := apitrace.NewSpanConfig(options...) s.endOnce.Do(func() { - sps, _ := s.tracer.provider.spanProcessors.Load().(spanProcessorMap) - mustExportOrProcess := len(sps) > 0 + sps, ok := s.tracer.provider.spanProcessors.Load().(spanProcessorStates) + mustExportOrProcess := ok && len(sps) > 0 if mustExportOrProcess { sd := s.makeSpanData() if config.Timestamp.IsZero() { @@ -148,8 +148,8 @@ func (s *span) End(options ...apitrace.SpanOption) { } else { sd.EndTime = config.Timestamp } - for sp := range sps { - sp.OnEnd(sd) + for _, sp := range sps { + sp.sp.OnEnd(sd) } } }) diff --git a/sdk/trace/span_processor.go b/sdk/trace/span_processor.go index 2fbe4f88a..e707745f9 100644 --- a/sdk/trace/span_processor.go +++ b/sdk/trace/span_processor.go @@ -43,4 +43,8 @@ type SpanProcessor interface { ForceFlush() } -type spanProcessorMap map[SpanProcessor]*sync.Once +type spanProcessorState struct { + sp SpanProcessor + state *sync.Once +} +type spanProcessorStates []*spanProcessorState diff --git a/sdk/trace/span_processor_test.go b/sdk/trace/span_processor_test.go index 24cf59f00..efddad1e1 100644 --- a/sdk/trace/span_processor_test.go +++ b/sdk/trace/span_processor_test.go @@ -45,47 +45,66 @@ func (t *testSpanProcesor) ForceFlush() { func TestRegisterSpanProcessort(t *testing.T) { name := "Register span processor before span starts" tp := basicTracerProvider(t) - sp := NewTestSpanProcessor() - tp.RegisterSpanProcessor(sp) + sps := []*testSpanProcesor{ + NewTestSpanProcessor(), + NewTestSpanProcessor(), + } + + for _, sp := range sps { + tp.RegisterSpanProcessor(sp) + } tr := tp.Tracer("SpanProcessor") _, span := tr.Start(context.Background(), "OnStart") span.End() wantCount := 1 - gotCount := len(sp.spansStarted) - if gotCount != wantCount { - t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount) - } - gotCount = len(sp.spansEnded) - if gotCount != wantCount { - t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount) + + for _, sp := range sps { + gotCount := len(sp.spansStarted) + if gotCount != wantCount { + t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount) + } + gotCount = len(sp.spansEnded) + if gotCount != wantCount { + t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount) + } } } func TestUnregisterSpanProcessor(t *testing.T) { name := "Start span after unregistering span processor" tp := basicTracerProvider(t) - sp := NewTestSpanProcessor() - tp.RegisterSpanProcessor(sp) + sps := []*testSpanProcesor{ + NewTestSpanProcessor(), + NewTestSpanProcessor(), + } + + for _, sp := range sps { + tp.RegisterSpanProcessor(sp) + } tr := tp.Tracer("SpanProcessor") _, span := tr.Start(context.Background(), "OnStart") span.End() - tp.UnregisterSpanProcessor(sp) + for _, sp := range sps { + tp.UnregisterSpanProcessor(sp) + } // start another span after unregistering span processor. _, span = tr.Start(context.Background(), "Start span after unregister") span.End() - wantCount := 1 - gotCount := len(sp.spansStarted) - if gotCount != wantCount { - t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount) - } + for _, sp := range sps { + wantCount := 1 + gotCount := len(sp.spansStarted) + if gotCount != wantCount { + t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount) + } - gotCount = len(sp.spansEnded) - if gotCount != wantCount { - t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount) + gotCount = len(sp.spansEnded) + if gotCount != wantCount { + t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount) + } } } diff --git a/sdk/trace/tracer.go b/sdk/trace/tracer.go index e05b427e0..d826194ce 100644 --- a/sdk/trace/tracer.go +++ b/sdk/trace/tracer.go @@ -58,9 +58,9 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...apitrace.Sp span.tracer = tr if span.IsRecording() { - sps, _ := tr.provider.spanProcessors.Load().(spanProcessorMap) - for sp := range sps { - sp.OnStart(span.data) + sps, _ := tr.provider.spanProcessors.Load().(spanProcessorStates) + for _, sp := range sps { + sp.sp.OnStart(span.data) } }