From a4055c21bce3d57d51429ca2d0748b6ce400c612 Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Thu, 12 Jun 2025 10:02:35 -0700 Subject: [PATCH] Use the cause of the context error in OTLP retry (#6898) Part of #6588 For a demo code like this ```go package main import ( "context" "fmt" "log" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) func main() { ctx := context.Background() exp, err := newExporter(ctx) if err != nil { log.Fatalf("failed to initialize trace exporter: %v", err) } tp, err := newTracerProvider(exp) if err != nil { log.Fatalf("failed to initialize trace provider: %v", err) } defer func() { _ = tp.Shutdown(ctx) }() otel.SetTracerProvider(tp) generateSpan() select {} } func generateSpan() { log.Println("Generating a dummy span") _, span := otel.Tracer("").Start(context.Background(), "dummy") defer span.End() } func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) { return sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), ), nil } func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { traceExporter, err := otlptrace.New( ctx, otlptracegrpc.NewClient( otlptracegrpc.WithEndpoint("127.0.0.1:4317"), otlptracegrpc.WithInsecure(), otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{ Enabled: true, InitialInterval: 1 * time.Second, MaxInterval: 30 * time.Second, MaxElapsedTime: time.Minute, }), ), ) if err != nil { return nil, fmt.Errorf("failed to create trace exporter: %w", err) } return traceExporter, nil } ``` the error result from ``` traces export: context deadline exceeded: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused" ``` become ``` traces export: exporter export timeout: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused" ``` --- CHANGELOG.md | 1 + exporters/otlp/otlplog/otlploggrpc/client.go | 5 ++++- exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go | 2 +- exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go | 2 +- exporters/otlp/otlpmetric/otlpmetricgrpc/client.go | 3 ++- .../otlp/otlpmetric/otlpmetricgrpc/internal/retry/retry.go | 2 +- .../otlp/otlpmetric/otlpmetrichttp/internal/retry/retry.go | 2 +- exporters/otlp/otlptrace/otlptracegrpc/client.go | 2 +- .../otlp/otlptrace/otlptracegrpc/internal/retry/retry.go | 2 +- .../otlp/otlptrace/otlptracehttp/internal/retry/retry.go | 2 +- internal/shared/otlp/retry/retry.go.tmpl | 2 +- sdk/log/exporter.go | 4 +++- sdk/metric/periodic_reader.go | 6 +++--- sdk/trace/batch_span_processor.go | 3 ++- 14 files changed, 23 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5855e7d53..4e6e47443 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The semantic conventions have been upgraded from `v1.26.0` to `v1.34.0` in `go.opentelemetry.io/otel/sdk/trace`. (#6835) - The semantic conventions have been upgraded from `v1.26.0` to `v1.34.0` in `go.opentelemetry.io/otel/trace`. (#6836) - `Record.Resource` now returns `*resource.Resource` instead of `resource.Resource` in `go.opentelemetry.io/otel/sdk/log`. (#6864) +- Retry now shows error cause for context timeout in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`, `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#6898) ### Fixed diff --git a/exporters/otlp/otlplog/otlploggrpc/client.go b/exporters/otlp/otlplog/otlploggrpc/client.go index 05abd92ee..1add3f333 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client.go +++ b/exporters/otlp/otlplog/otlploggrpc/client.go @@ -5,6 +5,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o import ( "context" + "errors" "fmt" "time" @@ -192,7 +193,7 @@ func (c *client) exportContext(parent context.Context) (context.Context, context ) if c.exportTimeout > 0 { - ctx, cancel = context.WithTimeout(parent, c.exportTimeout) + ctx, cancel = context.WithTimeoutCause(parent, c.exportTimeout, errors.New("exporter export timeout")) } else { ctx, cancel = context.WithCancel(parent) } @@ -228,6 +229,8 @@ func retryable(err error) (bool, time.Duration) { func retryableGRPCStatus(s *status.Status) (bool, time.Duration) { switch s.Code() { + // Follows the retryable error codes defined in + // https://opentelemetry.io/docs/specs/otlp/#failures case codes.Canceled, codes.DeadlineExceeded, codes.Aborted, diff --git a/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go b/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go index 896c3a303..fa5946774 100644 --- a/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go +++ b/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go @@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error { select { case <-timer.C: default: - return ctx.Err() + return context.Cause(ctx) } case <-timer.C: } diff --git a/exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go b/exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go index bd9a750a1..a0a9dc133 100644 --- a/exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go +++ b/exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go @@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error { select { case <-timer.C: default: - return ctx.Err() + return context.Cause(ctx) } case <-timer.C: } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index e0fa0570a..82a4c2c2a 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -5,6 +5,7 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme import ( "context" + "errors" "time" "google.golang.org/genproto/googleapis/rpc/errdetails" @@ -149,7 +150,7 @@ func (c *client) exportContext(parent context.Context) (context.Context, context ) if c.exportTimeout > 0 { - ctx, cancel = context.WithTimeout(parent, c.exportTimeout) + ctx, cancel = context.WithTimeoutCause(parent, c.exportTimeout, errors.New("exporter export timeout")) } else { ctx, cancel = context.WithCancel(parent) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/retry/retry.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/retry/retry.go index 37cc6c519..80691ac3a 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/retry/retry.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/retry/retry.go @@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error { select { case <-timer.C: default: - return ctx.Err() + return context.Cause(ctx) } case <-timer.C: } diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/retry/retry.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/retry/retry.go index c855bdc93..8a5fa80ea 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/retry/retry.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/retry/retry.go @@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error { select { case <-timer.C: default: - return ctx.Err() + return context.Cause(ctx) } case <-timer.C: } diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client.go b/exporters/otlp/otlptrace/otlptracegrpc/client.go index 8409b5f8f..8236c995a 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client.go @@ -223,7 +223,7 @@ func (c *client) exportContext(parent context.Context) (context.Context, context ) if c.exportTimeout > 0 { - ctx, cancel = context.WithTimeout(parent, c.exportTimeout) + ctx, cancel = context.WithTimeoutCause(parent, c.exportTimeout, errors.New("exporter export timeout")) } else { ctx, cancel = context.WithCancel(parent) } diff --git a/exporters/otlp/otlptrace/otlptracegrpc/internal/retry/retry.go b/exporters/otlp/otlptrace/otlptracegrpc/internal/retry/retry.go index 777e68a7b..259a898ae 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/internal/retry/retry.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/internal/retry/retry.go @@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error { select { case <-timer.C: default: - return ctx.Err() + return context.Cause(ctx) } case <-timer.C: } diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/retry/retry.go b/exporters/otlp/otlptrace/otlptracehttp/internal/retry/retry.go index e9d35c7fa..107428fa6 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/internal/retry/retry.go +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/retry/retry.go @@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error { select { case <-timer.C: default: - return ctx.Err() + return context.Cause(ctx) } case <-timer.C: } diff --git a/internal/shared/otlp/retry/retry.go.tmpl b/internal/shared/otlp/retry/retry.go.tmpl index 896c3a303..fa5946774 100644 --- a/internal/shared/otlp/retry/retry.go.tmpl +++ b/internal/shared/otlp/retry/retry.go.tmpl @@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error { select { case <-timer.C: default: - return ctx.Err() + return context.Cause(ctx) } case <-timer.C: } diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 8cef5dde6..a9d3c439b 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -119,7 +119,9 @@ func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter { // Export sets the timeout of ctx before calling the Exporter e wraps. func (e *timeoutExporter) Export(ctx context.Context, records []Record) error { - ctx, cancel := context.WithTimeout(ctx, e.timeout) + // This only used by the batch processor, and it takes processor timeout config. + // Thus, the error message points to the processor. So users know they should adjust the processor timeout. + ctx, cancel := context.WithTimeoutCause(ctx, e.timeout, errors.New("processor export timeout")) defer cancel() return e.Exporter.Export(ctx, records) } diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index ebb9a0463..0a48aed74 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -202,7 +202,7 @@ func (r *PeriodicReader) aggregation( // collectAndExport gather all metric data related to the periodicReader r from // the SDK and exports it with r's exporter. func (r *PeriodicReader) collectAndExport(ctx context.Context) error { - ctx, cancel := context.WithTimeout(ctx, r.timeout) + ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout")) defer cancel() // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. @@ -278,7 +278,7 @@ func (r *PeriodicReader) ForceFlush(ctx context.Context) error { // Prioritize the ctx timeout if it is set. if _, ok := ctx.Deadline(); !ok { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.timeout) + ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader force flush timeout")) defer cancel() } @@ -311,7 +311,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error { // Prioritize the ctx timeout if it is set. if _, ok := ctx.Deadline(); !ok { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, r.timeout) + ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader shutdown timeout")) defer cancel() } diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index 6872cbb4e..6966ed861 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -5,6 +5,7 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" + "errors" "sync" "sync/atomic" "time" @@ -267,7 +268,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error { if bsp.o.ExportTimeout > 0 { var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout) + ctx, cancel = context.WithTimeoutCause(ctx, bsp.o.ExportTimeout, errors.New("processor export timeout")) defer cancel() }