diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go
index 4030e0a34..db647d97b 100644
--- a/sdk/trace/batch_span_processor.go
+++ b/sdk/trace/batch_span_processor.go
@@ -105,9 +105,10 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
 		queue:  make(chan *export.SpanData, o.MaxQueueSize),
 		stopCh: make(chan struct{}),
 	}
 
-	bsp.stopWait.Add(1)
+	bsp.stopWait.Add(1)
 	go func() {
+		defer bsp.stopWait.Done()
 		bsp.processQueue()
 		bsp.drainQueue()
 	}()
@@ -130,8 +131,6 @@ func (bsp *BatchSpanProcessor) Shutdown() {
 	bsp.stopOnce.Do(func() {
 		close(bsp.stopCh)
 		bsp.stopWait.Wait()
-		close(bsp.queue)
-
 	})
 }
 
@@ -173,7 +172,6 @@ func (bsp *BatchSpanProcessor) exportSpans() {
 // is shut down. It calls the exporter in batches of up to MaxExportBatchSize
 // waiting up to BatchTimeout to form a batch.
 func (bsp *BatchSpanProcessor) processQueue() {
-	defer bsp.stopWait.Done()
 	defer bsp.timer.Stop()
 
 	for {
@@ -197,13 +195,22 @@ func (bsp *BatchSpanProcessor) processQueue() {
 // drainQueue awaits the any caller that had added to bsp.stopWait
 // to finish the enqueue, then exports the final batch.
 func (bsp *BatchSpanProcessor) drainQueue() {
-	for sd := range bsp.queue {
-		bsp.batch = append(bsp.batch, sd)
-		if len(bsp.batch) == bsp.o.MaxExportBatchSize {
-			bsp.exportSpans()
+	for {
+		select {
+		case sd := <-bsp.queue:
+			if sd == nil {
+				bsp.exportSpans()
+				return
+			}
+
+			bsp.batch = append(bsp.batch, sd)
+			if len(bsp.batch) == bsp.o.MaxExportBatchSize {
+				bsp.exportSpans()
+			}
+		default:
+			close(bsp.queue)
 		}
 	}
-	bsp.exportSpans()
 }
 
 func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
@@ -226,17 +233,19 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
 		panic(x)
 	}()
 
+	select {
+	case <-bsp.stopCh:
+		return
+	default:
+	}
+
 	if bsp.o.BlockOnQueueFull {
-		select {
-		case bsp.queue <- sd:
-		case <-bsp.stopCh:
-		}
+		bsp.queue <- sd
 		return
 	}
 
 	select {
 	case bsp.queue <- sd:
-	case <-bsp.stopCh:
 	default:
 		atomic.AddUint32(&bsp.dropped, 1)
 	}
diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go
index 3e80a20d3..373cc18b6 100644
--- a/sdk/trace/batch_span_processor_test.go
+++ b/sdk/trace/batch_span_processor_test.go
@@ -117,7 +117,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 				sdktrace.WithBatchTimeout(schDelay),
 				sdktrace.WithMaxQueueSize(200),
 				sdktrace.WithMaxExportBatchSize(20),
-				sdktrace.WithBlocking(),
 			},
 			wantNumSpans:   205,
 			wantBatchCount: 11,
@@ -139,7 +138,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			o: []sdktrace.BatchSpanProcessorOption{
 				sdktrace.WithBatchTimeout(schDelay),
 				sdktrace.WithMaxExportBatchSize(200),
-				sdktrace.WithBlocking(),
 			},
 			wantNumSpans:   2000,
 			wantBatchCount: 10,
@@ -162,18 +160,26 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 
 			tp.UnregisterSpanProcessor(ssp)
 
-			// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/741)
-			// Restore some sort of test here.
-			_ = option.wantNumSpans
-			_ = option.wantBatchCount
-			_ = te.len()           // gotNumOfSpans
-			_ = te.getBatchCount() // gotBatchCount
+			gotNumOfSpans := te.len()
+			if option.wantNumSpans != gotNumOfSpans {
+				t.Errorf("number of exported span: got %+v, want %+v\n",
+					gotNumOfSpans, option.wantNumSpans)
+			}
+
+			gotBatchCount := te.getBatchCount()
+			if gotBatchCount < option.wantBatchCount {
+				t.Errorf("number batches: got %+v, want >= %+v\n",
+					gotBatchCount, option.wantBatchCount)
+				t.Errorf("Batches %v\n", te.sizes)
+			}
 		})
 	}
 }
 
 func createAndRegisterBatchSP(t *testing.T, option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
-	ssp, err := sdktrace.NewBatchSpanProcessor(te, option.o...)
+	// Always use blocking queue to avoid flaky tests.
+	options := append(option.o, sdktrace.WithBlocking())
+	ssp, err := sdktrace.NewBatchSpanProcessor(te, options...)
 	if ssp == nil {
 		t.Errorf("%s: Error creating new instance of BatchSpanProcessor, error: %v\n", option.name, err)
 	}
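
For readers skimming the patch, below is a minimal, self-contained Go sketch of the shutdown pattern the change moves to. It is not the SDK's code: the names (miniProcessor, item, newMiniProcessor, flush, shutdown) are hypothetical stand-ins, and the real processor also runs processQueue with a batch timer before draining, which the sketch skips. The mirrored idea is: enqueue refuses work once the stop channel is closed, the worker goroutine owns stopWait.Done() via defer, and the drain loop empties the buffered channel without blocking, closes it, and treats the resulting nil receive as the cue to export the final batch.

package main

import (
	"fmt"
	"sync"
)

// item stands in for *export.SpanData; a pointer element type matters because
// a receive from a closed, drained channel yields nil, which the drain loop
// below uses as its termination signal.
type item struct{ id int }

type miniProcessor struct {
	queue    chan *item
	stopCh   chan struct{}
	stopWait sync.WaitGroup
	stopOnce sync.Once
	batch    []*item
}

func newMiniProcessor() *miniProcessor {
	p := &miniProcessor{
		queue:  make(chan *item, 16),
		stopCh: make(chan struct{}),
	}
	p.stopWait.Add(1)
	go func() {
		defer p.stopWait.Done() // mirrors moving Done() into the worker goroutine
		p.drainQueue()
	}()
	return p
}

// enqueue drops work once shutdown has begun, so no producer can race with
// drainQueue's close(p.queue) and panic on a send to a closed channel.
func (p *miniProcessor) enqueue(it *item) {
	select {
	case <-p.stopCh:
		return
	default:
	}
	p.queue <- it
}

// drainQueue waits for shutdown, then empties whatever is buffered. When the
// queue is momentarily empty it closes the channel; the next receive returns
// nil, which triggers the final flush. (The real processor runs a batching
// loop with a timer before this drain step; that part is omitted here.)
func (p *miniProcessor) drainQueue() {
	<-p.stopCh
	for {
		select {
		case it := <-p.queue:
			if it == nil { // closed and empty: flush and finish
				p.flush()
				return
			}
			p.batch = append(p.batch, it)
		default:
			close(p.queue)
		}
	}
}

func (p *miniProcessor) flush() { fmt.Println("exported", len(p.batch), "items") }

// shutdown signals the worker and waits for the drain to complete, once.
func (p *miniProcessor) shutdown() {
	p.stopOnce.Do(func() {
		close(p.stopCh)
		p.stopWait.Wait()
	})
}

func main() {
	p := newMiniProcessor()
	for i := 0; i < 10; i++ {
		p.enqueue(&item{id: i})
	}
	p.shutdown() // prints "exported 10 items"
}

The stopCh check in enqueue is what keeps producers from racing with drainQueue's close of the queue and panicking on a send to a closed channel; the recover visible as context in the enqueue hunk (the panic(x) lines) appears to cover the remaining window where a sender passes that check just as the queue is being closed.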