mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-03-03 14:52:56 +02:00
Implement retry policy for the OTLP/gRPC exporter (#1832)
This was heavily inspired by the retry policy from the https://github.com/open-telemetry/opentelemetry-collector code. Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
parent
ec75390fc4
commit
f92a6d8361
12
CHANGELOG.md
12
CHANGELOG.md
@ -10,6 +10,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
### Added
|
||||
|
||||
- Adds `otlpgrpc.WithRetry` option for configuring the retry policy for transient errors on the OTLP/gRPC exporter. (#1832)
|
||||
- The following status codes are defined as transient errors:
|
||||
| gRPC Status Code | Description |
|
||||
| ---------------- | ----------- |
|
||||
| 1 | Cancelled |
|
||||
| 4 | Deadline Exceeded |
|
||||
| 8 | Resource Exhausted |
|
||||
| 10 | Aborted |
|
||||
| 11 | Out of Range |
|
||||
| 14 | Unavailable |
|
||||
| 15 | Data Loss |
|
||||
|
||||
### Changed
|
||||
|
||||
- Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item.
|
||||
|
@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
|
||||
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
|
||||
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
|
||||
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
|
@ -29,7 +29,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
|
||||
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
||||
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
|
||||
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
|
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
|
@ -4,28 +4,12 @@
|
||||
|
||||
This exporter exports OpenTelemetry spans and metrics to the OpenTelemetry Collector.
|
||||
|
||||
|
||||
## Installation and Setup
|
||||
|
||||
The exporter can be installed using standard `go` functionality.
|
||||
|
||||
```bash
|
||||
$ go get -u go.opentelemetry.io/otel/exporters/otlp
|
||||
go get -u go.opentelemetry.io/otel/exporters/otlp
|
||||
```
|
||||
|
||||
A new exporter can be created using the `NewExporter` function.
|
||||
|
||||
## Retries
|
||||
|
||||
The exporter will not, by default, retry failed requests to the collector.
|
||||
However, it is configured in a way that it can be easily enabled.
|
||||
|
||||
To enable retries, the `GRPC_GO_RETRY` environment variable needs to be set to `on`. For example,
|
||||
|
||||
```
|
||||
GRPC_GO_RETRY=on go run .
|
||||
```
|
||||
|
||||
The [default service config](https://github.com/grpc/proposal/blob/master/A6-client-retries.md) is defined to retry failed requests with exponential backoff (`0.3 seconds * (2)^retry`) with [a max of `5` retries](https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response).
|
||||
|
||||
These retries are only attempted for responses that are [deemed "retry-able" by the collector](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy).
|
||||
|
@ -8,6 +8,7 @@ replace (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff/v4 v4.1.0
|
||||
github.com/google/go-cmp v0.5.5
|
||||
github.com/stretchr/testify v1.7.0
|
||||
go.opentelemetry.io/otel v0.20.0
|
||||
@ -17,6 +18,7 @@ require (
|
||||
go.opentelemetry.io/otel/sdk/metric v0.20.0
|
||||
go.opentelemetry.io/otel/trace v0.20.0
|
||||
go.opentelemetry.io/proto/otlp v0.7.0
|
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
|
||||
google.golang.org/grpc v1.37.0
|
||||
google.golang.org/protobuf v1.26.0
|
||||
)
|
||||
|
@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
|
||||
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
|
||||
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
|
||||
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
|
@ -42,31 +42,16 @@ const (
|
||||
// DefaultTimeout is a default max waiting time for the backend to process
|
||||
// each span or metrics batch.
|
||||
DefaultTimeout time.Duration = 10 * time.Second
|
||||
// DefaultServiceConfig is the gRPC service config used if none is
|
||||
// provided by the user.
|
||||
DefaultServiceConfig = `{
|
||||
"methodConfig":[{
|
||||
"name":[
|
||||
{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
|
||||
{ "service":"opentelemetry.proto.collector.trace.v1.TraceService" }
|
||||
],
|
||||
"retryPolicy":{
|
||||
"MaxAttempts":5,
|
||||
"InitialBackoff":"0.3s",
|
||||
"MaxBackoff":"5s",
|
||||
"BackoffMultiplier":2,
|
||||
"RetryableStatusCodes":[
|
||||
"CANCELLED",
|
||||
"DEADLINE_EXCEEDED",
|
||||
"RESOURCE_EXHAUSTED",
|
||||
"ABORTED",
|
||||
"OUT_OF_RANGE",
|
||||
"UNAVAILABLE",
|
||||
"DATA_LOSS"
|
||||
]
|
||||
}
|
||||
}]
|
||||
}`
|
||||
)
|
||||
|
||||
var (
|
||||
// defaultRetrySettings is a default settings for the retry policy.
|
||||
defaultRetrySettings = otlp.RetrySettings{
|
||||
Enabled: true,
|
||||
InitialInterval: 5 * time.Second,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: time.Minute,
|
||||
}
|
||||
)
|
||||
|
||||
type (
|
||||
@ -88,17 +73,16 @@ type (
|
||||
Metrics SignalConfig
|
||||
Traces SignalConfig
|
||||
|
||||
// General configurations
|
||||
// HTTP configurations
|
||||
Marshaler otlp.Marshaler
|
||||
MaxAttempts int
|
||||
Backoff time.Duration
|
||||
|
||||
// HTTP configuration
|
||||
Marshaler otlp.Marshaler
|
||||
|
||||
// gRPC configurations
|
||||
ReconnectionPeriod time.Duration
|
||||
ServiceConfig string
|
||||
DialOptions []grpc.DialOption
|
||||
RetrySettings otlp.RetrySettings
|
||||
}
|
||||
)
|
||||
|
||||
@ -118,7 +102,7 @@ func NewDefaultConfig() Config {
|
||||
},
|
||||
MaxAttempts: DefaultMaxAttempts,
|
||||
Backoff: DefaultBackoff,
|
||||
ServiceConfig: DefaultServiceConfig,
|
||||
RetrySettings: defaultRetrySettings,
|
||||
}
|
||||
|
||||
return c
|
||||
@ -280,15 +264,9 @@ func WithMetricsURLPath(urlPath string) GenericOption {
|
||||
})
|
||||
}
|
||||
|
||||
func WithMaxAttempts(maxAttempts int) GenericOption {
|
||||
func WithRetry(settings otlp.RetrySettings) GenericOption {
|
||||
return newGenericOption(func(cfg *Config) {
|
||||
cfg.MaxAttempts = maxAttempts
|
||||
})
|
||||
}
|
||||
|
||||
func WithBackoff(duration time.Duration) GenericOption {
|
||||
return newGenericOption(func(cfg *Config) {
|
||||
cfg.Backoff = duration
|
||||
cfg.RetrySettings = settings
|
||||
})
|
||||
}
|
||||
|
||||
@ -374,3 +352,15 @@ func WithMetricsTimeout(duration time.Duration) GenericOption {
|
||||
cfg.Metrics.Timeout = duration
|
||||
})
|
||||
}
|
||||
|
||||
// WithMaxAttempts returns a GenericOption that stores maxAttempts in
// Config.MaxAttempts. No validation is performed here; the value is applied
// as-is to the configuration.
func WithMaxAttempts(maxAttempts int) GenericOption {
	return newGenericOption(func(cfg *Config) {
		cfg.MaxAttempts = maxAttempts
	})
}
|
||||
|
||||
// WithBackoff returns a GenericOption that stores duration in
// Config.Backoff. No validation is performed here; the value is applied
// as-is to the configuration.
func WithBackoff(duration time.Duration) GenericOption {
	return newGenericOption(func(cfg *Config) {
		cfg.Backoff = duration
	})
}
|
||||
|
@ -16,12 +16,18 @@ package otlpgrpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"google.golang.org/genproto/googleapis/rpc/errdetails"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"google.golang.org/grpc/encoding/gzip"
|
||||
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
@ -276,3 +282,145 @@ func (c *connection) contextWithStop(ctx context.Context) (context.Context, cont
|
||||
}(ctx, cancel)
|
||||
return ctx, cancel
|
||||
}
|
||||
|
||||
// doRequest invokes fn, retrying transient gRPC failures with exponential
// backoff until one of the following happens: fn succeeds, retries are
// disabled, the error is not retryable, the backoff budget (MaxElapsedTime)
// is exhausted, ctx is done, or the connection is shut down (c.stopCh).
// Server-provided throttling hints (RetryInfo details on the status) are
// honored when they exceed the computed backoff delay.
func (c *connection) doRequest(ctx context.Context, fn func(context.Context) error) error {
	expBackoff := newExponentialBackoff(c.cfg.RetrySettings)

	for {
		err := fn(ctx)
		if err == nil {
			// request succeeded.
			return nil
		}

		if !c.cfg.RetrySettings.Enabled {
			return err
		}

		// We have an error, check gRPC status code.
		st := status.Convert(err)
		if st.Code() == codes.OK {
			// Not really an error, still success.
			return nil
		}

		// Now, this is a real error.

		if !shouldRetry(st.Code()) {
			// It is not a retryable error, we should not retry.
			return err
		}

		// Need to retry.

		// throttle is the delay the server asked for via RetryInfo details
		// (0 when the server provided none).
		throttle := getThrottleDuration(st)

		backoffDelay := expBackoff.NextBackOff()
		if backoffDelay == backoff.Stop {
			// throw away the batch
			err = fmt.Errorf("max elapsed time expired: %w", err)
			return err
		}

		var delay time.Duration

		if backoffDelay > throttle {
			delay = backoffDelay
		} else {
			// Waiting out the server throttle would blow the overall
			// retry budget, so give up now instead of sleeping first.
			if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime {
				err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err)
				return err
			}

			// Respect server throttling.
			delay = throttle
		}

		// back-off, but get interrupted when shutting down or request is cancelled or timed out.
		err = func() error {
			dt := time.NewTimer(delay)
			defer dt.Stop()

			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-c.stopCh:
				return fmt.Errorf("interrupted due to shutdown: %w", err)
			case <-dt.C:
			}

			return nil
		}()

		if err != nil {
			return err
		}

	}
}
|
||||
|
||||
func shouldRetry(code codes.Code) bool {
|
||||
switch code {
|
||||
case codes.OK:
|
||||
// Success. This function should not be called for this code, the best we
|
||||
// can do is tell the caller not to retry.
|
||||
return false
|
||||
|
||||
case codes.Canceled,
|
||||
codes.DeadlineExceeded,
|
||||
codes.ResourceExhausted,
|
||||
codes.Aborted,
|
||||
codes.OutOfRange,
|
||||
codes.Unavailable,
|
||||
codes.DataLoss:
|
||||
// These are retryable errors.
|
||||
return true
|
||||
|
||||
case codes.Unknown,
|
||||
codes.InvalidArgument,
|
||||
codes.Unauthenticated,
|
||||
codes.PermissionDenied,
|
||||
codes.NotFound,
|
||||
codes.AlreadyExists,
|
||||
codes.FailedPrecondition,
|
||||
codes.Unimplemented,
|
||||
codes.Internal:
|
||||
// These are fatal errors, don't retry.
|
||||
return false
|
||||
|
||||
default:
|
||||
// Don't retry on unknown codes.
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func getThrottleDuration(status *status.Status) time.Duration {
|
||||
// See if throttling information is available.
|
||||
for _, detail := range status.Details() {
|
||||
if t, ok := detail.(*errdetails.RetryInfo); ok {
|
||||
if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
|
||||
// We are throttled. Wait before retrying as requested by the server.
|
||||
return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
|
||||
}
|
||||
return 0
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// newExponentialBackoff builds a backoff.ExponentialBackOff from the
// user-supplied retry settings. Interval bounds and the overall deadline
// come from rs; the randomization factor, multiplier, and clock use the
// backoff package defaults.
func newExponentialBackoff(rs otlp.RetrySettings) *backoff.ExponentialBackOff {
	// Do not use NewExponentialBackOff since it calls Reset and the code here must
	// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
	expBackoff := &backoff.ExponentialBackOff{
		InitialInterval:     rs.InitialInterval,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		MaxInterval:         rs.MaxInterval,
		MaxElapsedTime:      rs.MaxElapsedTime,
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}
	// Start the elapsed-time clock now that all fields are populated.
	expBackoff.Reset()

	return expBackoff
}
|
||||
|
90
exporters/otlp/otlpgrpc/connection_test.go
Normal file
90
exporters/otlp/otlpgrpc/connection_test.go
Normal file
@ -0,0 +1,90 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package otlpgrpc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"google.golang.org/genproto/googleapis/rpc/errdetails"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
)
|
||||
|
||||
// TestGetThrottleDuration verifies getThrottleDuration against statuses
// carrying: no details, a single RetryInfo detail, a non-RetryInfo detail,
// mixed details, and two RetryInfo details (only the first is honored).
func TestGetThrottleDuration(t *testing.T) {
	tts := []struct {
		// stsFn builds the status under test; the error from WithDetails
		// is intentionally ignored by the test loop below.
		stsFn func() (*status.Status, error)
		// throttle is the delay getThrottleDuration must report.
		throttle time.Duration
	}{
		{
			stsFn: func() (*status.Status, error) {
				return status.New(
					codes.OK,
					"status with no retry info",
				), nil
			},
			throttle: 0,
		},
		{
			stsFn: func() (*status.Status, error) {
				st := status.New(codes.ResourceExhausted, "status with retry info")
				return st.WithDetails(
					&errdetails.RetryInfo{RetryDelay: durationpb.New(15 * time.Millisecond)},
				)
			},
			throttle: 15 * time.Millisecond,
		},
		{
			stsFn: func() (*status.Status, error) {
				st := status.New(codes.ResourceExhausted, "status with error info detail")
				return st.WithDetails(
					&errdetails.ErrorInfo{Reason: "no throttle detail"},
				)
			},
			throttle: 0,
		},
		{
			stsFn: func() (*status.Status, error) {
				st := status.New(codes.ResourceExhausted, "status with error info and retry info")
				return st.WithDetails(
					&errdetails.ErrorInfo{Reason: "no throttle detail"},
					&errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)},
				)
			},
			throttle: 13 * time.Minute,
		},
		{
			stsFn: func() (*status.Status, error) {
				st := status.New(codes.ResourceExhausted, "status with two retry info should take the first")
				return st.WithDetails(
					&errdetails.RetryInfo{RetryDelay: durationpb.New(13 * time.Minute)},
					&errdetails.RetryInfo{RetryDelay: durationpb.New(18 * time.Minute)},
				)
			},
			throttle: 13 * time.Minute,
		},
	}

	for _, tt := range tts {
		sts, _ := tt.stsFn()
		// The status message doubles as the subtest name.
		t.Run(sts.Message(), func(t *testing.T) {
			th := getThrottleDuration(sts)
			require.Equal(t, tt.throttle, th)
		})
	}
}
|
@ -145,10 +145,13 @@ func (md *metricsDriver) uploadMetrics(ctx context.Context, protoMetrics []*metr
|
||||
if md.metricsClient == nil {
|
||||
return errNoClient
|
||||
}
|
||||
_, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
|
||||
ResourceMetrics: protoMetrics,
|
||||
|
||||
return md.connection.doRequest(ctx, func(ctx context.Context) error {
|
||||
_, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
|
||||
ResourceMetrics: protoMetrics,
|
||||
})
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}()
|
||||
if err != nil {
|
||||
md.connection.setStateDisconnected(err)
|
||||
@ -183,10 +186,12 @@ func (td *tracesDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.
|
||||
if td.tracesClient == nil {
|
||||
return errNoClient
|
||||
}
|
||||
_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
|
||||
ResourceSpans: protoSpans,
|
||||
return td.connection.doRequest(ctx, func(ctx context.Context) error {
|
||||
_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
|
||||
ResourceSpans: protoSpans,
|
||||
})
|
||||
return err
|
||||
})
|
||||
return err
|
||||
}()
|
||||
if err != nil {
|
||||
td.connection.setStateDisconnected(err)
|
||||
|
@ -34,11 +34,12 @@ import (
|
||||
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
|
||||
)
|
||||
|
||||
func makeMockCollector(t *testing.T) *mockCollector {
|
||||
func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector {
|
||||
return &mockCollector{
|
||||
t: t,
|
||||
traceSvc: &mockTraceService{
|
||||
storage: otlptest.NewSpansStorage(),
|
||||
errors: mockConfig.errors,
|
||||
},
|
||||
metricSvc: &mockMetricService{
|
||||
storage: otlptest.NewMetricsStorage(),
|
||||
@ -49,10 +50,12 @@ func makeMockCollector(t *testing.T) *mockCollector {
|
||||
type mockTraceService struct {
|
||||
collectortracepb.UnimplementedTraceServiceServer
|
||||
|
||||
mu sync.RWMutex
|
||||
storage otlptest.SpansStorage
|
||||
headers metadata.MD
|
||||
delay time.Duration
|
||||
errors []error
|
||||
requests int
|
||||
mu sync.RWMutex
|
||||
storage otlptest.SpansStorage
|
||||
headers metadata.MD
|
||||
delay time.Duration
|
||||
}
|
||||
|
||||
func (mts *mockTraceService) getHeaders() metadata.MD {
|
||||
@ -77,9 +80,19 @@ func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.E
|
||||
if mts.delay > 0 {
|
||||
time.Sleep(mts.delay)
|
||||
}
|
||||
reply := &collectortracepb.ExportTraceServiceResponse{}
|
||||
|
||||
mts.mu.Lock()
|
||||
defer mts.mu.Unlock()
|
||||
defer func() {
|
||||
mts.requests++
|
||||
mts.mu.Unlock()
|
||||
}()
|
||||
|
||||
reply := &collectortracepb.ExportTraceServiceResponse{}
|
||||
if mts.requests < len(mts.errors) {
|
||||
idx := mts.requests
|
||||
return reply, mts.errors[idx]
|
||||
}
|
||||
|
||||
mts.headers, _ = metadata.FromIncomingContext(ctx)
|
||||
mts.storage.AddSpans(exp)
|
||||
return reply, nil
|
||||
@ -122,6 +135,11 @@ type mockCollector struct {
|
||||
stopOnce sync.Once
|
||||
}
|
||||
|
||||
// mockConfig carries the knobs for constructing a mock collector:
// a scripted sequence of errors handed back to successive export requests,
// and the endpoint the mock server listens on (empty means pick one).
type mockConfig struct {
	// errors[i] is returned to the i-th export request; once exhausted,
	// requests succeed.
	errors []error
	// endpoint is passed to net.Listen when starting the mock server.
	endpoint string
}
|
||||
|
||||
var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil)
|
||||
var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil)
|
||||
|
||||
@ -192,13 +210,17 @@ func runMockCollector(t *testing.T) *mockCollector {
|
||||
}
|
||||
|
||||
func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
|
||||
ln, err := net.Listen("tcp", endpoint)
|
||||
return runMockCollectorWithConfig(t, &mockConfig{endpoint: endpoint})
|
||||
}
|
||||
|
||||
func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockCollector {
|
||||
ln, err := net.Listen("tcp", mockConfig.endpoint)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get an endpoint: %v", err)
|
||||
}
|
||||
|
||||
srv := grpc.NewServer()
|
||||
mc := makeMockCollector(t)
|
||||
mc := makeMockCollector(t, mockConfig)
|
||||
collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
|
||||
collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
|
||||
mc.ln = newListener(ln)
|
||||
|
@ -200,3 +200,12 @@ func WithTracesTimeout(duration time.Duration) Option {
|
||||
func WithMetricsTimeout(duration time.Duration) Option {
|
||||
return otlpconfig.WithMetricsTimeout(duration)
|
||||
}
|
||||
|
||||
// WithRetry configures the retry policy for transient errors that may occur
// when exporting metrics or traces. An exponential back-off algorithm is used to
// ensure endpoints are not overwhelmed with retries. If unset, the default
// retry policy will retry after 5 seconds and increase exponentially after each
// error for a total of 1 minute.
func WithRetry(settings otlp.RetrySettings) Option {
	return otlpconfig.WithRetry(settings)
}
|
||||
|
@ -22,8 +22,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/genproto/googleapis/rpc/errdetails"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -150,6 +152,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
|
||||
reconnectionPeriod := 20 * time.Millisecond
|
||||
ctx := context.Background()
|
||||
exp := newGRPCExporter(t, ctx, mc.endpoint,
|
||||
otlpgrpc.WithRetry(otlp.RetrySettings{Enabled: false}),
|
||||
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
|
||||
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
|
||||
|
||||
@ -200,12 +203,280 @@ func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *test
|
||||
require.NoError(t, nmc.Stop())
|
||||
}
|
||||
|
||||
func TestExporterExportFailureAndRecoveryModes(t *testing.T) {
|
||||
tts := []struct {
|
||||
name string
|
||||
errors []error
|
||||
rs otlp.RetrySettings
|
||||
fn func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector)
|
||||
opts []otlpgrpc.Option
|
||||
}{
|
||||
{
|
||||
name: "Do not retry if succeeded",
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 1)
|
||||
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 success request.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Do not retry if 'error' is ok",
|
||||
errors: []error{
|
||||
status.Error(codes.OK, ""),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 0)
|
||||
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 error OK request.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Fail three times and succeed",
|
||||
rs: otlp.RetrySettings{
|
||||
Enabled: true,
|
||||
MaxElapsedTime: 300 * time.Millisecond,
|
||||
InitialInterval: 2 * time.Millisecond,
|
||||
MaxInterval: 10 * time.Millisecond,
|
||||
},
|
||||
errors: []error{
|
||||
status.Error(codes.Unavailable, "backend under pressure"),
|
||||
status.Error(codes.Unavailable, "backend under pressure"),
|
||||
status.Error(codes.Unavailable, "backend under pressure"),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 1)
|
||||
require.Equal(t, 4, mc.traceSvc.requests, "trace service must receive 3 failure requests and 1 success request.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Permanent error should not be retried",
|
||||
rs: otlp.RetrySettings{
|
||||
Enabled: true,
|
||||
MaxElapsedTime: 300 * time.Millisecond,
|
||||
InitialInterval: 2 * time.Millisecond,
|
||||
MaxInterval: 10 * time.Millisecond,
|
||||
},
|
||||
errors: []error{
|
||||
status.Error(codes.InvalidArgument, "invalid arguments"),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
require.Error(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 0)
|
||||
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 error requests.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test all transient errors and succeed",
|
||||
rs: otlp.RetrySettings{
|
||||
Enabled: true,
|
||||
MaxElapsedTime: 500 * time.Millisecond,
|
||||
InitialInterval: 1 * time.Millisecond,
|
||||
MaxInterval: 2 * time.Millisecond,
|
||||
},
|
||||
errors: []error{
|
||||
status.Error(codes.Canceled, ""),
|
||||
status.Error(codes.DeadlineExceeded, ""),
|
||||
status.Error(codes.ResourceExhausted, ""),
|
||||
status.Error(codes.Aborted, ""),
|
||||
status.Error(codes.OutOfRange, ""),
|
||||
status.Error(codes.Unavailable, ""),
|
||||
status.Error(codes.DataLoss, ""),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
require.NoError(t, exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}}))
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 1)
|
||||
require.Equal(t, 8, mc.traceSvc.requests, "trace service must receive 9 failure requests and 1 success request.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Retry should honor server throttling",
|
||||
rs: otlp.RetrySettings{
|
||||
Enabled: true,
|
||||
MaxElapsedTime: time.Minute,
|
||||
InitialInterval: time.Nanosecond,
|
||||
MaxInterval: time.Nanosecond,
|
||||
},
|
||||
opts: []otlpgrpc.Option{
|
||||
otlpgrpc.WithTimeout(time.Millisecond * 100),
|
||||
},
|
||||
errors: []error{
|
||||
newThrottlingError(codes.ResourceExhausted, time.Second*30),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
|
||||
require.Error(t, err)
|
||||
require.Equal(t, "context deadline exceeded", err.Error())
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 0)
|
||||
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure requests and 1 success request.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Retry should fail if server throttling is higher than the MaxElapsedTime",
|
||||
rs: otlp.RetrySettings{
|
||||
Enabled: true,
|
||||
MaxElapsedTime: time.Millisecond * 100,
|
||||
InitialInterval: time.Nanosecond,
|
||||
MaxInterval: time.Nanosecond,
|
||||
},
|
||||
errors: []error{
|
||||
newThrottlingError(codes.ResourceExhausted, time.Minute),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
|
||||
require.Error(t, err)
|
||||
require.Equal(t, "max elapsed time expired when respecting server throttle: rpc error: code = ResourceExhausted desc = ", err.Error())
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 0)
|
||||
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure requests and 1 success request.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Retry stops if takes too long",
|
||||
rs: otlp.RetrySettings{
|
||||
Enabled: true,
|
||||
MaxElapsedTime: time.Millisecond * 100,
|
||||
InitialInterval: time.Millisecond * 50,
|
||||
MaxInterval: time.Millisecond * 50,
|
||||
},
|
||||
errors: []error{
|
||||
status.Error(codes.Unavailable, "unavailable"),
|
||||
status.Error(codes.Unavailable, "unavailable"),
|
||||
status.Error(codes.Unavailable, "unavailable"),
|
||||
status.Error(codes.Unavailable, "unavailable"),
|
||||
status.Error(codes.Unavailable, "unavailable"),
|
||||
status.Error(codes.Unavailable, "unavailable"),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
|
||||
require.Error(t, err)
|
||||
|
||||
require.Equal(t, "max elapsed time expired: rpc error: code = Unavailable desc = unavailable", err.Error())
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 0)
|
||||
require.LessOrEqual(t, 1, mc.traceSvc.requests, "trace service must receive at least 1 failure requests.")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Disabled retry",
|
||||
rs: otlp.RetrySettings{
|
||||
Enabled: false,
|
||||
},
|
||||
errors: []error{
|
||||
status.Error(codes.Unavailable, "unavailable"),
|
||||
},
|
||||
fn: func(t *testing.T, ctx context.Context, exp *otlp.Exporter, mc *mockCollector) {
|
||||
err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
|
||||
require.Error(t, err)
|
||||
|
||||
require.Equal(t, "rpc error: code = Unavailable desc = unavailable", err.Error())
|
||||
|
||||
span := mc.getSpans()
|
||||
|
||||
require.Len(t, span, 0)
|
||||
require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 failure requests.")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tts {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
mc := runMockCollectorWithConfig(t, &mockConfig{
|
||||
errors: tt.errors,
|
||||
})
|
||||
|
||||
opts := []otlpgrpc.Option{
|
||||
otlpgrpc.WithRetry(tt.rs),
|
||||
}
|
||||
|
||||
if len(tt.opts) != 0 {
|
||||
opts = append(opts, tt.opts...)
|
||||
}
|
||||
|
||||
exp := newGRPCExporter(t, ctx, mc.endpoint, opts...)
|
||||
|
||||
tt.fn(t, ctx, exp, mc)
|
||||
|
||||
require.NoError(t, mc.Stop())
|
||||
require.NoError(t, exp.Shutdown(ctx))
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TestPermanentErrorsShouldNotBeRetried runs the exporter against a mock
// collector that replies with each permanent (non-transient) gRPC status
// code and asserts that exactly one request reaches the trace service —
// i.e. no retry is attempted for any of them.
func TestPermanentErrorsShouldNotBeRetried(t *testing.T) {
	permanentErrors := []*status.Status{
		status.New(codes.Unknown, "Unknown"),
		status.New(codes.InvalidArgument, "InvalidArgument"),
		status.New(codes.NotFound, "NotFound"),
		status.New(codes.AlreadyExists, "AlreadyExists"),
		status.New(codes.FailedPrecondition, "FailedPrecondition"),
		status.New(codes.Unimplemented, "Unimplemented"),
		status.New(codes.Internal, "Internal"),
		status.New(codes.PermissionDenied, ""),
		status.New(codes.Unauthenticated, ""),
	}

	for _, sts := range permanentErrors {
		t.Run(sts.Code().String(), func(t *testing.T) {
			ctx := context.Background()

			// Script the mock collector to fail the first request with
			// this permanent error.
			mc := runMockCollectorWithConfig(t, &mockConfig{
				errors: []error{sts.Err()},
			})

			exp := newGRPCExporter(t, ctx, mc.endpoint)

			err := exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "Spans"}})
			require.Error(t, err)
			require.Len(t, mc.getSpans(), 0)
			require.Equal(t, 1, mc.traceSvc.requests, "trace service must receive 1 permanent error requests.")

			require.NoError(t, mc.Stop())
			require.NoError(t, exp.Shutdown(ctx))
		})
	}
}
|
||||
|
||||
func newThrottlingError(code codes.Code, duration time.Duration) error {
|
||||
s := status.New(code, "")
|
||||
|
||||
s, _ = s.WithDetails(&errdetails.RetryInfo{RetryDelay: durationpb.New(duration)})
|
||||
|
||||
return s.Err()
|
||||
}
|
||||
|
||||
func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
|
||||
mc := runMockCollector(t)
|
||||
|
||||
reconnectionPeriod := 50 * time.Millisecond
|
||||
ctx := context.Background()
|
||||
exp := newGRPCExporter(t, ctx, mc.endpoint,
|
||||
otlpgrpc.WithRetry(otlp.RetrySettings{Enabled: false}),
|
||||
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
|
||||
defer func() { require.NoError(t, exp.Shutdown(ctx)) }()
|
||||
|
||||
@ -367,7 +638,7 @@ func TestNewExporter_WithTimeout(t *testing.T) {
|
||||
}()
|
||||
|
||||
ctx := context.Background()
|
||||
exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithTimeout(tt.timeout))
|
||||
exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithTimeout(tt.timeout), otlpgrpc.WithRetry(otlp.RetrySettings{Enabled: false}))
|
||||
defer func() {
|
||||
_ = exp.Shutdown(ctx)
|
||||
}()
|
||||
@ -406,6 +677,7 @@ func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) {
|
||||
|
||||
expectedErr := fmt.Sprintf("traces exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint)
|
||||
|
||||
require.Error(t, err)
|
||||
require.Equal(t, expectedErr, err.Error())
|
||||
|
||||
defer func() {
|
||||
|
34
exporters/otlp/retry.go
Normal file
34
exporters/otlp/retry.go
Normal file
@ -0,0 +1,34 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package otlp
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// RetrySettings defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type RetrySettings struct {
	// Enabled indicates whether to retry sending batches in case of export failure.
	Enabled bool
	// InitialInterval is the time to wait after the first failure before retrying.
	InitialInterval time.Duration
	// MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between
	// consecutive retries will always be `MaxInterval`.
	MaxInterval time.Duration
	// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
	// Once this value is reached, the data is discarded.
	MaxElapsedTime time.Duration
}
|
Loading…
x
Reference in New Issue
Block a user