1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-06-25 00:16:49 +02:00

Remove buggy enqueueWait

This commit is contained in:
Vladimir Mihailenco
2020-05-16 10:19:46 +03:00
parent b2285e0c71
commit 4408b6e328

View File

@ -17,7 +17,6 @@ package trace
import (
"context"
"errors"
"log"
"sync"
"sync/atomic"
"time"
@ -71,10 +70,9 @@ type BatchSpanProcessor struct {
queue chan *export.SpanData
dropped uint32
enqueueWait sync.WaitGroup
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}
var _ SpanProcessor = (*BatchSpanProcessor)(nil)
@ -192,24 +190,11 @@ loop:
}
}
go func() {
bsp.enqueueWait.Wait()
close(bsp.queue)
}()
for {
if !timer.Stop() {
<-timer.C
}
// This is not needed normally, but use some timeout so we are not stuck
// waiting for enqueueWait forever.
const waitTimeout = 30 * time.Second
timer.Reset(waitTimeout)
select {
case sd := <-bsp.queue:
if sd == nil { // queue is closed
go throwAwayFutureSends(bsp.queue)
exportSpans()
return
}
@ -218,10 +203,18 @@ loop:
if len(batch) == bsp.o.MaxExportBatchSize {
exportSpans()
}
case <-timer.C:
//TODO: use error callback - see issue #174
log.Println("bsp.enqueueWait timeout")
exportSpans()
default:
// Send nil instead of closing to prevent "send on closed channel".
bsp.queue <- nil
}
}
}
func throwAwayFutureSends(ch <-chan *export.SpanData) {
for {
select {
case <-ch:
case <-time.After(time.Minute):
return
}
}
@ -232,11 +225,8 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
return
}
bsp.enqueueWait.Add(1)
select {
case <-bsp.stopCh:
bsp.enqueueWait.Done()
return
default:
}
@ -250,6 +240,4 @@ func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
atomic.AddUint32(&bsp.dropped, 1)
}
}
bsp.enqueueWait.Done()
}