You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
d03b03395d
Fix #7673 [Issue being addressed](https://github.com/open-telemetry/opentelemetry-go/issues/7673#issuecomment-3618325229): > 1. [`fn`](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry_test.go#L163-L165) is [called](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go#L87) > 2. It [returns an error](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry_test.go#L165) > 3. The code [checks if the error is retryable](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go#L92), it [always is](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry_test.go#L149) > 4. [Time delay is checked](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go#L97-L108) > - [Max elapsed time](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry_test.go#L156-L157) is 10 ms > - Initial [delay is 1ms](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry_test.go#L151) > - Delay is determined to be 1ms > - The program proceeds to waiting > 5. [Wait is called](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go#L110-L112) > 6.
The [wait select statement is evaluated](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go#L127-L138) > - On slow systems both `case`s are true > - [Non-deterministically](https://go.dev/ref/spec#:~:text=If%20one%20or,communications%20can%20proceed.) the [timer channel `case`](https://github.com/open-telemetry/opentelemetry-go/blob/1bc9713ac6dc8cbe2fd04fd6dc716d316059eb90/exporters/otlp/otlplog/otlploggrpc/internal/retry/retry.go#L137) is selected > - The retry function is re-run and a second iteration is recorded, causing the failure > - On fast systems only the context cancel is true > - The retry stops here with only `1` execution Do not rely on a non-deterministic `select` statement to catch an ended context prior to waiting for a retry delay. Explicitly check the context prior to entering the wait. This resolves the flaky test and ensures, in normal operation, that requests with a canceled context are ended without having to wait for any additional delays.
147 lines
4.4 KiB
Go
147 lines
4.4 KiB
Go
// Code generated by gotmpl. DO NOT MODIFY.
|
|
// source: internal/shared/otlp/retry/retry.go.tmpl
|
|
|
|
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
// Package retry provides request retry functionality that can perform
|
|
// configurable exponential backoff for transient errors and honor any
|
|
// explicit throttle responses received.
|
|
package retry // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/cenkalti/backoff/v5"
|
|
)
|
|
|
|
// DefaultConfig are the recommended defaults to use.
|
|
var DefaultConfig = Config{
|
|
Enabled: true,
|
|
InitialInterval: 5 * time.Second,
|
|
MaxInterval: 30 * time.Second,
|
|
MaxElapsedTime: time.Minute,
|
|
}
|
|
|
|
// Config controls how failed batch exports are retried with exponential
// backoff.
type Config struct {
	// Enabled reports whether failed exports are retried at all. When it is
	// false, the RequestFunc built from this Config calls the request exactly
	// once and returns its result unchanged.
	Enabled bool
	// InitialInterval is the starting backoff interval, i.e. roughly how long
	// to wait after the first failure before retrying.
	InitialInterval time.Duration
	// MaxInterval is the upper bound of the exponential backoff interval;
	// once it is reached the interval stops growing. NOTE(review): the
	// actual delay may still be randomized around it by the backoff library,
	// and a larger server-requested throttle delay takes precedence.
	MaxInterval time.Duration
	// MaxElapsedTime bounds the total time, retries included, spent trying
	// to send one request/batch. When the bound is hit the request is
	// abandoned and an error is returned. A zero value means no bound.
	MaxElapsedTime time.Duration
}
|
|
|
|
// RequestFunc executes the request function fn under ctx with retry logic
// applied, returning the final outcome.
type RequestFunc func(ctx context.Context, fn func(context.Context) error) error
|
|
|
|
// EvaluateFunc classifies a failed request's error for the retry loop.
//
// retryable must be true when err may be retried and false otherwise.
//
// throttle must be the explicit delay carried by err that should be honored
// before the next attempt, or zero when err carries no such delay.
type EvaluateFunc func(err error) (retryable bool, throttle time.Duration)
|
|
|
|
// RequestFunc returns a RequestFunc using the evaluate function to determine
|
|
// if requests can be retried and based on the exponential backoff
|
|
// configuration of c.
|
|
func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc {
|
|
if !c.Enabled {
|
|
return func(ctx context.Context, fn func(context.Context) error) error {
|
|
return fn(ctx)
|
|
}
|
|
}
|
|
|
|
return func(ctx context.Context, fn func(context.Context) error) error {
|
|
// Do not use NewExponentialBackOff since it calls Reset and the code here
|
|
// must call Reset after changing the InitialInterval (this saves an
|
|
// unnecessary call to Now).
|
|
b := &backoff.ExponentialBackOff{
|
|
InitialInterval: c.InitialInterval,
|
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
|
Multiplier: backoff.DefaultMultiplier,
|
|
MaxInterval: c.MaxInterval,
|
|
}
|
|
b.Reset()
|
|
|
|
maxElapsedTime := c.MaxElapsedTime
|
|
startTime := time.Now()
|
|
|
|
for {
|
|
err := fn(ctx)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
retryable, throttle := evaluate(err)
|
|
if !retryable {
|
|
return err
|
|
}
|
|
|
|
// Check if context is canceled before attempting to wait and retry.
|
|
if ctx.Err() != nil {
|
|
return fmt.Errorf("%w: %w", ctx.Err(), err)
|
|
}
|
|
|
|
if maxElapsedTime != 0 && time.Since(startTime) > maxElapsedTime {
|
|
return fmt.Errorf("max retry time elapsed: %w", err)
|
|
}
|
|
|
|
// Wait for the greater of the backoff or throttle delay.
|
|
bOff := b.NextBackOff()
|
|
delay := max(throttle, bOff)
|
|
|
|
elapsed := time.Since(startTime)
|
|
if maxElapsedTime != 0 && elapsed+throttle > maxElapsedTime {
|
|
return fmt.Errorf("max retry time would elapse: %w", err)
|
|
}
|
|
|
|
if ctxErr := waitFunc(ctx, delay); ctxErr != nil {
|
|
return fmt.Errorf("%w: %w", ctxErr, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Allow override for testing.
|
|
var waitFunc = wait
|
|
|
|
// wait blocks until delay has passed or ctx is done, whichever comes first.
//
// It returns nil when the timer fires, meaning the call may be retried. This
// includes the case where the timer and the context finish at the same
// moment. Otherwise it returns context.Cause(ctx).
func wait(ctx context.Context, delay time.Duration) error {
	t := time.NewTimer(delay)
	defer t.Stop()

	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
	}

	// The context case was chosen. If the timer is ready as well (select
	// picks randomly among ready cases), resolve the tie in favor of the
	// timer so the caller still retries.
	select {
	case <-t.C:
		return nil
	default:
		return context.Cause(ctx)
	}
}
|