1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Add ForceFlush() method to SpanProcessor interface (#1166)

* Add ForceFlush() method to SpanProcessor interface

* Add a stub implementation to SimpleSpanProcessor
* Add a working implementation to BatchSpanProcessor

* add CHANGELOG.md entry

* Eliminate sleep from BatchSpanProcessor.ForceFlush() test

* Generating test spans serially should reduce test flakiness
This commit is contained in:
Anthony Mirabella
2020-09-20 13:35:44 -04:00
committed by GitHub
parent a12224a454
commit 995be31f42
6 changed files with 85 additions and 7 deletions

View File

@@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- In the `go.opentelemetry.io/otel/api/trace` package, `NewTracerConfig` was added to construct new `TracerConfig`s. - In the `go.opentelemetry.io/otel/api/trace` package, `NewTracerConfig` was added to construct new `TracerConfig`s.
This addition was made to conform with our project option conventions. (#1155) This addition was made to conform with our project option conventions. (#1155)
- Instrumentation library information was added to the Zipkin exporter. (#1119) - Instrumentation library information was added to the Zipkin exporter. (#1119)
- The `SpanProcessor` interface now has a `ForceFlush()` method. (#1166)
- More semantic conventions for k8s as resource attributes. (#1167) - More semantic conventions for k8s as resource attributes. (#1167)
### Changed ### Changed

View File

@@ -66,11 +66,12 @@ type BatchSpanProcessor struct {
queue chan *export.SpanData queue chan *export.SpanData
dropped uint32 dropped uint32
batch []*export.SpanData batch []*export.SpanData
timer *time.Timer batchMutex sync.Mutex
stopWait sync.WaitGroup timer *time.Timer
stopOnce sync.Once stopWait sync.WaitGroup
stopCh chan struct{} stopOnce sync.Once
stopCh chan struct{}
} }
var _ SpanProcessor = (*BatchSpanProcessor)(nil) var _ SpanProcessor = (*BatchSpanProcessor)(nil)
@@ -131,6 +132,11 @@ func (bsp *BatchSpanProcessor) Shutdown() {
}) })
} }
// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *BatchSpanProcessor) ForceFlush() {
bsp.exportSpans()
}
func WithMaxQueueSize(size int) BatchSpanProcessorOption { func WithMaxQueueSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) { return func(o *BatchSpanProcessorOptions) {
o.MaxQueueSize = size o.MaxQueueSize = size
@@ -159,6 +165,9 @@ func WithBlocking() BatchSpanProcessorOption {
func (bsp *BatchSpanProcessor) exportSpans() { func (bsp *BatchSpanProcessor) exportSpans() {
bsp.timer.Reset(bsp.o.BatchTimeout) bsp.timer.Reset(bsp.o.BatchTimeout)
bsp.batchMutex.Lock()
defer bsp.batchMutex.Unlock()
if len(bsp.batch) > 0 { if len(bsp.batch) > 0 {
if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil { if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil {
global.Handle(err) global.Handle(err)
@@ -180,8 +189,11 @@ func (bsp *BatchSpanProcessor) processQueue() {
case <-bsp.timer.C: case <-bsp.timer.C:
bsp.exportSpans() bsp.exportSpans()
case sd := <-bsp.queue: case sd := <-bsp.queue:
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd) bsp.batch = append(bsp.batch, sd)
if len(bsp.batch) == bsp.o.MaxExportBatchSize { shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport {
if !bsp.timer.Stop() { if !bsp.timer.Stop() {
<-bsp.timer.C <-bsp.timer.C
} }
@@ -202,8 +214,12 @@ func (bsp *BatchSpanProcessor) drainQueue() {
return return
} }
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd) bsp.batch = append(bsp.batch, sd)
if len(bsp.batch) == bsp.o.MaxExportBatchSize { shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport {
bsp.exportSpans() bsp.exportSpans()
} }
default: default:

View File

@@ -64,6 +64,7 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
// These should not panic. // These should not panic.
bsp.OnStart(&export.SpanData{}) bsp.OnStart(&export.SpanData{})
bsp.OnEnd(&export.SpanData{}) bsp.OnEnd(&export.SpanData{})
bsp.ForceFlush()
bsp.Shutdown() bsp.Shutdown()
} }
@@ -180,6 +181,53 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
} }
} }
func TestBatchSpanProcessorForceFlush(t *testing.T) {
option := testOption{
name: "ForceFlush()",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(10 * time.Second),
sdktrace.WithMaxQueueSize(2000),
sdktrace.WithMaxExportBatchSize(2000),
},
wantNumSpans: 205,
wantBatchCount: 1,
genNumSpans: 205,
}
te := testBatchExporter{}
tp := basicProvider(t)
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOptions")
generateSpan(t, false, tr, option)
ssp.ForceFlush()
gotNumOfSpans := te.len()
if 0 == gotNumOfSpans {
t.Errorf("number of flushed spans is zero")
}
tp.UnregisterSpanProcessor(ssp)
gotNumOfSpans = te.len()
if option.wantNumSpans != gotNumOfSpans {
t.Errorf("number of exported span: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}
gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
}
func createAndRegisterBatchSP(option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor { func createAndRegisterBatchSP(option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
// Always use blocking queue to avoid flaky tests. // Always use blocking queue to avoid flaky tests.
options := append(option.o, sdktrace.WithBlocking()) options := append(option.o, sdktrace.WithBlocking())

View File

@@ -54,3 +54,7 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
// Shutdown method does nothing. There is no data to cleanup. // Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() { func (ssp *SimpleSpanProcessor) Shutdown() {
} }
// ForceFlush does nothing as there is no data to flush.
func (ssp *SimpleSpanProcessor) ForceFlush() {
}

View File

@@ -35,6 +35,12 @@ type SpanProcessor interface {
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is // data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely. // made. It should not be blocked indefinitely.
Shutdown() Shutdown()
// ForceFlush exports all ended spans to the configured Exporter that have not yet
// been exported. It should only be called when absolutely necessary, such as when
// using a FaaS provider that may suspend the process after an invocation, but before
// the Processor can export the completed spans.
ForceFlush()
} }
type spanProcessorMap map[SpanProcessor]*sync.Once type spanProcessorMap map[SpanProcessor]*sync.Once

View File

@@ -39,6 +39,9 @@ func (t *testSpanProcesor) Shutdown() {
t.shutdownCount++ t.shutdownCount++
} }
func (t *testSpanProcesor) ForceFlush() {
}
func TestRegisterSpanProcessort(t *testing.T) { func TestRegisterSpanProcessort(t *testing.T) {
name := "Register span processor before span starts" name := "Register span processor before span starts"
tp := basicProvider(t) tp := basicProvider(t)