You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-06-25 00:16:49 +02:00
Modify ForceFlush to abort after timeout/cancellation (#1757)
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
@ -41,6 +41,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
The existing `ParentSpanID` and `HasRemoteParent` fields are removed in favor of this. (#1748)
|
The existing `ParentSpanID` and `HasRemoteParent` fields are removed in favor of this. (#1748)
|
||||||
- The `ParentContext` field of the `"go.opentelemetry.io/otel/sdk/trace".SamplingParameters` is updated to hold a `context.Context` containing the parent span.
|
- The `ParentContext` field of the `"go.opentelemetry.io/otel/sdk/trace".SamplingParameters` is updated to hold a `context.Context` containing the parent span.
|
||||||
This changes it to make `SamplingParameters` conform with the OpenTelemetry specification. (#1749)
|
This changes it to make `SamplingParameters` conform with the OpenTelemetry specification. (#1749)
|
||||||
|
- Modify `BatchSpanProcessor.ForceFlush` to abort after timeout/cancellation. (#1757)
|
||||||
- Improve OTLP/gRPC exporter connection errors. (#1737)
|
- Improve OTLP/gRPC exporter connection errors. (#1737)
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
|
@ -148,7 +148,23 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
|
|||||||
|
|
||||||
// ForceFlush exports all ended spans that have not yet been exported.
|
// ForceFlush exports all ended spans that have not yet been exported.
|
||||||
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
|
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
|
||||||
return bsp.exportSpans(ctx)
|
var err error
|
||||||
|
if bsp.e != nil {
|
||||||
|
wait := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
if err := bsp.exportSpans(ctx); err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
}
|
||||||
|
close(wait)
|
||||||
|
}()
|
||||||
|
// Wait until the export is finished or the context is cancelled/timed out
|
||||||
|
select {
|
||||||
|
case <-wait:
|
||||||
|
case <-ctx.Done():
|
||||||
|
err = ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
|
func WithMaxQueueSize(size int) BatchSpanProcessorOption {
|
||||||
|
@ -17,6 +17,7 @@ package trace_test
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"errors"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -257,3 +258,71 @@ func TestBatchSpanProcessorShutdown(t *testing.T) {
|
|||||||
}
|
}
|
||||||
assert.Equal(t, 1, bp.shutdownCount)
|
assert.Equal(t, 1, bp.shutdownCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
|
||||||
|
te := testBatchExporter{}
|
||||||
|
tp := basicTracerProvider(t)
|
||||||
|
option := testOption{
|
||||||
|
name: "default BatchSpanProcessorOptions",
|
||||||
|
o: []sdktrace.BatchSpanProcessorOption{
|
||||||
|
sdktrace.WithMaxQueueSize(0),
|
||||||
|
sdktrace.WithMaxExportBatchSize(3000),
|
||||||
|
},
|
||||||
|
wantNumSpans: 2053,
|
||||||
|
wantBatchCount: 1,
|
||||||
|
genNumSpans: 2053,
|
||||||
|
}
|
||||||
|
ssp := createAndRegisterBatchSP(option, &te)
|
||||||
|
if ssp == nil {
|
||||||
|
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
|
||||||
|
}
|
||||||
|
tp.RegisterSpanProcessor(ssp)
|
||||||
|
tr := tp.Tracer("BatchSpanProcessorWithOption")
|
||||||
|
generateSpan(t, option.parallel, tr, option)
|
||||||
|
|
||||||
|
// Force flush any held span batches
|
||||||
|
err := ssp.ForceFlush(context.Background())
|
||||||
|
|
||||||
|
gotNumOfSpans := te.len()
|
||||||
|
spanDifference := option.wantNumSpans - gotNumOfSpans
|
||||||
|
if spanDifference > 10 || spanDifference < 0 {
|
||||||
|
t.Errorf("number of exported span not equal to or within 10 less than: got %+v, want %+v\n",
|
||||||
|
gotNumOfSpans, option.wantNumSpans)
|
||||||
|
}
|
||||||
|
gotBatchCount := te.getBatchCount()
|
||||||
|
if gotBatchCount < option.wantBatchCount {
|
||||||
|
t.Errorf("number batches: got %+v, want >= %+v\n",
|
||||||
|
gotBatchCount, option.wantBatchCount)
|
||||||
|
t.Errorf("Batches %v\n", te.sizes)
|
||||||
|
}
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
|
||||||
|
var bp testBatchExporter
|
||||||
|
bsp := sdktrace.NewBatchSpanProcessor(&bp)
|
||||||
|
// Add timeout to context to test deadline
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
|
||||||
|
defer cancel()
|
||||||
|
<-ctx.Done()
|
||||||
|
|
||||||
|
if err := bsp.ForceFlush(ctx); err == nil {
|
||||||
|
t.Error("expected context DeadlineExceeded error, got nil")
|
||||||
|
} else if !errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
t.Errorf("expected context DeadlineExceeded error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
|
||||||
|
var bp testBatchExporter
|
||||||
|
bsp := sdktrace.NewBatchSpanProcessor(&bp)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
// Cancel the context
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
if err := bsp.ForceFlush(ctx); err == nil {
|
||||||
|
t.Error("expected context canceled error, got nil")
|
||||||
|
} else if !errors.Is(err, context.Canceled) {
|
||||||
|
t.Errorf("expected context canceled error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user