From 79de90a31305151698d8136ae2848b1c01371160 Mon Sep 17 00:00:00 2001
From: Rahul Patel
Date: Thu, 5 Mar 2020 13:41:00 -0800
Subject: [PATCH] fix data race in BatchedSpanProcessor (#518)

* fix data race in BatchedSpanProcessor
  - fixes #517

* fix ci.

* fix another test.

* move wait group to generateSpan func.
---
 sdk/trace/batch_span_processor.go      |  3 +-
 sdk/trace/batch_span_processor_test.go | 50 +++++++++++++++-----------
 2 files changed, 32 insertions(+), 21 deletions(-)

diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go
index 05a039830..302fdd216 100644
--- a/sdk/trace/batch_span_processor.go
+++ b/sdk/trace/batch_span_processor.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	export "go.opentelemetry.io/otel/sdk/export/trace"
@@ -206,7 +207,7 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
 			ok = false
 		}
 		if !ok {
-			bsp.dropped++
+			atomic.AddUint32(&bsp.dropped, 1)
 		}
 	}
 }
diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go
index 763317023..d291ab1ca 100644
--- a/sdk/trace/batch_span_processor_test.go
+++ b/sdk/trace/batch_span_processor_test.go
@@ -55,12 +55,6 @@ func (t *testBatchExporter) getBatchCount() int {
 	return t.batchCount
 }
 
-func (t *testBatchExporter) get(idx int) *export.SpanData {
-	t.mu.Lock()
-	defer t.mu.Unlock()
-	return t.spans[idx]
-}
-
 var _ export.SpanBatcher = (*testBatchExporter)(nil)
 
 func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
@@ -77,6 +71,7 @@ type testOption struct {
 	wantBatchCount int
 	genNumSpans    int
 	waitTime       time.Duration
+	parallel       bool
 }
 
 func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
@@ -136,18 +131,30 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			genNumSpans:    205,
 			waitTime:       waitTime,
 		},
+		{
+			name: "parallel span generation",
+			o: []sdktrace.BatchSpanProcessorOption{
+				sdktrace.WithScheduleDelayMillis(schDelay),
+				sdktrace.WithMaxQueueSize(200),
+			},
+			wantNumSpans:   200,
+			wantBatchCount: 1,
+			genNumSpans:    205,
+			waitTime:       waitTime,
+			parallel:       true,
+		},
 	}
 	for _, option := range options {
 		te := testBatchExporter{}
 		tp := basicProvider(t)
 		ssp := createAndRegisterBatchSP(t, option, &te)
 		if ssp == nil {
-			t.Errorf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
+			t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
 		}
 		tp.RegisterSpanProcessor(ssp)
 		tr := tp.Tracer("BatchSpanProcessorWithOptions")
 
-		generateSpan(t, tr, option)
+		generateSpan(t, option.parallel, tr, option)
 
 		time.Sleep(option.waitTime)
 
@@ -162,14 +169,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			t.Errorf("Batches %v\n", te.sizes)
 		}
 
-		// Check first Span is reported. Most recent one is dropped.
-		sc := getSpanContext()
-		wantTraceID := sc.TraceID
-		binary.BigEndian.PutUint64(wantTraceID[0:8], uint64(1))
-		gotTraceID := te.get(0).SpanContext.TraceID
-		if wantTraceID != gotTraceID {
-			t.Errorf("%s: first exported span: got %+v, want %+v\n", option.name, gotTraceID, wantTraceID)
-		}
 		tp.UnregisterSpanProcessor(ssp)
 	}
 }
@@ -182,15 +181,26 @@ func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExpo
 	return ssp
 }
 
-func generateSpan(t *testing.T, tr apitrace.Tracer, option testOption) {
+func generateSpan(t *testing.T, parallel bool, tr apitrace.Tracer, option testOption) {
 	sc := getSpanContext()
 
+	wg := &sync.WaitGroup{}
 	for i := 0; i < option.genNumSpans; i++ {
 		binary.BigEndian.PutUint64(sc.TraceID[0:8], uint64(i+1))
-		ctx := apitrace.ContextWithRemoteSpanContext(context.Background(), sc)
-		_, span := tr.Start(ctx, option.name)
-		span.End()
+		wg.Add(1)
+		f := func(sc core.SpanContext) {
+			ctx := apitrace.ContextWithRemoteSpanContext(context.Background(), sc)
+			_, span := tr.Start(ctx, option.name)
+			span.End()
+			wg.Done()
+		}
+		if parallel {
+			go f(sc)
+		} else {
+			f(sc)
+		}
 	}
+	wg.Wait()
 }
 
 func getSpanContext() core.SpanContext {