
fix data race in BatchedSpanProcessor (#518)

* fix data race in BatchedSpanProcessor

- fixes #517

* fix ci.

* fix another test.

* move wait group to generateSpan func.
Rahul Patel authored 2020-03-05 13:41:00 -08:00; committed by GitHub
parent 161556aab8
commit 79de90a313
2 changed files with 32 additions and 21 deletions
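
The counter at issue is bumped from whichever goroutines happen to end spans while the queue is full, so the plain bsp.dropped++ is an unsynchronized read-modify-write. Below is a minimal, self-contained sketch of that race and of the atomic form the commit switches to; the names are illustrative stand-ins, not the SDK's code.

    // racesketch.go: illustrative only; "dropped" stands in for the
    // processor's counter and is not the SDK's actual field.
    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        var dropped uint32
        var wg sync.WaitGroup
        for i := 0; i < 100; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // dropped++ here would be an unsynchronized read-modify-write;
                // the atomic form below is safe under concurrent callers.
                atomic.AddUint32(&dropped, 1)
            }()
        }
        wg.Wait()
        fmt.Println(atomic.LoadUint32(&dropped)) // always 100
    }

With the plain increment substituted back in, go run -race reports the race; the atomic version runs clean.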

sdk/trace/batch_span_processor.go

@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	export "go.opentelemetry.io/otel/sdk/export/trace"
@@ -206,7 +207,7 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
 			ok = false
 		}
 		if !ok {
-			bsp.dropped++
+			atomic.AddUint32(&bsp.dropped, 1)
 		}
 	}
 }
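
For completeness, here is a small self-contained sketch of the non-blocking enqueue-or-count-drop pattern this hunk touches, with the read side going through atomic.LoadUint32 as well; the type, field names, and queue size are made up for illustration and are not the SDK's.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // sketch mirrors the shape of the fix: a bounded queue plus a drop counter
    // that is only ever touched through sync/atomic. Names are hypothetical.
    type sketch struct {
        queue   chan int
        dropped uint32
    }

    func (s *sketch) enqueue(v int) {
        select {
        case s.queue <- v:
        default:
            // Queue full: count the drop without racing with other producers.
            atomic.AddUint32(&s.dropped, 1)
        }
    }

    func main() {
        s := &sketch{queue: make(chan int, 2)}
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(v int) {
                defer wg.Done()
                s.enqueue(v)
            }(i)
        }
        wg.Wait()
        // Reads must pair with the atomic writes to stay race-free.
        fmt.Println("dropped:", atomic.LoadUint32(&s.dropped))
    }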

sdk/trace/batch_span_processor_test.go

@@ -55,12 +55,6 @@ func (t *testBatchExporter) getBatchCount() int {
 	return t.batchCount
 }
 
-func (t *testBatchExporter) get(idx int) *export.SpanData {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	return t.spans[idx]
-}
-
 var _ export.SpanBatcher = (*testBatchExporter)(nil)
 
 func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
@@ -77,6 +71,7 @@ type testOption struct {
 	wantBatchCount int
 	genNumSpans    int
 	waitTime       time.Duration
+	parallel       bool
 }
 
 func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
@@ -136,18 +131,30 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			genNumSpans:    205,
 			waitTime:       waitTime,
 		},
+		{
+			name: "parallel span generation",
+			o: []sdktrace.BatchSpanProcessorOption{
+				sdktrace.WithScheduleDelayMillis(schDelay),
+				sdktrace.WithMaxQueueSize(200),
+			},
+			wantNumSpans:   200,
+			wantBatchCount: 1,
+			genNumSpans:    205,
+			waitTime:       waitTime,
+			parallel:       true,
+		},
 	}
 
 	for _, option := range options {
 		te := testBatchExporter{}
 		tp := basicProvider(t)
 		ssp := createAndRegisterBatchSP(t, option, &te)
 		if ssp == nil {
-			t.Errorf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
+			t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
 		}
 		tp.RegisterSpanProcessor(ssp)
 		tr := tp.Tracer("BatchSpanProcessorWithOptions")
-		generateSpan(t, tr, option)
+		generateSpan(t, option.parallel, tr, option)
 
 		time.Sleep(option.waitTime)
 
@@ -162,14 +169,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			t.Errorf("Batches %v\n", te.sizes)
 		}
 
-		// Check first Span is reported. Most recent one is dropped.
-		sc := getSpanContext()
-		wantTraceID := sc.TraceID
-		binary.BigEndian.PutUint64(wantTraceID[0:8], uint64(1))
-		gotTraceID := te.get(0).SpanContext.TraceID
-		if wantTraceID != gotTraceID {
-			t.Errorf("%s: first exported span: got %+v, want %+v\n", option.name, gotTraceID, wantTraceID)
-		}
 		tp.UnregisterSpanProcessor(ssp)
 	}
 }
@@ -182,15 +181,26 @@ func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExpo
 	return ssp
 }
 
-func generateSpan(t *testing.T, tr apitrace.Tracer, option testOption) {
+func generateSpan(t *testing.T, parallel bool, tr apitrace.Tracer, option testOption) {
 	sc := getSpanContext()
 
+	wg := &sync.WaitGroup{}
 	for i := 0; i < option.genNumSpans; i++ {
 		binary.BigEndian.PutUint64(sc.TraceID[0:8], uint64(i+1))
-		ctx := apitrace.ContextWithRemoteSpanContext(context.Background(), sc)
-		_, span := tr.Start(ctx, option.name)
-		span.End()
+		wg.Add(1)
+		f := func(sc core.SpanContext) {
+			ctx := apitrace.ContextWithRemoteSpanContext(context.Background(), sc)
+			_, span := tr.Start(ctx, option.name)
+			span.End()
+			wg.Done()
+		}
+		if parallel {
+			go f(sc)
+		} else {
+			f(sc)
+		}
 	}
+	wg.Wait()
 }
 
 func getSpanContext() core.SpanContext {
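
Two observations on the test-side change, offered as a reading of the diff rather than anything stated in the commit: the WaitGroup now lives inside generateSpan, so wg.Wait() guarantees every generated span has ended before the caller sleeps and inspects the exporter, and f takes the span context as a value parameter, giving each goroutine its own copy while the loop keeps rewriting the shared sc. The same capture-by-parameter idiom in isolation (this tiny program is mine, not part of the change):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        results := make([]int, 5)
        for i := 0; i < 5; i++ {
            wg.Add(1)
            // Passing i as an argument hands each goroutine its own copy
            // instead of letting all of them read the shared loop variable.
            go func(i int) {
                defer wg.Done()
                results[i] = i * i
            }(i)
        }
        wg.Wait()
        fmt.Println(results) // [0 1 4 9 16]
    }

Running the suite with go test -race lets the detector observe the concurrent counter updates that the new parallel case produces.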