From a6b31e0da1ad6c0e7e66624386a4fc8fd932f521 Mon Sep 17 00:00:00 2001 From: Ilya Kaznacheev Date: Tue, 27 Oct 2020 05:06:55 +0300 Subject: [PATCH] Update SpanProcessor Shutdown with context and error (#1264) * 1232: update SpanProcessor Shutdown with context and error * 1232: add changelog info * 1232: fix CI error, rm commented code * 1232: fix CI unhandled error * 1232: Done commit properly * Add shutdown error handling * Merge branch 'master' into update-span-processor * Revert now unneeded context declaration move Co-authored-by: Tyler Yahn Co-authored-by: Tyler Yahn --- CHANGELOG.md | 1 + example/otel-collector/main.go | 5 +++-- sdk/trace/batch_span_processor.go | 18 +++++++++++++++--- sdk/trace/batch_span_processor_test.go | 15 ++++++++++++--- sdk/trace/provider.go | 5 +++-- sdk/trace/provider_test.go | 3 ++- sdk/trace/simple_span_processor.go | 3 ++- sdk/trace/simple_span_processor_test.go | 6 +++++- sdk/trace/span_processor.go | 5 +++-- sdk/trace/span_processor_example_test.go | 13 +++++++------ sdk/trace/span_processor_test.go | 24 ++++++++++++++---------- 11 files changed, 67 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce18bf337..349a2c585 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -64,6 +64,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm They no longer track the gRPC codes. (#1214) - The `StatusCode` field of the `SpanData` struct in the `go.opentelemetry.io/otel/sdk/export/trace` package now uses the codes package from this package instead of the gRPC project. (#1214) - Move the `go.opentelemetry.io/otel/api/baggage` package into `go.opentelemetry.io/otel/propagators`. (#1217) +- A `Shutdown` method of `SpanProcessor` and all its implementations receives a context and returns an error. (#1264) ### Fixed diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index 6fa294f6f..39aa5ba68 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -80,8 +80,9 @@ func initProvider() func() { pusher.Start() return func() { - handleErr(tracerProvider.Shutdown(context.Background()), "failed to shutdown provider") - handleErr(exp.Shutdown(context.Background()), "failed to stop exporter") + ctx := context.Background() + handleErr(tracerProvider.Shutdown(ctx), "failed to shutdown provider") + handleErr(exp.Shutdown(ctx), "failed to stop exporter") pusher.Stop() // pushes any last exports to the receiver } } diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 82a4722ba..83015e943 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -125,11 +125,23 @@ func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) { // Shutdown flushes the queue and waits until all spans are processed. // It only executes once. Subsequent call does nothing. -func (bsp *BatchSpanProcessor) Shutdown() { +func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error { + var err error bsp.stopOnce.Do(func() { - close(bsp.stopCh) - bsp.stopWait.Wait() + wait := make(chan struct{}) + go func() { + close(bsp.stopCh) + bsp.stopWait.Wait() + close(wait) + }() + // Wait until the wait group is done or the context is cancelled + select { + case <-wait: + case <-ctx.Done(): + err = ctx.Err() + } }) + return err } // ForceFlush exports all ended spans that have not yet been exported. diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index 239a0af87..173903b8e 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -65,7 +65,10 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) { bsp.OnStart(&export.SpanData{}) bsp.OnEnd(&export.SpanData{}) bsp.ForceFlush() - bsp.Shutdown() + err := bsp.Shutdown(context.Background()) + if err != nil { + t.Error("Error shutting the BatchSpanProcessor down\n") + } } type testOption struct { @@ -222,8 +225,14 @@ func getSpanContext() otel.SpanContext { func TestBatchSpanProcessorShutdown(t *testing.T) { bsp := sdktrace.NewBatchSpanProcessor(&testBatchExporter{}) - bsp.Shutdown() + err := bsp.Shutdown(context.Background()) + if err != nil { + t.Error("Error shutting the BatchSpanProcessor down\n") + } // Multiple call to Shutdown() should not panic. - bsp.Shutdown() + err = bsp.Shutdown(context.Background()) + if err != nil { + t.Error("Error shutting the BatchSpanProcessor down\n") + } } diff --git a/sdk/trace/provider.go b/sdk/trace/provider.go index b4cdcf6bb..d915f19a0 100644 --- a/sdk/trace/provider.go +++ b/sdk/trace/provider.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/global" export "go.opentelemetry.io/otel/sdk/export/trace" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/resource" @@ -142,7 +143,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) { } if stopOnce != nil { stopOnce.state.Do(func() { - s.Shutdown() + global.Handle(s.Shutdown(context.Background())) }) } if len(new) > 1 { @@ -190,7 +191,7 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error { for _, sps := range spss { sps.state.Do(func() { - sps.sp.Shutdown() + global.Handle(sps.sp.Shutdown(ctx)) }) } return nil diff --git a/sdk/trace/provider_test.go b/sdk/trace/provider_test.go index 02c028289..edfd8735b 100644 --- a/sdk/trace/provider_test.go +++ b/sdk/trace/provider_test.go @@ -25,8 +25,9 @@ type basicSpanProcesor struct { running bool } -func (t *basicSpanProcesor) Shutdown() { +func (t *basicSpanProcesor) Shutdown(context.Context) error { t.running = false + return nil } func (t *basicSpanProcesor) OnStart(s *export.SpanData) {} diff --git a/sdk/trace/simple_span_processor.go b/sdk/trace/simple_span_processor.go index d0cef298c..c3899e210 100644 --- a/sdk/trace/simple_span_processor.go +++ b/sdk/trace/simple_span_processor.go @@ -52,7 +52,8 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) { } // Shutdown method does nothing. There is no data to cleanup. -func (ssp *SimpleSpanProcessor) Shutdown() { +func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error { + return nil } // ForceFlush does nothing as there is no data to flush. diff --git a/sdk/trace/simple_span_processor_test.go b/sdk/trace/simple_span_processor_test.go index 4014d9e59..98b570308 100644 --- a/sdk/trace/simple_span_processor_test.go +++ b/sdk/trace/simple_span_processor_test.go @@ -82,7 +82,11 @@ func TestSimpleSpanProcessorShutdown(t *testing.T) { ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{}) if ssp == nil { t.Errorf("Error creating new instance of SimpleSpanProcessor\n") + return } - ssp.Shutdown() + err := ssp.Shutdown(context.Background()) + if err != nil { + t.Error("Error shutting the SimpleSpanProcessor down\n") + } } diff --git a/sdk/trace/span_processor.go b/sdk/trace/span_processor.go index e707745f9..dffb9aa02 100644 --- a/sdk/trace/span_processor.go +++ b/sdk/trace/span_processor.go @@ -15,6 +15,7 @@ package trace import ( + "context" "sync" export "go.opentelemetry.io/otel/sdk/export/trace" @@ -31,10 +32,10 @@ type SpanProcessor interface { // and hence should not block. OnEnd(sd *export.SpanData) - // Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor + // Shutdown is invoked when SDK shuts down. Use this call to cleanup any processor // data. No calls to OnStart and OnEnd method is invoked after Shutdown call is // made. It should not be blocked indefinitely. - Shutdown() + Shutdown(ctx context.Context) error // ForceFlush exports all ended spans to the configured Exporter that have not yet // been exported. It should only be called when absolutely necessary, such as when diff --git a/sdk/trace/span_processor_example_test.go b/sdk/trace/span_processor_example_test.go index 5eb41b005..f22407852 100644 --- a/sdk/trace/span_processor_example_test.go +++ b/sdk/trace/span_processor_example_test.go @@ -15,6 +15,7 @@ package trace import ( + "context" "time" export "go.opentelemetry.io/otel/sdk/export/trace" @@ -33,9 +34,9 @@ type DurationFilter struct { Max time.Duration } -func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) } -func (f DurationFilter) Shutdown() { f.Next.Shutdown() } -func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() } +func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) } +func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } +func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() } func (f DurationFilter) OnEnd(sd *export.SpanData) { if f.Min > 0 && sd.EndTime.Sub(sd.StartTime) < f.Min { // Drop short lived spans. @@ -59,9 +60,9 @@ type InstrumentationBlacklist struct { Blacklist map[string]bool } -func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) } -func (f InstrumentationBlacklist) Shutdown() { f.Next.Shutdown() } -func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() } +func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) } +func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) } +func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() } func (f InstrumentationBlacklist) OnEnd(sd *export.SpanData) { if f.Blacklist != nil && f.Blacklist[sd.InstrumentationLibrary.Name] { // Drop spans from this instrumentation diff --git a/sdk/trace/span_processor_test.go b/sdk/trace/span_processor_test.go index a64d80839..081307ce9 100644 --- a/sdk/trace/span_processor_test.go +++ b/sdk/trace/span_processor_test.go @@ -22,14 +22,14 @@ import ( export "go.opentelemetry.io/otel/sdk/export/trace" ) -type testSpanProcesor struct { +type testSpanProcessor struct { name string spansStarted []*export.SpanData spansEnded []*export.SpanData shutdownCount int } -func (t *testSpanProcesor) OnStart(s *export.SpanData) { +func (t *testSpanProcessor) OnStart(s *export.SpanData) { kv := label.KeyValue{ Key: "OnStart", Value: label.StringValue(t.name), @@ -38,7 +38,7 @@ func (t *testSpanProcesor) OnStart(s *export.SpanData) { t.spansStarted = append(t.spansStarted, s) } -func (t *testSpanProcesor) OnEnd(s *export.SpanData) { +func (t *testSpanProcessor) OnEnd(s *export.SpanData) { kv := label.KeyValue{ Key: "OnEnd", Value: label.StringValue(t.name), @@ -47,11 +47,12 @@ func (t *testSpanProcesor) OnEnd(s *export.SpanData) { t.spansEnded = append(t.spansEnded, s) } -func (t *testSpanProcesor) Shutdown() { +func (t *testSpanProcessor) Shutdown(_ context.Context) error { t.shutdownCount++ + return nil } -func (t *testSpanProcesor) ForceFlush() { +func (t *testSpanProcessor) ForceFlush() { } func TestRegisterSpanProcessort(t *testing.T) { @@ -181,7 +182,10 @@ func TestSpanProcessorShutdown(t *testing.T) { tp.RegisterSpanProcessor(sp) wantCount := 1 - sp.Shutdown() + err := sp.Shutdown(context.Background()) + if err != nil { + t.Error("Error shutting the testSpanProcessor down\n") + } gotCount := sp.shutdownCount if wantCount != gotCount { @@ -216,12 +220,12 @@ func TestMultipleUnregisterSpanProcessorCalls(t *testing.T) { } } -func NewTestSpanProcessor(name string) *testSpanProcesor { - return &testSpanProcesor{name: name} +func NewTestSpanProcessor(name string) *testSpanProcessor { + return &testSpanProcessor{name: name} } -func NewNamedTestSpanProcessors(names []string) []*testSpanProcesor { - tsp := []*testSpanProcesor{} +func NewNamedTestSpanProcessors(names []string) []*testSpanProcessor { + tsp := []*testSpanProcessor{} for _, n := range names { tsp = append(tsp, NewTestSpanProcessor(n)) }