1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-04-11 11:21:59 +02:00

Empty queued spans when ForceFlush called (#2335)

* Empty queued spans when ForceFlush called

Update the implementation of ForceFlush() to first ensure that all spans
which are queued are added to the batch before calling export spans.
Create a small ReadOnlySpan implementation which can be used as a marker
that ForceFlush has been invoked and used to notify when all spans are
ready to be exported.

Fixes #2080.

* Add a changelog entry.

* Update CHANGELOG.md

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Update sdk/trace/batch_span_processor.go

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>

* Improve test case to verify multiple flushes.

* Refactor code to use enqueue.

* Be more defensive on waiting for queue.

Update the handling of the force flush span so we only wait on the
channel if we were able to enqueue the span to the queue.

* Fix linter.

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
This commit is contained in:
Philip K. Warren 2021-11-05 10:34:57 -05:00 committed by GitHub
parent 7ce58f3558
commit 6d2aeb0dc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 65 additions and 5 deletions

View File

@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The following interface types simply moved from `metric` to `metric/sdkapi`: `Descriptor`, `MeterImpl`, `InstrumentImpl`, `SyncImpl`, `BoundSyncImpl`, `AsyncImpl`, `AsyncRunner`, `AsyncSingleRunner`, and `AsyncBatchRunner`
- The following struct types moved and are replaced with type aliases, since they are exposed to the user: `Observation`, `Measurement`.
- The No-op implementations of sync and async instruments are no longer exported, new functions `sdkapi.NewNoopAsyncInstrument()` and `sdkapi.NewNoopSyncInstrument()` are provided instead. (#2271)
- Update the SDK `BatchSpanProcessor` to export all queued spans when `ForceFlush` is called. (#2080, #2335)
### Added

View File

@ -22,6 +22,7 @@ import (
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// Defaults for BatchSpanProcessorOptions.
@ -153,10 +154,29 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
return err
}
type forceFlushSpan struct {
ReadOnlySpan
flushed chan struct{}
}
func (f forceFlushSpan) SpanContext() trace.SpanContext {
return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
}
// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
var err error
if bsp.e != nil {
flushCh := make(chan struct{})
if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}, true) {
select {
case <-flushCh:
// Processed any items in queue prior to ForceFlush being called
case <-ctx.Done():
return ctx.Err()
}
}
wait := make(chan error)
go func() {
wait <- bsp.exportSpans(ctx)
@ -248,6 +268,10 @@ func (bsp *batchSpanProcessor) processQueue() {
otel.Handle(err)
}
case sd := <-bsp.queue:
if ffs, ok := sd.(forceFlushSpan); ok {
close(ffs.flushed)
continue
}
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
@ -296,8 +320,12 @@ func (bsp *batchSpanProcessor) drainQueue() {
}
func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
bsp.enqueueBlockOnQueueFull(context.TODO(), sd, bsp.o.BlockOnQueueFull)
}
func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan, block bool) bool {
if !sd.SpanContext().IsSampled() {
return
return false
}
// This ensures the bsp.queue<- below does not panic as the
@ -317,18 +345,24 @@ func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
select {
case <-bsp.stopCh:
return
return false
default:
}
if bsp.o.BlockOnQueueFull {
bsp.queue <- sd
return
if block {
select {
case bsp.queue <- sd:
return true
case <-ctx.Done():
return false
}
}
select {
case bsp.queue <- sd:
return true
default:
atomic.AddUint32(&bsp.dropped, 1)
}
return false
}

View File

@ -18,6 +18,7 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
"testing"
"time"
@ -25,6 +26,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
@ -457,3 +460,25 @@ func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
t.Errorf("expected %q error, got %v", want, got)
}
}
func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
ctx := context.Background()
exp := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
)
tracer := tp.Tracer("tracer")
for i := 0; i < 10; i++ {
_, span := tracer.Start(ctx, fmt.Sprintf("span%d", i))
span.End()
err := tp.ForceFlush(ctx)
assert.NoError(t, err)
assert.Len(t, exp.GetSpans(), i+1)
}
}