diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index f3845b1dc..4b320eb4d 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -17,6 +17,7 @@ package trace import ( "context" "errors" + "log" "sync" "sync/atomic" "time" @@ -70,9 +71,10 @@ type BatchSpanProcessor struct { queue chan *export.SpanData dropped uint32 - stopWait sync.WaitGroup - stopOnce sync.Once - stopCh chan struct{} + enqueueWait sync.WaitGroup + stopWait sync.WaitGroup + stopOnce sync.Once + stopCh chan struct{} } var _ SpanProcessor = (*BatchSpanProcessor)(nil) @@ -192,45 +194,58 @@ loop: } } - // Consume queue before close to unblock enqueue and prevent - // "panic: send on closed channel". + go func() { + bsp.enqueueWait.Wait() + close(bsp.queue) + }() + for { + if !timer.Stop() { + <-timer.C + } + const waitTimeout = 30 * time.Second + timer.Reset(waitTimeout) + select { case sd := <-bsp.queue: if sd == nil { exportSpans() return } + if sd.SpanContext.IsSampled() { batch = append(batch, sd) if len(batch) == bsp.o.MaxExportBatchSize { exportSpans() } } - default: - close(bsp.queue) + case <-timer.C: + log.Println("bsp.enqueueWait timeout") + exportSpans() + return } } } func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) { + bsp.enqueueWait.Add(1) + select { case <-bsp.stopCh: + bsp.enqueueWait.Done() return default: } + if bsp.o.BlockOnQueueFull { bsp.queue <- sd } else { - var ok bool select { case bsp.queue <- sd: - ok = true default: - ok = false - } - if !ok { atomic.AddUint32(&bsp.dropped, 1) } } + + bsp.enqueueWait.Done() }