You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-08-10 22:31:50 +02:00
Use the cause of the context error in OTLP retry (#6898)
Part of #6588 For a demo code like this ```go package main import ( "context" "fmt" "log" "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) func main() { ctx := context.Background() exp, err := newExporter(ctx) if err != nil { log.Fatalf("failed to initialize trace exporter: %v", err) } tp, err := newTracerProvider(exp) if err != nil { log.Fatalf("failed to initialize trace provider: %v", err) } defer func() { _ = tp.Shutdown(ctx) }() otel.SetTracerProvider(tp) generateSpan() select {} } func generateSpan() { log.Println("Generating a dummy span") _, span := otel.Tracer("").Start(context.Background(), "dummy") defer span.End() } func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) { return sdktrace.NewTracerProvider( sdktrace.WithBatcher(exp), ), nil } func newExporter(ctx context.Context) (*otlptrace.Exporter, error) { traceExporter, err := otlptrace.New( ctx, otlptracegrpc.NewClient( otlptracegrpc.WithEndpoint("127.0.0.1:4317"), otlptracegrpc.WithInsecure(), otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{ Enabled: true, InitialInterval: 1 * time.Second, MaxInterval: 30 * time.Second, MaxElapsedTime: time.Minute, }), ), ) if err != nil { return nil, fmt.Errorf("failed to create trace exporter: %w", err) } return traceExporter, nil } ``` the error result from ``` traces export: context deadline exceeded: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused" ``` become ``` traces export: exporter export timeout: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused" ```
This commit is contained in:
@@ -34,6 +34,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- The semantic conventions have been upgraded from `v1.26.0` to `v1.34.0` in `go.opentelemetry.io/otel/sdk/trace`. (#6835)
|
||||
- The semantic conventions have been upgraded from `v1.26.0` to `v1.34.0` in `go.opentelemetry.io/otel/trace`. (#6836)
|
||||
- `Record.Resource` now returns `*resource.Resource` instead of `resource.Resource` in `go.opentelemetry.io/otel/sdk/log`. (#6864)
|
||||
- Retry now shows error cause for context timeout in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`, `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`, `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`, `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#6898)
|
||||
|
||||
### Fixed
|
||||
|
||||
|
@@ -5,6 +5,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -192,7 +193,7 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
|
||||
)
|
||||
|
||||
if c.exportTimeout > 0 {
|
||||
ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
|
||||
ctx, cancel = context.WithTimeoutCause(parent, c.exportTimeout, errors.New("exporter export timeout"))
|
||||
} else {
|
||||
ctx, cancel = context.WithCancel(parent)
|
||||
}
|
||||
@@ -228,6 +229,8 @@ func retryable(err error) (bool, time.Duration) {
|
||||
|
||||
func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
|
||||
switch s.Code() {
|
||||
// Follows the retryable error codes defined in
|
||||
// https://opentelemetry.io/docs/specs/otlp/#failures
|
||||
case codes.Canceled,
|
||||
codes.DeadlineExceeded,
|
||||
codes.Aborted,
|
||||
|
@@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
|
@@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
|
@@ -5,6 +5,7 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"google.golang.org/genproto/googleapis/rpc/errdetails"
|
||||
@@ -149,7 +150,7 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
|
||||
)
|
||||
|
||||
if c.exportTimeout > 0 {
|
||||
ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
|
||||
ctx, cancel = context.WithTimeoutCause(parent, c.exportTimeout, errors.New("exporter export timeout"))
|
||||
} else {
|
||||
ctx, cancel = context.WithCancel(parent)
|
||||
}
|
||||
|
@@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
|
@@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
|
@@ -223,7 +223,7 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
|
||||
)
|
||||
|
||||
if c.exportTimeout > 0 {
|
||||
ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
|
||||
ctx, cancel = context.WithTimeoutCause(parent, c.exportTimeout, errors.New("exporter export timeout"))
|
||||
} else {
|
||||
ctx, cancel = context.WithCancel(parent)
|
||||
}
|
||||
|
@@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
|
@@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
|
@@ -132,7 +132,7 @@ func wait(ctx context.Context, delay time.Duration) error {
|
||||
select {
|
||||
case <-timer.C:
|
||||
default:
|
||||
return ctx.Err()
|
||||
return context.Cause(ctx)
|
||||
}
|
||||
case <-timer.C:
|
||||
}
|
||||
|
@@ -119,7 +119,9 @@ func newTimeoutExporter(exp Exporter, timeout time.Duration) Exporter {
|
||||
|
||||
// Export sets the timeout of ctx before calling the Exporter e wraps.
|
||||
func (e *timeoutExporter) Export(ctx context.Context, records []Record) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, e.timeout)
|
||||
// This only used by the batch processor, and it takes processor timeout config.
|
||||
// Thus, the error message points to the processor. So users know they should adjust the processor timeout.
|
||||
ctx, cancel := context.WithTimeoutCause(ctx, e.timeout, errors.New("processor export timeout"))
|
||||
defer cancel()
|
||||
return e.Exporter.Export(ctx, records)
|
||||
}
|
||||
|
@@ -202,7 +202,7 @@ func (r *PeriodicReader) aggregation(
|
||||
// collectAndExport gather all metric data related to the periodicReader r from
|
||||
// the SDK and exports it with r's exporter.
|
||||
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, r.timeout)
|
||||
ctx, cancel := context.WithTimeoutCause(ctx, r.timeout, errors.New("reader collect and export timeout"))
|
||||
defer cancel()
|
||||
|
||||
// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
|
||||
@@ -278,7 +278,7 @@ func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
|
||||
// Prioritize the ctx timeout if it is set.
|
||||
if _, ok := ctx.Deadline(); !ok {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, r.timeout)
|
||||
ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader force flush timeout"))
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
@@ -311,7 +311,7 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
|
||||
// Prioritize the ctx timeout if it is set.
|
||||
if _, ok := ctx.Deadline(); !ok {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, r.timeout)
|
||||
ctx, cancel = context.WithTimeoutCause(ctx, r.timeout, errors.New("reader shutdown timeout"))
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
|
@@ -5,6 +5,7 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -267,7 +268,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
|
||||
|
||||
if bsp.o.ExportTimeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
|
||||
ctx, cancel = context.WithTimeoutCause(ctx, bsp.o.ExportTimeout, errors.New("processor export timeout"))
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user