1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-06-25 00:16:49 +02:00

Rewrite processQueue for better batching

This commit is contained in:
Vladimir Mihailenco
2020-05-06 17:46:54 +03:00
parent f1112a4fbb
commit 2719c0ac16

View File

@ -103,23 +103,10 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
bsp.stopCh = make(chan struct{}) bsp.stopCh = make(chan struct{})
// Start timer to export spans.
ticker := time.NewTicker(bsp.o.ScheduledDelayMillis)
bsp.stopWait.Add(1) bsp.stopWait.Add(1)
go func() { go func() {
defer ticker.Stop() defer bsp.stopWait.Done()
batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize) bsp.processQueue()
for {
select {
case <-bsp.stopCh:
bsp.processQueue(&batch)
close(bsp.queue)
bsp.stopWait.Done()
return
case <-ticker.C:
bsp.processQueue(&batch)
}
}
}() }()
return bsp, nil return bsp, nil
@ -167,32 +154,58 @@ func WithBlocking() BatchSpanProcessorOption {
} }
} }
// processQueue removes spans from the `queue` channel until there is // processQueue removes spans from the `queue` channel until processor
// no more data. It calls the exporter in batches of up to // is shut down. It calls the exporter in batches of up to MaxExportBatchSize
// MaxExportBatchSize until all the available data have been processed. // waiting up to ScheduledDelayMillis to form a batch.
func (bsp *BatchSpanProcessor) processQueue(batch *[]*export.SpanData) { func (bsp *BatchSpanProcessor) processQueue() {
ticker := time.NewTicker(bsp.o.ScheduledDelayMillis)
defer ticker.Stop()
batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize)
exportSpans := func() {
if len(batch) > 0 {
bsp.e.ExportSpans(context.Background(), batch)
batch = batch[:0]
}
}
loop:
for { for {
// Read spans until either the buffer fills or the select {
// queue is empty. case <-bsp.stopCh:
for ok := true; ok && len(*batch) < bsp.o.MaxExportBatchSize; { break loop
select { case <-ticker.C:
case sd := <-bsp.queue: exportSpans()
if sd != nil && sd.SpanContext.IsSampled() { case sd := <-bsp.queue:
*batch = append(*batch, sd) if sd.SpanContext.IsSampled() {
batch = append(batch, sd)
if len(batch) == bsp.o.MaxExportBatchSize {
ticker.Reset(bsp.o.ScheduledDelayMillis)
exportSpans()
} }
default:
ok = false
} }
} }
}
if len(*batch) == 0 { // Consume queue before close to unblock enqueue and prevent
return // "panic: send on closed channel".
for {
select {
case sd := <-bsp.queue:
if sd == nil {
exportSpans()
return
}
if sd.SpanContext.IsSampled() {
batch = append(batch, sd)
if len(batch) == bsp.o.MaxExportBatchSize {
exportSpans()
}
}
default:
close(bsp.queue)
} }
// Send one batch, then continue reading until the
// buffer is empty.
bsp.e.ExportSpans(context.Background(), *batch)
*batch = (*batch)[:0]
} }
} }