
Unify OTLP exporter retry logic (#2095)

* Add retry internal package

* Use retry package in connection

* Use retry package for otlpconfig

* Use the retry package in otlptracegrpc

* Use the retry package in otlptracehttp

* Add changes to CHANGELOG

* Lint internal

* Update otlptracehttp tests

* Update dependencies

* Add retry tests to otlptracehttp

* Remove TestRetry from otlptracehttp
Tyler Yahn 2021-07-22 15:07:38 -07:00 committed by GitHub
parent abe2243718
commit cb607b0ab1
16 changed files with 757 additions and 816 deletions

View File

@ -12,11 +12,15 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Added `WithOSDescription` resource configuration option to set OS (Operating System) description resource attribute (`os.description`). (#1840)
- Added `WithOS` resource configuration option to set all OS (Operating System) resource attributes at once. (#1840)
- Added the `WithRetry` option to the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package.
This option is a replacement for the removed `WithMaxAttempts` and `WithBackoff` options. (#2095)
- Added the `LinkFromContext` API, which returns a `Link` encapsulating the `SpanContext` from the provided context along with the provided attributes. (#2115)
### Changed
- The `SpanModels` function is now exported from the `go.opentelemetry.io/otel/exporters/zipkin` package to convert OpenTelemetry spans into Zipkin model spans. (#2027)
- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc".RetrySettings` to `RetryConfig`. (#2095)
- Rename the `"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp".RetrySettings` to `RetryConfig`. (#2095)
### Deprecated
@ -28,6 +32,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Removed the deprecated package `go.opentelemetry.io/otel/exporters/trace/zipkin`. (#2020)
- Removed the `"go.opentelemetry.io/otel/sdk/resource".WithBuiltinDetectors` function.
The explicit `With*` options for every built-in detector should be used instead. (#2026 #2097)
- Removed the `WithMaxAttempts` and `WithBackoff` options from the `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp` package.
The retry logic of the package has been updated to match the `otlptracegrpc` package and a `WithRetry` option has been added that should be used instead (see the migration sketch below). (#2095)
- Removed metrics test package `go.opentelemetry.io/otel/sdk/export/metric/metrictest`. (#2105)
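As a migration illustration (not part of this commit's diff), a minimal sketch assuming an OTLP/HTTP collector on a hypothetical `localhost:4318` endpoint; the removed `WithMaxAttempts` and `WithBackoff` calls collapse into a single `WithRetry` option:

package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
)

// newExporter shows the replacement surface: WithRetry instead of the
// removed WithMaxAttempts and WithBackoff options.
func newExporter(ctx context.Context) (*otlptrace.Exporter, error) {
	client := otlptracehttp.NewClient(
		otlptracehttp.WithEndpoint("localhost:4318"), // hypothetical endpoint
		otlptracehttp.WithInsecure(),
		otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
			Enabled:         true,
			InitialInterval: 5 * time.Second,
			MaxInterval:     30 * time.Second,
			MaxElapsedTime:  time.Minute,
		}),
	)
	return otlptrace.New(ctx, client)
}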
### Fixed

View File

@ -16,14 +16,12 @@ package connection
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cenkalti/backoff/v4"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -31,6 +29,7 @@ import (
"google.golang.org/grpc/encoding/gzip"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@ -48,6 +47,7 @@ type Connection struct {
// these fields are read-only after constructor is finished
cfg otlpconfig.Config
SCfg otlpconfig.SignalConfig
requestFunc retry.RequestFunc
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn)
@ -66,6 +66,7 @@ func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler
c := new(Connection)
c.newConnectionHandler = handler
c.cfg = cfg
c.requestFunc = cfg.RetryConfig.RequestFunc(evaluate)
c.SCfg = sCfg
if len(c.SCfg.Headers) > 0 {
c.metadata = metadata.New(c.SCfg.Headers)
@ -287,88 +288,24 @@ func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, cont
}
func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error {
expBackoff := newExponentialBackoff(c.cfg.RetrySettings)
for {
ctx, cancel := c.ContextWithStop(ctx)
defer cancel()
return c.requestFunc(ctx, func(ctx context.Context) error {
err := fn(ctx)
if err == nil {
// request succeeded.
// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
return nil
}
if !c.cfg.RetrySettings.Enabled {
return err
}
// We have an error, check gRPC status code.
st := status.Convert(err)
if st.Code() == codes.OK {
// Not really an error, still success.
return nil
}
// Now, this is a real error.
if !shouldRetry(st.Code()) {
// It is not a retryable error, we should not retry.
return err
}
// Need to retry.
throttle := getThrottleDuration(st)
backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired: %w", err)
return err
}
var delay time.Duration
if backoffDelay > throttle {
delay = backoffDelay
} else {
if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime {
err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err)
return err
}
// Respect server throttling.
delay = throttle
}
// back-off, but get interrupted when shutting down or request is cancelled or timed out.
err = func() error {
dt := time.NewTimer(delay)
defer dt.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-c.stopCh:
return fmt.Errorf("interrupted due to shutdown: %w", err)
case <-dt.C:
}
return nil
}()
if err != nil {
return err
}
}
return err
})
}
func shouldRetry(code codes.Code) bool {
switch code {
case codes.OK:
// Success. This function should not be called for this code, the best we
// can do is tell the caller not to retry.
return false
// evaluate returns whether err is retry-able and a duration to wait before
// retrying if an explicit throttle time is included in err.
func evaluate(err error) (bool, time.Duration) {
s := status.Convert(err)
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
@ -376,54 +313,20 @@ func shouldRetry(code codes.Code) bool {
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// These are retryable errors.
return true
case codes.Unknown,
codes.InvalidArgument,
codes.Unauthenticated,
codes.PermissionDenied,
codes.NotFound,
codes.AlreadyExists,
codes.FailedPrecondition,
codes.Unimplemented,
codes.Internal:
// These are fatal errors, don't retry.
return false
default:
// Don't retry on unknown codes.
return false
return true, throttleDelay(s)
}
// Not a retry-able error.
return false, 0
}
func getThrottleDuration(status *status.Status) time.Duration {
// See if throttling information is available.
// throttleDelay returns a duration to wait for if an explicit throttle time
// is included in the response status.
func throttleDelay(status *status.Status) time.Duration {
for _, detail := range status.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
// We are throttled. Wait before retrying as requested by the server.
return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
}
return 0
return t.RetryDelay.AsDuration()
}
}
return 0
}
func newExponentialBackoff(rs otlpconfig.RetrySettings) *backoff.ExponentialBackOff {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := &backoff.ExponentialBackOff{
InitialInterval: rs.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: rs.MaxInterval,
MaxElapsedTime: rs.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()
return expBackoff
}
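For context on `evaluate` and `throttleDelay` above: a server signals an explicit throttle by attaching a `RetryInfo` detail to its status. A hedged sketch of constructing such an error (the `throttledError` helper is hypothetical, mirroring the tests below):

package main

import (
	"time"

	"google.golang.org/genproto/googleapis/rpc/errdetails"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/durationpb"
)

// throttledError builds the kind of gRPC status error that evaluate reports
// as retry-able and throttleDelay extracts a wait duration from.
func throttledError(delay time.Duration) error {
	s, err := status.New(codes.ResourceExhausted, "throttled").WithDetails(
		&errdetails.RetryInfo{RetryDelay: durationpb.New(delay)},
	)
	if err != nil {
		// WithDetails fails only on invalid details; degrade to no throttle.
		return status.Error(codes.ResourceExhausted, "throttled")
	}
	return s.Err()
}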

View File

@ -15,76 +15,132 @@
package connection
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
)
func TestGetThrottleDuration(t *testing.T) {
tts := []struct {
stsFn func() (*status.Status, error)
throttle time.Duration
func TestThrottleDuration(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
expected time.Duration
}{
{
stsFn: func() (*status.Status, error) {
return status.New(
codes.OK,
"status with no retry info",
), nil
},
throttle: 0,
status: status.New(c, "no retry info"),
expected: 0,
},
{
stsFn: func() (*status.Status, error) {
st := status.New(codes.ResourceExhausted, "status with retry info")
return st.WithDetails(
&errdetails.RetryInfo{RetryDelay: durationpb.New(15 * time.Millisecond)},
status: func() *status.Status {
s, err := status.New(c, "single retry info").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Millisecond),
},
)
},
throttle: 15 * time.Millisecond,
require.NoError(t, err)
return s
}(),
expected: 15 * time.Millisecond,
},
{
stsFn: func() (*status.Status, error) {
st := status.New(codes.ResourceExhausted, "status with error info detail")
return st.WithDetails(
status: func() *status.Status {
s, err := status.New(c, "error info").WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
)
},
throttle: 0,
require.NoError(t, err)
return s
}(),
expected: 0,
},
{
stsFn: func() (*status.Status, error) {
st := status.New(codes.ResourceExhausted, "status with error info and retry info")
return st.WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
&errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)},
status: func() *status.Status {
s, err := status.New(c, "error and retry info").WithDetails(
&errdetails.ErrorInfo{Reason: "with throttle detail"},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
)
},
throttle: 13 * time.Minute,
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
},
{
stsFn: func() (*status.Status, error) {
st := status.New(codes.ResourceExhausted, "status with two retry info should take the first")
return st.WithDetails(
&errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)},
&errdetails.RetryInfo{RetryDelay: durationpb.New(18 * time.Minute)},
status: func() *status.Status {
s, err := status.New(c, "double retry info").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Minute),
},
)
},
throttle: 13 * time.Minute,
require.NoError(t, err)
return s
}(),
expected: 13 * time.Minute,
},
}
for _, tt := range tts {
sts, _ := tt.stsFn()
t.Run(sts.Message(), func(t *testing.T) {
th := getThrottleDuration(sts)
require.Equal(t, tt.throttle, th)
for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
require.Equal(t, tc.expected, throttleDelay(tc.status))
})
}
}
func TestEvaluate(t *testing.T) {
retryable := map[codes.Code]bool{
codes.OK: false,
codes.Canceled: true,
codes.Unknown: false,
codes.InvalidArgument: false,
codes.DeadlineExceeded: true,
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: true,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
codes.Unimplemented: false,
codes.Internal: false,
codes.Unavailable: true,
codes.DataLoss: true,
codes.Unauthenticated: false,
}
for c, want := range retryable {
got, _ := evaluate(status.Error(c, ""))
assert.Equalf(t, want, got, "evaluate(%s)", c)
}
}
func TestDoRequest(t *testing.T) {
ev := func(error) (bool, time.Duration) { return false, 0 }
c := new(Connection)
c.requestFunc = retry.Config{}.RequestFunc(ev)
c.stopCh = make(chan struct{})
ctx := context.Background()
assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error {
return nil
}))
assert.NoError(t, c.DoRequest(ctx, func(ctx context.Context) error {
return status.Error(codes.OK, "")
}))
assert.ErrorIs(t, c.DoRequest(ctx, func(ctx context.Context) error {
return assert.AnError
}), assert.AnError)
}

View File

@ -21,34 +21,19 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
)
const (
// DefaultMaxAttempts describes how many times the driver
// should retry the sending of the payload in case of a
// retryable error.
DefaultMaxAttempts int = 5
// DefaultTracesPath is a default URL path for endpoint that
// receives spans.
DefaultTracesPath string = "/v1/traces"
// DefaultBackoff is a default base backoff time used in the
// exponential backoff strategy.
DefaultBackoff time.Duration = 300 * time.Millisecond
// DefaultTimeout is a default max waiting time for the backend to process
// each span batch.
DefaultTimeout time.Duration = 10 * time.Second
)
var (
// defaultRetrySettings is a default settings for the retry policy.
defaultRetrySettings = RetrySettings{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}
)
type (
SignalConfig struct {
Endpoint string
@ -67,15 +52,12 @@ type (
// Signal specific configurations
Traces SignalConfig
// HTTP configurations
MaxAttempts int
Backoff time.Duration
RetryConfig retry.Config
// gRPC configurations
ReconnectionPeriod time.Duration
ServiceConfig string
DialOptions []grpc.DialOption
RetrySettings RetrySettings
}
)
@ -87,9 +69,7 @@ func NewDefaultConfig() Config {
Compression: NoCompression,
Timeout: DefaultTimeout,
},
MaxAttempts: DefaultMaxAttempts,
Backoff: DefaultBackoff,
RetrySettings: defaultRetrySettings,
RetryConfig: retry.DefaultConfig,
}
return c
@ -219,9 +199,9 @@ func WithURLPath(urlPath string) GenericOption {
})
}
func WithRetry(settings RetrySettings) GenericOption {
func WithRetry(rc retry.Config) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.RetrySettings = settings
cfg.RetryConfig = rc
})
}
@ -256,15 +236,3 @@ func WithTimeout(duration time.Duration) GenericOption {
cfg.Traces.Timeout = duration
})
}
func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}
func WithBackoff(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
})
}

View File

@ -14,8 +14,6 @@
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
import "time"
const (
// DefaultCollectorPort is the port the Exporter will attempt connect to
// if no collector port is provided.
@ -47,18 +45,3 @@ const (
// MarshalJSON tells the driver to send using json format.
MarshalJSON
)
// RetrySettings defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type RetrySettings struct {
// Enabled indicates whether to retry sending batches in case of export failure.
Enabled bool
// InitialInterval is the time to wait after the first failure before retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between
// consecutive retries will always be `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
// Once this value is reached, the data is discarded.
MaxElapsedTime time.Duration
}

View File

@ -0,0 +1,130 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retry
import (
"context"
"fmt"
"time"
"github.com/cenkalti/backoff/v4"
)
// DefaultConfig are the recommended defaults to use.
var DefaultConfig = Config{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}
// Config defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type Config struct {
// Enabled indicates whether to retry sending batches in case of
// export failure.
Enabled bool
// InitialInterval is the time to wait after the first failure before
// retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is
// reached the delay between consecutive retries will always be
// `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent
// trying to send a request/batch. Once this value is reached, the data
// is discarded.
MaxElapsedTime time.Duration
}
// RequestFunc wraps a request with retry logic.
type RequestFunc func(context.Context, func(context.Context) error) error
// EvaluateFunc returns whether an error is retry-able and, if so, any
// explicit throttle duration included in the error that should be honored.
type EvaluateFunc func(error) (bool, time.Duration)
func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc {
if !c.Enabled {
return func(ctx context.Context, fn func(context.Context) error) error {
return fn(ctx)
}
}
// Do not use NewExponentialBackOff since it calls Reset and the code here
// must call Reset after changing the InitialInterval (this saves an
// unnecessary call to Now).
b := &backoff.ExponentialBackOff{
InitialInterval: c.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.MaxInterval,
MaxElapsedTime: c.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()
return func(ctx context.Context, fn func(context.Context) error) error {
for {
err := fn(ctx)
if err == nil {
return nil
}
retryable, throttle := evaluate(err)
if !retryable {
return err
}
bOff := b.NextBackOff()
if bOff == backoff.Stop {
return fmt.Errorf("max retry time elapsed: %w", err)
}
// Wait for the greater of the backoff or throttle delay.
var delay time.Duration
if bOff > throttle {
delay = bOff
} else {
elapsed := b.GetElapsedTime()
if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime {
return fmt.Errorf("max retry time would elapse: %w", err)
}
delay = throttle
}
if err := waitFunc(ctx, delay); err != nil {
return err
}
}
}
}
// Allow override for testing.
var waitFunc = wait
func wait(ctx context.Context, delay time.Duration) error {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
}
return nil
}
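A hedged usage sketch of the new package (only compilable from within the exporter module, since `retry` is internal; `errTransient` and the evaluate closure are hypothetical): `Config.RequestFunc` wraps an operation so transient failures are retried with exponential backoff:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
)

var errTransient = errors.New("transient failure")

func main() {
	cfg := retry.Config{
		Enabled:         true,
		InitialInterval: time.Millisecond,
		MaxInterval:     10 * time.Millisecond,
		MaxElapsedTime:  time.Second,
	}
	// Treat only errTransient as retry-able, with no explicit throttle.
	reqFunc := cfg.RequestFunc(func(err error) (bool, time.Duration) {
		return errors.Is(err, errTransient), 0
	})

	attempts := 0
	err := reqFunc(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errTransient // retried after a backoff delay
		}
		return nil
	})
	fmt.Println(attempts, err) // 3 <nil>
}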

View File

@ -0,0 +1,195 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retry
import (
"context"
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestWait(t *testing.T) {
tests := []struct {
ctx context.Context
delay time.Duration
expected error
}{
{
ctx: context.Background(),
delay: time.Duration(0),
expected: nil,
},
{
ctx: context.Background(),
delay: time.Duration(1),
expected: nil,
},
{
ctx: context.Background(),
delay: time.Duration(-1),
expected: nil,
},
{
ctx: func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}(),
expected: context.Canceled,
},
}
for _, test := range tests {
assert.Equal(t, test.expected, wait(test.ctx, test.delay))
}
}
func TestNonRetryableError(t *testing.T) {
ev := func(error) (bool, time.Duration) { return false, 0 }
reqFunc := Config{
Enabled: true,
InitialInterval: 1 * time.Nanosecond,
MaxInterval: 1 * time.Nanosecond,
// Never stop retrying.
MaxElapsedTime: 0,
}.RequestFunc(ev)
ctx := context.Background()
assert.NoError(t, reqFunc(ctx, func(context.Context) error {
return nil
}))
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}), assert.AnError)
}
func TestThrottledRetry(t *testing.T) {
// Ensure the throttle delay is used by making it longer than the backoff delay.
throttleDelay, backoffDelay := time.Second, time.Nanosecond
ev := func(error) (bool, time.Duration) {
// Retry everything with a throttle delay.
return true, throttleDelay
}
reqFunc := Config{
Enabled: true,
InitialInterval: backoffDelay,
MaxInterval: backoffDelay,
// Never stop retrying.
MaxElapsedTime: 0,
}.RequestFunc(ev)
origWait := waitFunc
var done bool
waitFunc = func(_ context.Context, delay time.Duration) error {
assert.Equal(t, throttleDelay, delay, "retry not throttled")
// Try twice to ensure call is attempted again after delay.
if done {
return assert.AnError
}
done = true
return nil
}
defer func() { waitFunc = origWait }()
ctx := context.Background()
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return errors.New("not this error")
}), assert.AnError)
}
func TestBackoffRetry(t *testing.T) {
ev := func(error) (bool, time.Duration) { return true, 0 }
delay := time.Nanosecond
reqFunc := Config{
Enabled: true,
InitialInterval: delay,
MaxInterval: delay,
// Never stop retrying.
MaxElapsedTime: 0,
}.RequestFunc(ev)
origWait := waitFunc
var done bool
waitFunc = func(_ context.Context, d time.Duration) error {
assert.Equal(t, delay, d, "retry not backed off")
// Try twice to ensure call is attempted again after delay.
if done {
return assert.AnError
}
done = true
return nil
}
defer func() { waitFunc = origWait }()
ctx := context.Background()
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return errors.New("not this error")
}), assert.AnError)
}
func TestThrottledRetryGreaterThanMaxElapsedTime(t *testing.T) {
// Ensure the throttle delay is used by making it longer than the backoff delay.
tDelay, bDelay := time.Hour, time.Nanosecond
ev := func(error) (bool, time.Duration) { return true, tDelay }
reqFunc := Config{
Enabled: true,
InitialInterval: bDelay,
MaxInterval: bDelay,
MaxElapsedTime: tDelay - (time.Nanosecond),
}.RequestFunc(ev)
ctx := context.Background()
assert.Contains(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}).Error(), "max retry time would elapse: ")
}
func TestMaxElapsedTime(t *testing.T) {
ev := func(error) (bool, time.Duration) { return true, 0 }
delay := time.Nanosecond
reqFunc := Config{
Enabled: true,
// InitialInterval > MaxElapsedTime means immediate return.
InitialInterval: 2 * delay,
MaxElapsedTime: delay,
}.RequestFunc(ev)
ctx := context.Background()
assert.Contains(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}).Error(), "max retry time elapsed: ")
}
func TestRetryNotEnabled(t *testing.T) {
ev := func(error) (bool, time.Duration) {
t.Error("evaluated retry when not enabled")
return false, 0
}
reqFunc := Config{}.RequestFunc(ev)
ctx := context.Background()
assert.NoError(t, reqFunc(ctx, func(context.Context) error {
return nil
}))
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}), assert.AnError)
}

View File

@ -25,10 +25,8 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -171,7 +169,7 @@ func TestNew_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
reconnectionPeriod := 20 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlptracegrpc.WithRetry(otlptracegrpc.RetrySettings{Enabled: false}),
otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}),
otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
@ -222,280 +220,13 @@ func TestNew_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
require.NoError(t, nmc.Stop())
}
func TestExporterExportFailureAndRecoveryModes(t *testing.T) {
tts := []struct {
name string
errors []error
rs otlptracegrpc.RetrySettings
fn func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector)
opts []otlptracegrpc.Option
}{
{
name: "Do not retry if succeeded",
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
require.NoError(t, exp.ExportSpans(ctx, roSpans))
span := mc.getSpans()
require.Len(t, span, 1)
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 success request.")
},
},
{
name: "Do not retry if 'error' is ok",
errors: []error{
status.Error(codes.OK, ""),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
require.NoError(t, exp.ExportSpans(ctx, roSpans))
span := mc.getSpans()
require.Len(t, span, 0)
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 request with an OK status error.")
},
},
{
name: "Fail three times and succeed",
rs: otlptracegrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: 300 * time.Millisecond,
InitialInterval: 2 * time.Millisecond,
MaxInterval: 10 * time.Millisecond,
},
errors: []error{
status.Error(codes.Unavailable, "backend under pressure"),
status.Error(codes.Unavailable, "backend under pressure"),
status.Error(codes.Unavailable, "backend under pressure"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
require.NoError(t, exp.ExportSpans(ctx, roSpans))
span := mc.getSpans()
require.Len(t, span, 1)
require.Equal(t, 4, mc.traceSvc.requests, "trace service must receive 3 failure requests and 1 success request.")
},
},
{
name: "Permanent error should not be retried",
rs: otlptracegrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: 300 * time.Millisecond,
InitialInterval: 2 * time.Millisecond,
MaxInterval: 10 * time.Millisecond,
},
errors: []error{
status.Error(codes.InvalidArgument, "invalid arguments"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
require.Error(t, exp.ExportSpans(ctx, roSpans))
span := mc.getSpans()
require.Len(t, span, 0)
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 error request.")
},
},
{
name: "Test all transient errors and succeed",
rs: otlptracegrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: 500 * time.Millisecond,
InitialInterval: 1 * time.Millisecond,
MaxInterval: 2 * time.Millisecond,
},
errors: []error{
status.Error(codes.Canceled, ""),
status.Error(codes.DeadlineExceeded, ""),
status.Error(codes.ResourceExhausted, ""),
status.Error(codes.Aborted, ""),
status.Error(codes.OutOfRange, ""),
status.Error(codes.Unavailable, ""),
status.Error(codes.DataLoss, ""),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
require.NoError(t, exp.ExportSpans(ctx, roSpans))
span := mc.getSpans()
require.Len(t, span, 1)
require.Equal(t, 8, mc.traceSvc.requests, "trace service must receive 7 failure requests and 1 success request.")
},
},
{
name: "Retry should honor server throttling",
rs: otlptracegrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: time.Minute,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
},
opts: []otlptracegrpc.Option{
otlptracegrpc.WithTimeout(time.Millisecond * 100),
},
errors: []error{
newThrottlingError(codes.ResourceExhausted, time.Second*30),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
err := exp.ExportSpans(ctx, roSpans)
require.Error(t, err)
require.Equal(t, "context deadline exceeded", err.Error())
span := mc.getSpans()
require.Len(t, span, 0)
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure request.")
},
},
{
name: "Retry should fail if server throttling is higher than the MaxElapsedTime",
rs: otlptracegrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: time.Millisecond * 100,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
},
errors: []error{
newThrottlingError(codes.ResourceExhausted, time.Minute),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
err := exp.ExportSpans(ctx, roSpans)
require.Error(t, err)
require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error())
span := mc.getSpans()
require.Len(t, span, 0)
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure request.")
},
},
{
name: "Retry stops if takes too long",
rs: otlptracegrpc.RetrySettings{
Enabled: true,
MaxElapsedTime: time.Millisecond * 100,
InitialInterval: time.Millisecond * 50,
MaxInterval: time.Millisecond * 50,
},
errors: []error{
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
status.Error(codes.Unavailable, "unavailable"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
err := exp.ExportSpans(ctx, roSpans)
require.Error(t, err)
require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error())
span := mc.getSpans()
require.Len(t, span, 0)
require.LessOrEqual(t, 1, mc.traceSvc.requests, "trace service must receive at least 1 failure request.")
},
},
{
name: "Disabled retry",
rs: otlptracegrpc.RetrySettings{
Enabled: false,
},
errors: []error{
status.Error(codes.Unavailable, "unavailable"),
},
fn: func(t *testing.T, ctx context.Context, exp *otlptrace.Exporter, mc *mockCollector) {
err := exp.ExportSpans(ctx, roSpans)
require.Error(t, err)
require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error())
span := mc.getSpans()
require.Len(t, span, 0)
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure request.")
},
},
}
for _, tt := range tts {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
mc := runMockCollectorWithConfig(t, &mockConfig{
errors: tt.errors,
})
opts := []otlptracegrpc.Option{
otlptracegrpc.WithRetry(tt.rs),
}
if len(tt.opts) != 0 {
opts = append(opts, tt.opts...)
}
exp := newGRPCExporter(t, ctx, mc.endpoint, opts...)
tt.fn(t, ctx, exp, mc)
require.NoError(t, mc.Stop())
require.NoError(t, exp.Shutdown(ctx))
})
}
}
func TestPermanentErrorsShouldNotBeRetried(t *testing.T) {
permanentErrors := []*status.Status{
status.New(codes.Unknown, "Unknown"),
status.New(codes.InvalidArgument, "InvalidArgument"),
status.New(codes.NotFound, "NotFound"),
status.New(codes.AlreadyExists, "AlreadyExists"),
status.New(codes.FailedPrecondition, "FailedPrecondition"),
status.New(codes.Unimplemented, "Unimplemented"),
status.New(codes.Internal, "Internal"),
status.New(codes.PermissionDenied, ""),
status.New(codes.Unauthenticated, ""),
}
for _, sts := range permanentErrors {
t.Run(sts.Code().String(), func(t *testing.T) {
ctx := context.Background()
mc := runMockCollectorWithConfig(t, &mockConfig{
errors: []error{sts.Err()},
})
exp := newGRPCExporter(t, ctx, mc.endpoint)
err := exp.ExportSpans(ctx, roSpans)
require.Error(t, err)
require.Len(t, mc.getSpans(), 0)
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 permanent error request.")
require.NoError(t, mc.Stop())
require.NoError(t, exp.Shutdown(ctx))
})
}
}
func newThrottlingError(code codes.Code, duration time.Duration) error {
s := status.New(code, "")
s, _ = s.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(duration)})
return s.Err()
}
func TestNew_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)
reconnectionPeriod := 50 * time.Millisecond
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlptracegrpc.WithRetry(otlptracegrpc.RetrySettings{Enabled: false}),
otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}),
otlptracegrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
@ -637,7 +368,7 @@ func TestNew_WithTimeout(t *testing.T) {
}()
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint, otlptracegrpc.WithTimeout(tt.timeout), otlptracegrpc.WithRetry(otlptracegrpc.RetrySettings{Enabled: false}))
exp := newGRPCExporter(t, ctx, mc.endpoint, otlptracegrpc.WithTimeout(tt.timeout), otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}))
defer func() {
_ = exp.Shutdown(ctx)
}()

View File

@ -8,9 +8,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.0-RC1
go.opentelemetry.io/otel/sdk v1.0.0-RC1
go.opentelemetry.io/proto/otlp v0.9.0
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.39.0
google.golang.org/protobuf v1.27.1
)
replace go.opentelemetry.io/otel => ../../../..

View File

@ -20,6 +20,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -30,9 +31,9 @@ type Option interface {
applyGRPCOption(*otlpconfig.Config)
}
// RetrySettings defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type RetrySettings otlpconfig.RetrySettings
// RetryConfig defines configuration for retrying batches in case of export
// failure using an exponential backoff.
type RetryConfig retry.Config
type wrappedOption struct {
otlpconfig.GRPCOption
@ -121,12 +122,11 @@ func WithTimeout(duration time.Duration) Option {
return wrappedOption{otlpconfig.WithTimeout(duration)}
}
// WithRetry configures the retry policy for transient errors that may occur
// when exporting traces. An exponential back-off algorithm is used to ensure
// endpoints are not overwhelmed with retries. If unset, the default retry
// policy will retry after 5 seconds and increase exponentially after each
// error for a total of 1 minute.
func WithRetry(settings RetrySettings) Option {
return wrappedOption{otlpconfig.WithRetry(otlpconfig.RetrySettings(settings))}
func WithRetry(settings RetryConfig) Option {
return wrappedOption{otlpconfig.WithRetry(retry.Config(settings))}
}
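The gRPC exporter keeps the same option shape under the renamed `RetryConfig`; a minimal sketch (hypothetical `localhost:4317` endpoint) wiring it through `WithRetry`:

package main

import (
	"context"
	"time"

	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
)

// newExporter configures the gRPC client with the renamed RetryConfig.
func newExporter(ctx context.Context) (*otlptrace.Exporter, error) {
	client := otlptracegrpc.NewClient(
		otlptracegrpc.WithEndpoint("localhost:4317"), // hypothetical endpoint
		otlptracegrpc.WithInsecure(),
		otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
			Enabled:         true,
			InitialInterval: 5 * time.Second,
			MaxInterval:     30 * time.Second,
			MaxElapsedTime:  time.Minute,
		}),
	)
	return otlptrace.New(ctx, client)
}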

View File

@ -21,26 +21,34 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"net"
"net/http"
"path"
"strconv"
"strings"
"sync"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
"google.golang.org/protobuf/proto"
"go.opentelemetry.io/otel"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
)
const contentTypeProto = "application/x-protobuf"
var gzPool = sync.Pool{
New: func() interface{} {
w := gzip.NewWriter(ioutil.Discard)
return w
},
}
// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
@ -59,11 +67,12 @@ var ourTransport = &http.Transport{
}
type client struct {
name string
cfg otlpconfig.SignalConfig
generalCfg otlpconfig.Config
client *http.Client
stopCh chan struct{}
name string
cfg otlpconfig.SignalConfig
generalCfg otlpconfig.Config
requestFunc retry.RequestFunc
client *http.Client
stopCh chan struct{}
}
var _ otlptrace.Client = (*client)(nil)
@ -77,7 +86,7 @@ func NewClient(opts ...Option) otlptrace.Client {
}
for pathPtr, defaultPath := range map[*string]string{
&cfg.Traces.URLPath: defaultTracesPath,
&cfg.Traces.URLPath: otlpconfig.DefaultTracesPath,
} {
tmp := strings.TrimSpace(*pathPtr)
if tmp == "" {
@ -90,15 +99,6 @@ func NewClient(opts ...Option) otlptrace.Client {
}
*pathPtr = tmp
}
if cfg.MaxAttempts <= 0 {
cfg.MaxAttempts = defaultMaxAttempts
}
if cfg.MaxAttempts > defaultMaxAttempts {
cfg.MaxAttempts = defaultMaxAttempts
}
if cfg.Backoff <= 0 {
cfg.Backoff = defaultBackoff
}
httpClient := &http.Client{
Transport: ourTransport,
@ -112,11 +112,12 @@ func NewClient(opts ...Option) otlptrace.Client {
stopCh := make(chan struct{})
return &client{
name: "traces",
cfg: cfg.Traces,
generalCfg: cfg,
stopCh: stopCh,
client: httpClient,
name: "traces",
cfg: cfg.Traces,
generalCfg: cfg,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
stopCh: stopCh,
client: httpClient,
}
}
@ -151,41 +152,150 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
if err != nil {
return err
}
return d.send(ctx, rawRequest)
}
func (d *client) send(ctx context.Context, rawRequest []byte) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath)
var cancel context.CancelFunc
ctx, cancel = d.contextWithStop(ctx)
ctx, cancel := d.contextWithStop(ctx)
defer cancel()
for i := 0; i < d.generalCfg.MaxAttempts; i++ {
response, err := d.singleSend(ctx, rawRequest, address)
request, err := d.newRequest(rawRequest)
if err != nil {
return err
}
return d.requestFunc(ctx, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
request.reset(ctx)
resp, err := d.client.Do(request.Request)
if err != nil {
return err
}
// We don't care about the body, so try to read it
// into /dev/null and close it immediately. The
// reading part is to facilitate connection reuse.
_, _ = io.Copy(ioutil.Discard, response.Body)
_ = response.Body.Close()
switch response.StatusCode {
var rErr error
switch resp.StatusCode {
case http.StatusOK:
return nil
case http.StatusTooManyRequests:
fallthrough
case http.StatusServiceUnavailable:
select {
case <-time.After(getWaitDuration(d.generalCfg.Backoff, i)):
continue
case <-ctx.Done():
return ctx.Err()
// Success, do not retry.
case http.StatusTooManyRequests,
http.StatusServiceUnavailable:
// Retry-able failure.
rErr = newResponseError(resp.Header)
// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}
default:
return fmt.Errorf("failed to send %s to %s with HTTP status %s", d.name, address, response.Status)
rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status)
}
if err := resp.Body.Close(); err != nil {
return err
}
return rErr
})
}
func (d *client) newRequest(body []byte) (request, error) {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath)
r, err := http.NewRequest(http.MethodPost, address, nil)
if err != nil {
return request{Request: r}, err
}
for k, v := range d.cfg.Headers {
r.Header.Set(k, v)
}
r.Header.Set("Content-Type", contentTypeProto)
req := request{Request: r}
switch Compression(d.cfg.Compression) {
case NoCompression:
r.ContentLength = (int64)(len(body))
req.bodyReader = bodyReader(body)
case GzipCompression:
// Ensure the content length is not used.
r.ContentLength = -1
r.Header.Set("Content-Encoding", "gzip")
gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)
var b bytes.Buffer
gz.Reset(&b)
if _, err := gz.Write(body); err != nil {
return req, err
}
// Close needs to be called to ensure the body is fully written.
if err := gz.Close(); err != nil {
return req, err
}
req.bodyReader = bodyReader(b.Bytes())
}
return req, nil
}
// bodyReader returns a closure returning a new reader for buf.
func bodyReader(buf []byte) func() io.ReadCloser {
return func() io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader(buf))
}
}
// request wraps an http.Request with a resettable body reader.
type request struct {
*http.Request
// bodyReader allows the same body to be used for multiple requests.
bodyReader func() io.ReadCloser
}
// reset reinitializes the request Body and uses ctx for the request.
func (r *request) reset(ctx context.Context) {
r.Body = r.bodyReader()
r.Request = r.Request.WithContext(ctx)
}
// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
}
// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
var rErr retryableError
if s, ok := header["Retry-After"]; ok {
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
rErr.throttle = t
}
}
return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.MaxAttempts)
return rErr
}
func (e retryableError) Error() string {
return "retry-able request failure"
}
// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
if err == nil {
return false, 0
}
rErr, ok := err.(retryableError)
if !ok {
return false, 0
}
// Retry-After is expressed in seconds; convert to a time.Duration.
return true, time.Duration(rErr.throttle) * time.Second
}
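A hedged round-trip sketch of the throttle path above (test-style, within this package; it assumes the seconds conversion in evaluate): a retry-able response carrying Retry-After: 10 surfaces as a 10 second throttle.

package otlptracehttp

import (
	"net/http"
	"testing"
	"time"
)

// TestRetryAfterThrottle is an illustrative sketch, not part of this commit.
func TestRetryAfterThrottle(t *testing.T) {
	h := http.Header{}
	h.Set("Retry-After", "10")
	retryable, throttle := evaluate(newResponseError(h))
	if !retryable || throttle != 10*time.Second {
		t.Fatalf("retryable = %v, throttle = %v", retryable, throttle)
	}
}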
func (d *client) getScheme() string {
@ -195,26 +305,6 @@ func (d *client) getScheme() string {
return "https"
}
func getWaitDuration(backoff time.Duration, i int) time.Duration {
// Strategy: after nth failed attempt, attempt resending after
// k * initialBackoff + jitter, where k is a random number in
// range [0, 2^n-1), and jitter is a random percentage of
// initialBackoff from [-5%, 5%).
//
// Based on
// https://en.wikipedia.org/wiki/Exponential_backoff#Example_exponential_backoff_algorithm
//
// Jitter is our addition.
// There won't be an overflow, since i is capped to
// defaultMaxAttempts (5).
upperK := (int64)(1) << (i + 1)
jitterPercent := (rand.Float64() - 0.5) / 10.
jitter := jitterPercent * (float64)(backoff)
k := rand.Int63n(upperK)
return (time.Duration)(k)*backoff + (time.Duration)(jitter)
}
func (d *client) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the client's stop
// channel.
@ -230,51 +320,3 @@ func (d *client) contextWithStop(ctx context.Context) (context.Context, context.
}(ctx, cancel)
return ctx, cancel
}
func (d *client) singleSend(ctx context.Context, rawRequest []byte, address string) (*http.Response, error) {
request, err := http.NewRequestWithContext(ctx, http.MethodPost, address, nil)
if err != nil {
return nil, err
}
bodyReader, contentLength, headers := d.prepareBody(rawRequest)
// Not closing bodyReader through defer, the HTTP Client's
// Transport will do it for us
request.Body = bodyReader
request.ContentLength = contentLength
for key, values := range headers {
for _, value := range values {
request.Header.Add(key, value)
}
}
return d.client.Do(request)
}
func (d *client) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
var bodyReader io.ReadCloser
headers := http.Header{}
for k, v := range d.cfg.Headers {
headers.Set(k, v)
}
contentLength := (int64)(len(rawRequest))
headers.Set("Content-Type", contentTypeProto)
requestReader := bytes.NewBuffer(rawRequest)
switch Compression(d.cfg.Compression) {
case NoCompression:
bodyReader = ioutil.NopCloser(requestReader)
case GzipCompression:
preader, pwriter := io.Pipe()
go func() {
defer pwriter.Close()
gzipper := gzip.NewWriter(pwriter)
defer gzipper.Close()
_, err := io.Copy(gzipper, requestReader)
if err != nil {
otel.Handle(fmt.Errorf("otlphttp: failed to gzip request: %v", err))
}
}()
headers.Set("Content-Encoding", "gzip")
bodyReader = preader
contentLength = -1
}
return bodyReader, contentLength, headers
}

View File

@ -60,6 +60,55 @@ func TestEndToEnd(t *testing.T) {
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
},
},
{
name: "retry",
opts: []otlptracehttp.Option{
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
// Do not stop trying.
MaxElapsedTime: 0,
}),
},
mcCfg: mockCollectorConfig{
InjectHTTPStatus: []int{503, 429},
},
},
{
name: "retry with gzip compression",
opts: []otlptracehttp.Option{
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
// Do not stop trying.
MaxElapsedTime: 0,
}),
},
mcCfg: mockCollectorConfig{
InjectHTTPStatus: []int{503, 503},
},
},
{
name: "retry with throttle",
opts: []otlptracehttp.Option{
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
// Do not stop trying.
MaxElapsedTime: 0,
}),
},
mcCfg: mockCollectorConfig{
InjectHTTPStatus: []int{503},
InjectResponseHeader: []map[string]string{
{"Retry-After": "10"},
},
},
},
{
name: "with empty paths (forced to defaults)",
opts: []otlptracehttp.Option{
@ -138,32 +187,6 @@ func TestExporterShutdown(t *testing.T) {
})
}
func TestRetry(t *testing.T) {
statuses := []int{
http.StatusTooManyRequests,
http.StatusServiceUnavailable,
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
client := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
otlptracehttp.WithMaxAttempts(len(statuses)+1),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, client)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.NoError(t, err)
assert.Len(t, mc.GetSpans(), 1)
}
func TestTimeout(t *testing.T) {
mcCfg := mockCollectorConfig{
InjectDelay: 100 * time.Millisecond,
@ -185,45 +208,21 @@ func TestTimeout(t *testing.T) {
assert.Equal(t, true, os.IsTimeout(err))
}
func TestRetryFailed(t *testing.T) {
statuses := []int{
http.StatusTooManyRequests,
http.StatusServiceUnavailable,
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
otlptracehttp.WithMaxAttempts(1),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
}
func TestNoRetry(t *testing.T) {
statuses := []int{
http.StatusBadRequest,
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
mc := runMockCollector(t, mockCollectorConfig{
InjectHTTPStatus: []int{http.StatusBadRequest},
})
defer mc.MustStop(t)
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
otlptracehttp.WithMaxAttempts(len(statuses)+1),
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: 1 * time.Nanosecond,
MaxInterval: 1 * time.Nanosecond,
// Never stop retrying a retry-able status.
MaxElapsedTime: 0,
}),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, driver)
@ -233,7 +232,7 @@ func TestNoRetry(t *testing.T) {
}()
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("failed to send traces to http://%s/v1/traces with HTTP status 400 Bad Request", mc.endpoint), err.Error())
assert.Equal(t, fmt.Sprintf("failed to send traces to http://%s/v1/traces: 400 Bad Request", mc.endpoint), err.Error())
assert.Empty(t, mc.GetSpans())
}
@ -257,88 +256,6 @@ func TestEmptyData(t *testing.T) {
assert.Empty(t, mc.GetSpans())
}
func TestUnreasonableMaxAttempts(t *testing.T) {
// Max attempts is capped at 5; we set the collector to fail 7 times and
// try to configure max attempts to be either negative or too large. Since
// max attempts is corrected to 5 in such cases, exporting to the collector
// should fail.
type testcase struct {
name string
maxAttempts int
}
for _, tc := range []testcase{
{
name: "negative max attempts",
maxAttempts: -3,
},
{
name: "too large max attempts",
maxAttempts: 10,
},
} {
t.Run(tc.name, func(t *testing.T) {
statuses := make([]int, 0, 7)
for i := 0; i < cap(statuses); i++ {
statuses = append(statuses, http.StatusTooManyRequests)
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
otlptracehttp.WithMaxAttempts(tc.maxAttempts),
otlptracehttp.WithBackoff(time.Millisecond),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(ctx))
}()
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
})
}
}
func TestUnreasonableBackoff(t *testing.T) {
// This sets backoff to a negative value, which gets corrected to the
// default backoff instead of being used. The default max attempts is 5, so
// we set the collector to fail 4 times, but we set the deadline to 3 times
// the default backoff. This should show that the deadline is not met,
// meaning that the retries weren't immediate (as a negative backoff could
// imply).
statuses := make([]int, 0, 4)
for i := 0; i < cap(statuses); i++ {
statuses = append(statuses, http.StatusTooManyRequests)
}
mcCfg := mockCollectorConfig{
InjectHTTPStatus: statuses,
}
mc := runMockCollector(t, mcCfg)
defer mc.MustStop(t)
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
otlptracehttp.WithBackoff(-time.Millisecond),
)
ctx, cancel := context.WithTimeout(context.Background(), 3*(300*time.Millisecond))
defer cancel()
exporter, err := otlptrace.New(ctx, driver)
require.NoError(t, err)
defer func() {
assert.NoError(t, exporter.Shutdown(context.Background()))
}()
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
assert.Error(t, err)
assert.Empty(t, mc.GetSpans())
}
func TestCancelledContext(t *testing.T) {
mcCfg := mockCollectorConfig{}
mc := runMockCollector(t, mcCfg)
@ -372,7 +289,13 @@ func TestDeadlineContext(t *testing.T) {
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
otlptracehttp.WithBackoff(time.Minute),
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: 1 * time.Hour,
MaxInterval: 1 * time.Hour,
// Never stop retrying a retry-able status.
MaxElapsedTime: 0,
}),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, driver)
@ -400,7 +323,13 @@ func TestStopWhileExporting(t *testing.T) {
driver := otlptracehttp.NewClient(
otlptracehttp.WithEndpoint(mc.Endpoint()),
otlptracehttp.WithInsecure(),
otlptracehttp.WithBackoff(time.Minute),
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{
Enabled: true,
InitialInterval: 1 * time.Hour,
MaxInterval: 1 * time.Hour,
// Never stop retrying a retry-able status.
MaxElapsedTime: 0,
}),
)
ctx := context.Background()
exporter, err := otlptrace.New(ctx, driver)

View File

@ -4,7 +4,6 @@ go 1.15
require (
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v1.0.0-RC1
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.0.0-RC1
go.opentelemetry.io/proto/otlp v0.9.0
google.golang.org/protobuf v1.27.1

View File

@ -2,6 +2,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/cenkalti/backoff/v4 v4.1.1 h1:G2HAfAmvm/GcKan2oOQpBXOd2tT2G57ZnZGWa1PxPBQ=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=

View File

@ -46,9 +46,10 @@ type mockCollector struct {
spanLock sync.Mutex
spansStorage otlptracetest.SpansStorage
injectHTTPStatus []int
injectContentType string
injectDelay time.Duration
injectHTTPStatus []int
injectResponseHeader []map[string]string
injectContentType string
injectDelay time.Duration
clientTLSConfig *tls.Config
expectedHeaders map[string]string
@ -97,8 +98,9 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
return
}
h := c.getInjectResponseHeader()
if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 {
writeReply(w, rawResponse, injectedStatus, c.injectContentType)
writeReply(w, rawResponse, injectedStatus, c.injectContentType, h)
return
}
rawRequest, err := readRequest(r)
@ -112,7 +114,7 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
writeReply(w, rawResponse, 0, c.injectContentType, h)
c.spanLock.Lock()
defer c.spanLock.Unlock()
c.spansStorage.AddSpans(request)
@ -149,6 +151,17 @@ func (c *mockCollector) getInjectHTTPStatus() int {
return status
}
func (c *mockCollector) getInjectResponseHeader() (h map[string]string) {
if len(c.injectResponseHeader) == 0 {
return
}
h, c.injectResponseHeader = c.injectResponseHeader[0], c.injectResponseHeader[1:]
if len(c.injectResponseHeader) == 0 {
c.injectResponseHeader = nil
}
return
}
func readRequest(r *http.Request) ([]byte, error) {
if r.Header.Get("Content-Encoding") == "gzip" {
return readGzipBody(r.Body)
@ -170,28 +183,32 @@ func readGzipBody(body io.Reader) ([]byte, error) {
return rawRequest.Bytes(), nil
}
func writeReply(w http.ResponseWriter, rawResponse []byte, injectHTTPStatus int, injectContentType string) {
func writeReply(w http.ResponseWriter, rawResponse []byte, s int, ct string, h map[string]string) {
status := http.StatusOK
if injectHTTPStatus != 0 {
status = injectHTTPStatus
if s != 0 {
status = s
}
contentType := "application/x-protobuf"
if injectContentType != "" {
contentType = injectContentType
if ct != "" {
contentType = ct
}
w.Header().Set("Content-Type", contentType)
for k, v := range h {
w.Header().Add(k, v)
}
w.WriteHeader(status)
_, _ = w.Write(rawResponse)
}
type mockCollectorConfig struct {
TracesURLPath string
Port int
InjectHTTPStatus []int
InjectContentType string
InjectDelay time.Duration
WithTLS bool
ExpectedHeaders map[string]string
TracesURLPath string
Port int
InjectHTTPStatus []int
InjectContentType string
InjectResponseHeader []map[string]string
InjectDelay time.Duration
WithTLS bool
ExpectedHeaders map[string]string
}
func (c *mockCollectorConfig) fillInDefaults() {
@ -207,12 +224,13 @@ func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector {
_, portStr, err := net.SplitHostPort(ln.Addr().String())
require.NoError(t, err)
m := &mockCollector{
endpoint: fmt.Sprintf("localhost:%s", portStr),
spansStorage: otlptracetest.NewSpansStorage(),
injectHTTPStatus: cfg.InjectHTTPStatus,
injectContentType: cfg.InjectContentType,
injectDelay: cfg.InjectDelay,
expectedHeaders: cfg.ExpectedHeaders,
endpoint: fmt.Sprintf("localhost:%s", portStr),
spansStorage: otlptracetest.NewSpansStorage(),
injectHTTPStatus: cfg.InjectHTTPStatus,
injectResponseHeader: cfg.InjectResponseHeader,
injectContentType: cfg.InjectContentType,
injectDelay: cfg.InjectDelay,
expectedHeaders: cfg.ExpectedHeaders,
}
mux := http.NewServeMux()
mux.Handle(cfg.TracesURLPath, http.HandlerFunc(m.serveTraces))

View File

@ -19,19 +19,7 @@ import (
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
)
const (
// defaultMaxAttempts describes how many times the driver
// should retry the sending of the payload in case of a
// retryable error.
defaultMaxAttempts int = 5
// defaultTracesPath is a default URL path for endpoint that
// receives spans.
defaultTracesPath string = "/v1/traces"
// defaultBackoff is a default base backoff time used in the
// exponential backoff strategy.
defaultBackoff time.Duration = 300 * time.Millisecond
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
)
// Compression describes the compression used for payloads sent to the
@ -52,9 +40,9 @@ type Option interface {
applyHTTPOption(*otlpconfig.Config)
}
// RetrySettings defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type RetrySettings otlpconfig.RetrySettings
// RetryConfig defines configuration for retrying batches in case of export
// failure using an exponential backoff.
type RetryConfig retry.Config
type wrappedOption struct {
otlpconfig.HTTPOption
@ -84,21 +72,6 @@ func WithURLPath(urlPath string) Option {
return wrappedOption{otlpconfig.WithURLPath(urlPath)}
}
// WithMaxAttempts allows one to override how many times the driver
// will try to send the payload in case of retryable errors.
// The max attempts is limited to at most 5 retries. If unset,
// default (5) will be used.
func WithMaxAttempts(maxAttempts int) Option {
return wrappedOption{otlpconfig.WithMaxAttempts(maxAttempts)}
}
// WithBackoff tells the driver to use the duration as a base of the
// exponential backoff strategy. If unset, default (300ms) will be
// used.
func WithBackoff(duration time.Duration) Option {
return wrappedOption{otlpconfig.WithBackoff(duration)}
}
// WithTLSClientConfig can be used to set up a custom TLS
// configuration for the client used to send payloads to the
// collector. Use it if you want to use a custom certificate.
@ -124,3 +97,12 @@ func WithHeaders(headers map[string]string) Option {
func WithTimeout(duration time.Duration) Option {
return wrappedOption{otlpconfig.WithTimeout(duration)}
}
// WithRetry configures the retry policy for transient errors that may occur
// when exporting traces. An exponential back-off algorithm is used to ensure
// endpoints are not overwhelmed with retries. If unset, the default retry
// policy will retry after 5 seconds and increase exponentially after each
// error for a total of 1 minute.
func WithRetry(rc RetryConfig) Option {
return wrappedOption{otlpconfig.WithRetry(retry.Config(rc))}
}