1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-30 21:20:04 +02:00

Update SpanProcessor Shutdown with context and error (#1264)

* 1232: update SpanProcessor Shutdown with context and error

* 1232: add changelog info

* 1232: fix CI error, rm commented code

* 1232: fix CI unhandled error

* 1232: Done commit properly

* Add shutdown error handling

* Merge branch 'master' into update-span-processor

* Revert now unneeded context declaration move

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Tyler Yahn <codingalias@gmail.com>
This commit is contained in:
Ilya Kaznacheev 2020-10-27 05:06:55 +03:00 committed by GitHub
parent 412ee70ad2
commit a6b31e0da1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 67 additions and 31 deletions

View File

@ -64,6 +64,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
They no longer track the gRPC codes. (#1214)
- The `StatusCode` field of the `SpanData` struct in the `go.opentelemetry.io/otel/sdk/export/trace` package now uses the codes package from this package instead of the gRPC project. (#1214)
- Move the `go.opentelemetry.io/otel/api/baggage` package into `go.opentelemetry.io/otel/propagators`. (#1217)
- A `Shutdown` method of `SpanProcessor` and all its implementations receives a context and returns an error. (#1264)
### Fixed

View File

@ -80,8 +80,9 @@ func initProvider() func() {
pusher.Start()
return func() {
handleErr(tracerProvider.Shutdown(context.Background()), "failed to shutdown provider")
handleErr(exp.Shutdown(context.Background()), "failed to stop exporter")
ctx := context.Background()
handleErr(tracerProvider.Shutdown(ctx), "failed to shutdown provider")
handleErr(exp.Shutdown(ctx), "failed to stop exporter")
pusher.Stop() // pushes any last exports to the receiver
}
}

View File

@ -125,11 +125,23 @@ func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {
// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func (bsp *BatchSpanProcessor) Shutdown() {
func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
wait := make(chan struct{})
go func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
close(wait)
}()
// Wait until the wait group is done or the context is cancelled
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
})
return err
}
// ForceFlush exports all ended spans that have not yet been exported.

View File

@ -65,7 +65,10 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
bsp.OnStart(&export.SpanData{})
bsp.OnEnd(&export.SpanData{})
bsp.ForceFlush()
bsp.Shutdown()
err := bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}
}
type testOption struct {
@ -222,8 +225,14 @@ func getSpanContext() otel.SpanContext {
func TestBatchSpanProcessorShutdown(t *testing.T) {
bsp := sdktrace.NewBatchSpanProcessor(&testBatchExporter{})
bsp.Shutdown()
err := bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}
// Multiple call to Shutdown() should not panic.
bsp.Shutdown()
err = bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}
}

View File

@ -20,6 +20,7 @@ import (
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/global"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
@ -142,7 +143,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
}
if stopOnce != nil {
stopOnce.state.Do(func() {
s.Shutdown()
global.Handle(s.Shutdown(context.Background()))
})
}
if len(new) > 1 {
@ -190,7 +191,7 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {
for _, sps := range spss {
sps.state.Do(func() {
sps.sp.Shutdown()
global.Handle(sps.sp.Shutdown(ctx))
})
}
return nil

View File

@ -25,8 +25,9 @@ type basicSpanProcesor struct {
running bool
}
func (t *basicSpanProcesor) Shutdown() {
func (t *basicSpanProcesor) Shutdown(context.Context) error {
t.running = false
return nil
}
func (t *basicSpanProcesor) OnStart(s *export.SpanData) {}

View File

@ -52,7 +52,8 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
}
// Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() {
func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error {
return nil
}
// ForceFlush does nothing as there is no data to flush.

View File

@ -82,7 +82,11 @@ func TestSimpleSpanProcessorShutdown(t *testing.T) {
ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{})
if ssp == nil {
t.Errorf("Error creating new instance of SimpleSpanProcessor\n")
return
}
ssp.Shutdown()
err := ssp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the SimpleSpanProcessor down\n")
}
}

View File

@ -15,6 +15,7 @@
package trace
import (
"context"
"sync"
export "go.opentelemetry.io/otel/sdk/export/trace"
@ -31,10 +32,10 @@ type SpanProcessor interface {
// and hence should not block.
OnEnd(sd *export.SpanData)
// Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor
// Shutdown is invoked when SDK shuts down. Use this call to cleanup any processor
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely.
Shutdown()
Shutdown(ctx context.Context) error
// ForceFlush exports all ended spans to the configured Exporter that have not yet
// been exported. It should only be called when absolutely necessary, such as when

View File

@ -15,6 +15,7 @@
package trace
import (
"context"
"time"
export "go.opentelemetry.io/otel/sdk/export/trace"
@ -33,9 +34,9 @@ type DurationFilter struct {
Max time.Duration
}
func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown() { f.Next.Shutdown() }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnEnd(sd *export.SpanData) {
if f.Min > 0 && sd.EndTime.Sub(sd.StartTime) < f.Min {
// Drop short lived spans.
@ -59,9 +60,9 @@ type InstrumentationBlacklist struct {
Blacklist map[string]bool
}
func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown() { f.Next.Shutdown() }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnEnd(sd *export.SpanData) {
if f.Blacklist != nil && f.Blacklist[sd.InstrumentationLibrary.Name] {
// Drop spans from this instrumentation

View File

@ -22,14 +22,14 @@ import (
export "go.opentelemetry.io/otel/sdk/export/trace"
)
type testSpanProcesor struct {
type testSpanProcessor struct {
name string
spansStarted []*export.SpanData
spansEnded []*export.SpanData
shutdownCount int
}
func (t *testSpanProcesor) OnStart(s *export.SpanData) {
func (t *testSpanProcessor) OnStart(s *export.SpanData) {
kv := label.KeyValue{
Key: "OnStart",
Value: label.StringValue(t.name),
@ -38,7 +38,7 @@ func (t *testSpanProcesor) OnStart(s *export.SpanData) {
t.spansStarted = append(t.spansStarted, s)
}
func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
func (t *testSpanProcessor) OnEnd(s *export.SpanData) {
kv := label.KeyValue{
Key: "OnEnd",
Value: label.StringValue(t.name),
@ -47,11 +47,12 @@ func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
t.spansEnded = append(t.spansEnded, s)
}
func (t *testSpanProcesor) Shutdown() {
func (t *testSpanProcessor) Shutdown(_ context.Context) error {
t.shutdownCount++
return nil
}
func (t *testSpanProcesor) ForceFlush() {
func (t *testSpanProcessor) ForceFlush() {
}
func TestRegisterSpanProcessort(t *testing.T) {
@ -181,7 +182,10 @@ func TestSpanProcessorShutdown(t *testing.T) {
tp.RegisterSpanProcessor(sp)
wantCount := 1
sp.Shutdown()
err := sp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the testSpanProcessor down\n")
}
gotCount := sp.shutdownCount
if wantCount != gotCount {
@ -216,12 +220,12 @@ func TestMultipleUnregisterSpanProcessorCalls(t *testing.T) {
}
}
func NewTestSpanProcessor(name string) *testSpanProcesor {
return &testSpanProcesor{name: name}
func NewTestSpanProcessor(name string) *testSpanProcessor {
return &testSpanProcessor{name: name}
}
func NewNamedTestSpanProcessors(names []string) []*testSpanProcesor {
tsp := []*testSpanProcesor{}
func NewNamedTestSpanProcessors(names []string) []*testSpanProcessor {
tsp := []*testSpanProcessor{}
for _, n := range names {
tsp = append(tsp, NewTestSpanProcessor(n))
}