mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-02-03 13:11:53 +02:00
Register/unregister in the fixed order (#1198)
- change the processors' map to array - increase test coverage Signed-off-by: Hui Kang <kangh@us.ibm.com> Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com> Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
parent
559fecd73e
commit
b97533a74b
@ -65,6 +65,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Rename `NewProvider` to `NewTracerProvider` in the `go.opentelemetry.io/otel/sdk/trace` package. (#1190)
|
||||
- Renamed `SamplingDecision` values to comply with OpenTelemetry specification change. (#1192)
|
||||
- Renamed Zipkin attribute names from `ot.status_code & ot.status_description` to `otel.status_code & otel.status_description`. (#1201)
|
||||
- The default SDK now invokes registered `SpanProcessor`s in the order they were registered with the `TracerProvider`. (#1195)
|
||||
|
||||
### Removed
|
||||
|
||||
|
@ -109,13 +109,15 @@ func (p *TracerProvider) Tracer(name string, opts ...apitrace.TracerOption) apit
|
||||
func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
new := make(spanProcessorMap)
|
||||
if old, ok := p.spanProcessors.Load().(spanProcessorMap); ok {
|
||||
for k, v := range old {
|
||||
new[k] = v
|
||||
}
|
||||
new := spanProcessorStates{}
|
||||
if old, ok := p.spanProcessors.Load().(spanProcessorStates); ok {
|
||||
new = append(new, old...)
|
||||
}
|
||||
new[s] = &sync.Once{}
|
||||
newSpanSync := &spanProcessorState{
|
||||
sp: s,
|
||||
state: &sync.Once{},
|
||||
}
|
||||
new = append(new, newSpanSync)
|
||||
p.spanProcessors.Store(new)
|
||||
}
|
||||
|
||||
@ -123,18 +125,33 @@ func (p *TracerProvider) RegisterSpanProcessor(s SpanProcessor) {
|
||||
func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
new := make(spanProcessorMap)
|
||||
if old, ok := p.spanProcessors.Load().(spanProcessorMap); ok {
|
||||
for k, v := range old {
|
||||
new[k] = v
|
||||
new := spanProcessorStates{}
|
||||
old, ok := p.spanProcessors.Load().(spanProcessorStates)
|
||||
if !ok || len(old) == 0 {
|
||||
return
|
||||
}
|
||||
new = append(new, old...)
|
||||
|
||||
// stop the span processor if it is started and remove it from the list
|
||||
var stopOnce *spanProcessorState
|
||||
var idx int
|
||||
for i, sps := range new {
|
||||
if sps.sp == s {
|
||||
stopOnce = sps
|
||||
idx = i
|
||||
}
|
||||
}
|
||||
if stopOnce, ok := new[s]; ok && stopOnce != nil {
|
||||
stopOnce.Do(func() {
|
||||
if stopOnce != nil {
|
||||
stopOnce.state.Do(func() {
|
||||
s.Shutdown()
|
||||
})
|
||||
}
|
||||
delete(new, s)
|
||||
if len(new) > 1 {
|
||||
copy(new[idx:], new[idx+1:])
|
||||
}
|
||||
new[len(new)-1] = nil
|
||||
new = new[:len(new)-1]
|
||||
|
||||
p.spanProcessors.Store(new)
|
||||
}
|
||||
|
||||
|
@ -139,8 +139,8 @@ func (s *span) End(options ...apitrace.SpanOption) {
|
||||
}
|
||||
config := apitrace.NewSpanConfig(options...)
|
||||
s.endOnce.Do(func() {
|
||||
sps, _ := s.tracer.provider.spanProcessors.Load().(spanProcessorMap)
|
||||
mustExportOrProcess := len(sps) > 0
|
||||
sps, ok := s.tracer.provider.spanProcessors.Load().(spanProcessorStates)
|
||||
mustExportOrProcess := ok && len(sps) > 0
|
||||
if mustExportOrProcess {
|
||||
sd := s.makeSpanData()
|
||||
if config.Timestamp.IsZero() {
|
||||
@ -148,8 +148,8 @@ func (s *span) End(options ...apitrace.SpanOption) {
|
||||
} else {
|
||||
sd.EndTime = config.Timestamp
|
||||
}
|
||||
for sp := range sps {
|
||||
sp.OnEnd(sd)
|
||||
for _, sp := range sps {
|
||||
sp.sp.OnEnd(sd)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
@ -43,4 +43,8 @@ type SpanProcessor interface {
|
||||
ForceFlush()
|
||||
}
|
||||
|
||||
type spanProcessorMap map[SpanProcessor]*sync.Once
|
||||
type spanProcessorState struct {
|
||||
sp SpanProcessor
|
||||
state *sync.Once
|
||||
}
|
||||
type spanProcessorStates []*spanProcessorState
|
||||
|
@ -45,47 +45,66 @@ func (t *testSpanProcesor) ForceFlush() {
|
||||
func TestRegisterSpanProcessort(t *testing.T) {
|
||||
name := "Register span processor before span starts"
|
||||
tp := basicTracerProvider(t)
|
||||
sp := NewTestSpanProcessor()
|
||||
tp.RegisterSpanProcessor(sp)
|
||||
sps := []*testSpanProcesor{
|
||||
NewTestSpanProcessor(),
|
||||
NewTestSpanProcessor(),
|
||||
}
|
||||
|
||||
for _, sp := range sps {
|
||||
tp.RegisterSpanProcessor(sp)
|
||||
}
|
||||
|
||||
tr := tp.Tracer("SpanProcessor")
|
||||
_, span := tr.Start(context.Background(), "OnStart")
|
||||
span.End()
|
||||
wantCount := 1
|
||||
gotCount := len(sp.spansStarted)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
}
|
||||
gotCount = len(sp.spansEnded)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
|
||||
for _, sp := range sps {
|
||||
gotCount := len(sp.spansStarted)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
}
|
||||
gotCount = len(sp.spansEnded)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnregisterSpanProcessor(t *testing.T) {
|
||||
name := "Start span after unregistering span processor"
|
||||
tp := basicTracerProvider(t)
|
||||
sp := NewTestSpanProcessor()
|
||||
tp.RegisterSpanProcessor(sp)
|
||||
sps := []*testSpanProcesor{
|
||||
NewTestSpanProcessor(),
|
||||
NewTestSpanProcessor(),
|
||||
}
|
||||
|
||||
for _, sp := range sps {
|
||||
tp.RegisterSpanProcessor(sp)
|
||||
}
|
||||
|
||||
tr := tp.Tracer("SpanProcessor")
|
||||
_, span := tr.Start(context.Background(), "OnStart")
|
||||
span.End()
|
||||
tp.UnregisterSpanProcessor(sp)
|
||||
for _, sp := range sps {
|
||||
tp.UnregisterSpanProcessor(sp)
|
||||
}
|
||||
|
||||
// start another span after unregistering span processor.
|
||||
_, span = tr.Start(context.Background(), "Start span after unregister")
|
||||
span.End()
|
||||
|
||||
wantCount := 1
|
||||
gotCount := len(sp.spansStarted)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
}
|
||||
for _, sp := range sps {
|
||||
wantCount := 1
|
||||
gotCount := len(sp.spansStarted)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
}
|
||||
|
||||
gotCount = len(sp.spansEnded)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
gotCount = len(sp.spansEnded)
|
||||
if gotCount != wantCount {
|
||||
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,9 +58,9 @@ func (tr *tracer) Start(ctx context.Context, name string, options ...apitrace.Sp
|
||||
span.tracer = tr
|
||||
|
||||
if span.IsRecording() {
|
||||
sps, _ := tr.provider.spanProcessors.Load().(spanProcessorMap)
|
||||
for sp := range sps {
|
||||
sp.OnStart(span.data)
|
||||
sps, _ := tr.provider.spanProcessors.Load().(spanProcessorStates)
|
||||
for _, sp := range sps {
|
||||
sp.sp.OnStart(span.data)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user