
Merge pull request #719 from vmihailenco/master

Rewrite processQueue for better batching
Tyler Yahn
2020-05-18 11:21:37 -07:00
committed by GitHub
2 changed files with 68 additions and 57 deletions
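
The heart of the change: the old goroutine exported only when a ticker fired, draining the queue in a tight loop; the rewritten processQueue owns both the timer and the batch, so a batch is exported as soon as it reaches MaxExportBatchSize, or when ScheduledDelayMillis elapses with a partial batch. Below is a minimal, self-contained sketch of that pattern, not the SDK code: Item, flush, maxBatchSize, and scheduledDelay are hypothetical stand-ins for *export.SpanData, the exporter, MaxExportBatchSize, and ScheduledDelayMillis.

package main

import (
	"fmt"
	"time"
)

// Item stands in for *export.SpanData; flush stands in for the exporter.
type Item int

const (
	maxBatchSize   = 3                     // cf. MaxExportBatchSize
	scheduledDelay = 50 * time.Millisecond // cf. ScheduledDelayMillis
)

// processQueue drains queue until stopCh closes, exporting a batch when it
// fills up or when the delay timer fires with a partial batch.
func processQueue(queue <-chan Item, stopCh <-chan struct{}, flush func([]Item)) {
	timer := time.NewTimer(scheduledDelay)
	defer timer.Stop()

	batch := make([]Item, 0, maxBatchSize)
	export := func() {
		timer.Reset(scheduledDelay)
		if len(batch) > 0 {
			flush(batch)
			batch = batch[:0]
		}
	}

	for {
		select {
		case <-stopCh:
			export() // final flush of whatever is buffered
			return
		case <-timer.C:
			export()
		case it := <-queue:
			batch = append(batch, it)
			if len(batch) == maxBatchSize {
				// Drain a fired timer before Reset, per the time.Timer docs.
				if !timer.Stop() {
					<-timer.C
				}
				export()
			}
		}
	}
}

func main() {
	queue := make(chan Item, 16)
	stopCh := make(chan struct{})
	done := make(chan struct{})

	go func() {
		defer close(done)
		processQueue(queue, stopCh, func(b []Item) { fmt.Println("export", b) })
	}()

	for i := 0; i < 7; i++ {
		queue <- Item(i)
	}
	time.Sleep(100 * time.Millisecond) // let the timer flush the partial batch
	close(stopCh)
	<-done
}

Unlike this sketch, the real commit also adds a drain phase after stop, shown in the diff below.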

sdk/trace/batch_span_processor.go

@@ -103,23 +103,10 @@ func NewBatchSpanProcessor(e export.SpanBatcher, opts ...BatchSpanProcessorOptio
 	bsp.stopCh = make(chan struct{})
 
-	// Start timer to export spans.
-	ticker := time.NewTicker(bsp.o.ScheduledDelayMillis)
 	bsp.stopWait.Add(1)
 	go func() {
-		defer ticker.Stop()
-		batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize)
-		for {
-			select {
-			case <-bsp.stopCh:
-				bsp.processQueue(&batch)
-				close(bsp.queue)
-				bsp.stopWait.Done()
-				return
-			case <-ticker.C:
-				bsp.processQueue(&batch)
-			}
-		}
+		defer bsp.stopWait.Done()
+		bsp.processQueue()
 	}()
 
 	return bsp, nil
@@ -167,52 +154,89 @@ func WithBlocking() BatchSpanProcessorOption {
 	}
 }
 
-// processQueue removes spans from the `queue` channel until there is
-// no more data. It calls the exporter in batches of up to
-// MaxExportBatchSize until all the available data have been processed.
-func (bsp *BatchSpanProcessor) processQueue(batch *[]*export.SpanData) {
-	for {
-		// Read spans until either the buffer fills or the
-		// queue is empty.
-		for ok := true; ok && len(*batch) < bsp.o.MaxExportBatchSize; {
-			select {
-			case sd := <-bsp.queue:
-				if sd != nil && sd.SpanContext.IsSampled() {
-					*batch = append(*batch, sd)
-				}
-			default:
-				ok = false
-			}
-		}
-
-		if len(*batch) == 0 {
-			return
-		}
-
-		// Send one batch, then continue reading until the
-		// buffer is empty.
-		bsp.e.ExportSpans(context.Background(), *batch)
-		*batch = (*batch)[:0]
-	}
-}
+// processQueue removes spans from the `queue` channel until processor
+// is shut down. It calls the exporter in batches of up to MaxExportBatchSize
+// waiting up to ScheduledDelayMillis to form a batch.
+func (bsp *BatchSpanProcessor) processQueue() {
+	timer := time.NewTimer(bsp.o.ScheduledDelayMillis)
+	defer timer.Stop()
+
+	batch := make([]*export.SpanData, 0, bsp.o.MaxExportBatchSize)
+
+	exportSpans := func() {
+		timer.Reset(bsp.o.ScheduledDelayMillis)
+
+		if len(batch) > 0 {
+			bsp.e.ExportSpans(context.Background(), batch)
+			batch = batch[:0]
+		}
+	}
+
+loop:
+	for {
+		select {
+		case <-bsp.stopCh:
+			break loop
+		case <-timer.C:
+			exportSpans()
+		case sd := <-bsp.queue:
+			batch = append(batch, sd)
+			if len(batch) == bsp.o.MaxExportBatchSize {
+				if !timer.Stop() {
+					<-timer.C
+				}
+				exportSpans()
+			}
+		}
+	}
+
+	for {
+		select {
+		case sd := <-bsp.queue:
+			if sd == nil { // queue is closed
+				go throwAwayFutureSends(bsp.queue)
+				exportSpans()
+				return
+			}
+
+			batch = append(batch, sd)
+			if len(batch) == bsp.o.MaxExportBatchSize {
+				exportSpans()
+			}
+		default:
+			// Send nil instead of closing to prevent "send on closed channel".
+			bsp.queue <- nil
+		}
+	}
+}
+
+func throwAwayFutureSends(ch <-chan *export.SpanData) {
+	for {
+		select {
+		case <-ch:
+		case <-time.After(time.Minute):
+			return
+		}
+	}
+}
 
 func (bsp *BatchSpanProcessor) enqueue(sd *export.SpanData) {
+	if !sd.SpanContext.IsSampled() {
+		return
+	}
+
+	select {
+	case <-bsp.stopCh:
+		return
+	default:
+	}
+
 	if bsp.o.BlockOnQueueFull {
 		bsp.queue <- sd
 	} else {
 		var ok bool
 		select {
 		case bsp.queue <- sd:
 			ok = true
 		default:
 			ok = false
 		}
 		if !ok {
 			atomic.AddUint32(&bsp.dropped, 1)
 		}
 	}

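A note on the shutdown handshake above: the old goroutine closed bsp.queue on stop, so a concurrent enqueue could panic with "send on closed channel". The rewrite never closes the queue; when the drain loop finds it empty it sends itself a nil sentinel, everything received before the sentinel is flushed, and throwAwayFutureSends keeps reading for up to a minute so late non-blocking senders are discarded rather than stuck. A minimal sketch of the same handshake follows; the Span type and flush callback are hypothetical stand-ins, not the SDK API.

package main

import (
	"fmt"
	"time"
)

type Span struct{ ID int }

// throwAway reads and drops anything producers send after shutdown, so
// their non-blocking sends never linger in the queue unobserved.
func throwAway(ch <-chan *Span) {
	for {
		select {
		case <-ch:
		case <-time.After(time.Minute):
			return // assume no producers remain after a minute of silence
		}
	}
}

func consumer(queue chan *Span, flush func([]*Span)) {
	var batch []*Span
	for {
		select {
		case sd := <-queue:
			if sd == nil { // sentinel: everything enqueued before it is flushed
				go throwAway(queue)
				flush(batch)
				return
			}
			batch = append(batch, sd)
		default:
			// Queue is empty: enqueue a nil sentinel instead of closing the
			// channel, so concurrent producers cannot hit a closed channel.
			queue <- nil
		}
	}
}

func main() {
	queue := make(chan *Span, 8)
	for i := 0; i < 3; i++ {
		queue <- &Span{ID: i}
	}
	consumer(queue, func(b []*Span) { fmt.Println("flushed", len(b), "spans") })
}

Because the sentinel is enqueued behind any spans already in the channel, FIFO ordering guarantees those spans are flushed before the consumer returns.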
sdk/trace/batch_span_processor_test.go

@@ -69,30 +69,26 @@ type testOption struct {
 	wantNumSpans   int
 	wantBatchCount int
 	genNumSpans    int
-	waitTime       time.Duration
 	parallel       bool
 }
 
 func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 	schDelay := 200 * time.Millisecond
-	waitTime := schDelay + 100*time.Millisecond
 	options := []testOption{
 		{
 			name:           "default BatchSpanProcessorOptions",
-			wantNumSpans:   2048,
+			wantNumSpans:   2053,
 			wantBatchCount: 4,
 			genNumSpans:    2053,
-			waitTime:       5100 * time.Millisecond,
 		},
 		{
 			name: "non-default ScheduledDelayMillis",
 			o: []sdktrace.BatchSpanProcessorOption{
 				sdktrace.WithScheduleDelayMillis(schDelay),
 			},
-			wantNumSpans:   2048,
+			wantNumSpans:   2053,
 			wantBatchCount: 4,
 			genNumSpans:    2053,
-			waitTime:       waitTime,
 		},
 		{
 			name: "non-default MaxQueueSize and ScheduledDelayMillis",
@@ -100,10 +96,9 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 				sdktrace.WithScheduleDelayMillis(schDelay),
 				sdktrace.WithMaxQueueSize(200),
 			},
-			wantNumSpans:   200,
+			wantNumSpans:   205,
 			wantBatchCount: 1,
 			genNumSpans:    205,
-			waitTime:       waitTime,
 		},
 		{
 			name: "non-default MaxQueueSize, ScheduledDelayMillis and MaxExportBatchSize",
@@ -112,10 +107,9 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 				sdktrace.WithMaxQueueSize(205),
 				sdktrace.WithMaxExportBatchSize(20),
 			},
-			wantNumSpans:   205,
+			wantNumSpans:   210,
 			wantBatchCount: 11,
 			genNumSpans:    210,
-			waitTime:       waitTime,
 		},
 		{
 			name: "blocking option",
@@ -128,7 +122,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			wantNumSpans:   205,
 			wantBatchCount: 11,
 			genNumSpans:    205,
-			waitTime:       waitTime,
 		},
 		{
 			name: "parallel span generation",
@@ -136,10 +129,9 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 				sdktrace.WithScheduleDelayMillis(schDelay),
 				sdktrace.WithMaxQueueSize(200),
 			},
-			wantNumSpans:   200,
+			wantNumSpans:   205,
 			wantBatchCount: 1,
 			genNumSpans:    205,
-			waitTime:       waitTime,
 			parallel:       true,
 		},
 		{
@@ -152,7 +144,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			wantNumSpans:   2000,
 			wantBatchCount: 10,
 			genNumSpans:    2000,
-			waitTime:       waitTime,
 			parallel:       true,
 		},
 	}
@@ -168,8 +159,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 		generateSpan(t, option.parallel, tr, option)
 
-		time.Sleep(option.waitTime)
-
+		tp.UnregisterSpanProcessor(ssp)
+
 		gotNumOfSpans := te.len()
@@ -182,8 +171,6 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
 			t.Errorf("%s: number batches: got %+v, want >= %+v\n", option.name, gotBatchCount, option.wantBatchCount)
 			t.Errorf("Batches %v\n", te.sizes)
 		}
-
-		tp.UnregisterSpanProcessor(ssp)
 	}
 }
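
Why the expected counts changed: the old tests slept for waitTime and expected spans beyond the queue capacity to be dropped (for example 2048 of 2053 generated spans under the default MaxQueueSize), apparently because the queue was only drained when the ticker fired. The rewritten processor drains the queue continuously, and UnregisterSpanProcessor shuts it down and flushes the final batch, so each test now expects every generated span back and the waitTime field disappears. A toy sketch of that shutdown guarantee, using a hypothetical batcher type rather than the SDK API:

package main

import (
	"fmt"
	"sync"
)

// batcher is a toy stand-in for BatchSpanProcessor: Shutdown blocks until
// every enqueued item has been consumed, so callers never need to sleep.
type batcher struct {
	queue chan int
	done  sync.WaitGroup
	got   []int
}

func newBatcher() *batcher {
	b := &batcher{queue: make(chan int, 100)}
	b.done.Add(1)
	go func() {
		defer b.done.Done()
		for v := range b.queue {
			b.got = append(b.got, v)
		}
	}()
	return b
}

func (b *batcher) enqueue(v int) { b.queue <- v }

// Shutdown closes the queue and waits for the consumer to drain it. (The
// real processor signals stopCh and drains with a nil sentinel instead of
// closing, but the guarantee it gives the caller is the same.)
func (b *batcher) Shutdown() {
	close(b.queue)
	b.done.Wait()
}

func main() {
	b := newBatcher()
	for i := 0; i < 205; i++ {
		b.enqueue(i)
	}
	b.Shutdown()                                // deterministic: no time.Sleep(waitTime)
	fmt.Println("flushed", len(b.got), "items") // flushed 205 items
}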