1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-04-11 11:21:59 +02:00

Add proper enqueue sync

This commit is contained in:
Vladimir Mihailenco 2020-05-14 14:25:41 +03:00
parent ab19dddd0f
commit 774889cbfa

View File

@ -17,6 +17,7 @@ package trace
import (
"context"
"errors"
"log"
"sync"
"sync/atomic"
"time"
@ -70,9 +71,10 @@ type BatchSpanProcessor struct {
queue chan *export.SpanData
dropped uint32
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
enqueueWait sync.WaitGroup
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}
var _ SpanProcessor = (*BatchSpanProcessor)(nil)
@ -192,45 +194,58 @@ loop:
}
}
// Consume queue before close to unblock enqueue and prevent
// "panic: send on closed channel".
go func() {
bsp.enqueueWait.Wait()
close(bsp.queue)
}()
for {
if !timer.Stop() {
<-timer.C
}
const waitTimeout = 30 * time.Second
timer.Reset(waitTimeout)
select {
case sd := <-bsp.queue:
if sd == nil {
exportSpans()
return
}
if sd.SpanContext.IsSampled() {
batch = append(batch, sd)
if len(batch) == bsp.o.MaxExportBatchSize {
exportSpans()
}
}
default:
close(bsp.queue)
case <-timer.C:
log.Println("bsp.enqueueWait timeout")
exportSpans()
return
}
}
}
func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
bsp.enqueueWait.Add(1)
select {
case <-bsp.stopCh:
bsp.enqueueWait.Done()
return
default:
}
if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
} else {
var ok bool
select {
case bsp.queue <- sd:
ok = true
default:
ok = false
}
if !ok {
atomic.AddUint32(&bsp.dropped, 1)
}
}
bsp.enqueueWait.Done()
}