From e399d355cb37fa1c1680b92fbe63216393d98e5f Mon Sep 17 00:00:00 2001 From: Gustavo Silva Paiva Date: Thu, 29 Apr 2021 13:42:11 -0300 Subject: [PATCH] drop failed to exporter batches and return error when forcing flush a span processor (#1860) * drop failed to exporter batches and return error when forcing flush a span processor * changelog * changelog * change should export condition * cleanup --- CHANGELOG.md | 2 + sdk/trace/batch_span_processor.go | 23 +++++--- sdk/trace/batch_span_processor_test.go | 77 ++++++++++++++++++++++++-- 3 files changed, 87 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 27d539e6b..c47a7e89f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item. `NewSplitDriver` now automatically implements an internal `noopDriver` for `SplitConfig` fields that are not initialized. (#1798) +- BatchSpanProcessor now report export failures when calling `ForceFlush()` method. (#1860) ### Deprecated @@ -23,6 +24,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Only report errors from the `"go.opentelemetry.io/otel/sdk/resource".Environment` function when they are not `nil`. (#1850, #1851) - The `Shutdown` method of the simple `SpanProcessor` in the `go.opentelemetry.io/otel/sdk/trace` package now honors the context deadline or cancellation. (#1616, #1856) +- BatchSpanProcessor now drops span batches that failed to be exported. (#1860) ### Security diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index f63aa7a94..6687839ee 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -156,16 +156,14 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error { func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error { var err error if bsp.e != nil { - wait := make(chan struct{}) + wait := make(chan error) go func() { - if err := bsp.exportSpans(ctx); err != nil { - otel.Handle(err) - } + wait <- bsp.exportSpans(ctx) close(wait) }() // Wait until the export is finished or the context is cancelled/timed out select { - case <-wait: + case err = <-wait: case <-ctx.Done(): err = ctx.Err() } @@ -216,11 +214,18 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { defer cancel() } - if len(bsp.batch) > 0 { - if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil { + if l := len(bsp.batch); l > 0 { + err := bsp.e.ExportSpans(ctx, bsp.batch) + + // A new batch is always created after exporting, even if the batch failed to be exported. + // + // It is up to the exporter to implement any type of retry logic if a batch is failing + // to be exported, since it is specific to the protocol and backend being sent to. + bsp.batch = bsp.batch[:0] + + if err != nil { return err } - bsp.batch = bsp.batch[:0] } return nil } @@ -244,7 +249,7 @@ func (bsp *batchSpanProcessor) processQueue() { case sd := <-bsp.queue: bsp.batchMutex.Lock() bsp.batch = append(bsp.batch, sd) - shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize + shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize bsp.batchMutex.Unlock() if shouldExport { if !bsp.timer.Stop() { diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index db7ead846..a77e881f5 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -37,6 +37,9 @@ type testBatchExporter struct { batchCount int shutdownCount int delay time.Duration + errors []error + droppedCount int + idx int err error } @@ -44,6 +47,13 @@ func (t *testBatchExporter) ExportSpans(ctx context.Context, ss []*sdktrace.Span t.mu.Lock() defer t.mu.Unlock() + if t.idx < len(t.errors) { + t.droppedCount += len(ss) + err := t.errors[t.idx] + t.idx++ + return err + } + time.Sleep(t.delay) select { @@ -338,12 +348,8 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { // Force flush any held span batches err := ssp.ForceFlush(context.Background()) - gotNumOfSpans := te.len() - spanDifference := option.wantNumSpans - gotNumOfSpans - if spanDifference > 10 || spanDifference < 0 { - t.Errorf("number of exported span not equal to or within 10 less than: got %+v, want %+v\n", - gotNumOfSpans, option.wantNumSpans) - } + assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10) + gotBatchCount := te.getBatchCount() if gotBatchCount < option.wantBatchCount { t.Errorf("number batches: got %+v, want >= %+v\n", @@ -353,6 +359,65 @@ func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) { assert.NoError(t, err) } +func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) { + te := testBatchExporter{ + errors: []error{errors.New("fail to export")}, + } + tp := basicTracerProvider(t) + option := testOption{ + o: []sdktrace.BatchSpanProcessorOption{ + sdktrace.WithMaxQueueSize(0), + sdktrace.WithMaxExportBatchSize(2000), + }, + wantNumSpans: 1000, + wantBatchCount: 1, + genNumSpans: 1000, + } + ssp := createAndRegisterBatchSP(option, &te) + if ssp == nil { + t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name) + } + tp.RegisterSpanProcessor(ssp) + tr := tp.Tracer("BatchSpanProcessorWithOption") + generateSpan(t, option.parallel, tr, option) + + // Force flush any held span batches + err := ssp.ForceFlush(context.Background()) + assert.Error(t, err) + assert.EqualError(t, err, "fail to export") + + // First flush will fail, nothing should be exported. + assertMaxSpanDiff(t, te.droppedCount, option.wantNumSpans, 10) + assert.Equal(t, 0, te.len()) + assert.Equal(t, 0, te.getBatchCount()) + + // Generate a new batch, this will succeed + generateSpan(t, option.parallel, tr, option) + + // Force flush any held span batches + err = ssp.ForceFlush(context.Background()) + assert.NoError(t, err) + + assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10) + gotBatchCount := te.getBatchCount() + if gotBatchCount < option.wantBatchCount { + t.Errorf("number batches: got %+v, want >= %+v\n", + gotBatchCount, option.wantBatchCount) + t.Errorf("Batches %v\n", te.sizes) + } +} + +func assertMaxSpanDiff(t *testing.T, want, got, maxDif int) { + spanDifference := want - got + if spanDifference < 0 { + spanDifference = spanDifference * -1 + } + if spanDifference > maxDif { + t.Errorf("number of exported span not equal to or within %d less than: got %+v, want %+v\n", + maxDif, got, want) + } +} + func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) { var bp testBatchExporter bsp := sdktrace.NewBatchSpanProcessor(&bp)