You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
otlptracegrpc, otlpmetricgrpc: Retry for RESOURCE_EXHAUSTED only if RetryInfo is returned (#4669)
This commit is contained in:
@@ -176,28 +176,36 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
|
||||
// duration to wait for if an explicit throttle time is included in err.
|
||||
func retryable(err error) (bool, time.Duration) {
|
||||
s := status.Convert(err)
|
||||
return retryableGRPCStatus(s)
|
||||
}
|
||||
|
||||
func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
|
||||
switch s.Code() {
|
||||
case codes.Canceled,
|
||||
codes.DeadlineExceeded,
|
||||
codes.ResourceExhausted,
|
||||
codes.Aborted,
|
||||
codes.OutOfRange,
|
||||
codes.Unavailable,
|
||||
codes.DataLoss:
|
||||
return true, throttleDelay(s)
|
||||
// Additionally, handle RetryInfo.
|
||||
_, d := throttleDelay(s)
|
||||
return true, d
|
||||
case codes.ResourceExhausted:
|
||||
// Retry only if the server signals that the recovery from resource exhaustion is possible.
|
||||
return throttleDelay(s)
|
||||
}
|
||||
|
||||
// Not a retry-able error.
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// throttleDelay returns a duration to wait for if an explicit throttle time
|
||||
// is included in the response status.
|
||||
func throttleDelay(s *status.Status) time.Duration {
|
||||
// throttleDelay reports whether the status details include a RetryInfo
|
||||
// and, if so, the retry delay it specifies as the duration to wait.
|
||||
func throttleDelay(s *status.Status) (bool, time.Duration) {
|
||||
for _, detail := range s.Details() {
|
||||
if t, ok := detail.(*errdetails.RetryInfo); ok {
|
||||
return t.RetryDelay.AsDuration()
|
||||
return true, t.RetryDelay.AsDuration()
|
||||
}
|
||||
}
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
@@ -33,15 +33,17 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
func TestThrottleDuration(t *testing.T) {
|
||||
func TestThrottleDelay(t *testing.T) {
|
||||
c := codes.ResourceExhausted
|
||||
testcases := []struct {
|
||||
status *status.Status
|
||||
expected time.Duration
|
||||
status *status.Status
|
||||
wantOK bool
|
||||
wantDuration time.Duration
|
||||
}{
|
||||
{
|
||||
status: status.New(c, "NoRetryInfo"),
|
||||
expected: 0,
|
||||
status: status.New(c, "NoRetryInfo"),
|
||||
wantOK: false,
|
||||
wantDuration: 0,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
@@ -53,7 +55,8 @@ func TestThrottleDuration(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 15 * time.Millisecond,
|
||||
wantOK: true,
|
||||
wantDuration: 15 * time.Millisecond,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
@@ -63,7 +66,8 @@ func TestThrottleDuration(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 0,
|
||||
wantOK: false,
|
||||
wantDuration: 0,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
@@ -76,7 +80,8 @@ func TestThrottleDuration(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 13 * time.Minute,
|
||||
wantOK: true,
|
||||
wantDuration: 13 * time.Minute,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
@@ -91,13 +96,16 @@ func TestThrottleDuration(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 13 * time.Minute,
|
||||
wantOK: true,
|
||||
wantDuration: 13 * time.Minute,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.status.Message(), func(t *testing.T) {
|
||||
require.Equal(t, tc.expected, throttleDelay(tc.status))
|
||||
ok, d := throttleDelay(tc.status)
|
||||
assert.Equal(t, tc.wantOK, ok)
|
||||
assert.Equal(t, tc.wantDuration, d)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -112,7 +120,7 @@ func TestRetryable(t *testing.T) {
|
||||
codes.NotFound: false,
|
||||
codes.AlreadyExists: false,
|
||||
codes.PermissionDenied: false,
|
||||
codes.ResourceExhausted: true,
|
||||
codes.ResourceExhausted: false,
|
||||
codes.FailedPrecondition: false,
|
||||
codes.Aborted: true,
|
||||
codes.OutOfRange: true,
|
||||
@@ -129,6 +137,20 @@ func TestRetryable(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
|
||||
delay := 15 * time.Millisecond
|
||||
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
|
||||
&errdetails.RetryInfo{
|
||||
RetryDelay: durationpb.New(delay),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
ok, d := retryableGRPCStatus(s)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, delay, d)
|
||||
}
|
||||
|
||||
type clientShim struct {
|
||||
*client
|
||||
}
|
||||
|
||||
@@ -260,30 +260,38 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
|
||||
// duration to wait for if an explicit throttle time is included in err.
|
||||
func retryable(err error) (bool, time.Duration) {
|
||||
s := status.Convert(err)
|
||||
return retryableGRPCStatus(s)
|
||||
}
|
||||
|
||||
func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
|
||||
switch s.Code() {
|
||||
case codes.Canceled,
|
||||
codes.DeadlineExceeded,
|
||||
codes.ResourceExhausted,
|
||||
codes.Aborted,
|
||||
codes.OutOfRange,
|
||||
codes.Unavailable,
|
||||
codes.DataLoss:
|
||||
return true, throttleDelay(s)
|
||||
// Additionally handle RetryInfo.
|
||||
_, d := throttleDelay(s)
|
||||
return true, d
|
||||
case codes.ResourceExhausted:
|
||||
// Retry only if the server signals that the recovery from resource exhaustion is possible.
|
||||
return throttleDelay(s)
|
||||
}
|
||||
|
||||
// Not a retry-able error.
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// throttleDelay returns a duration to wait for if an explicit throttle time
|
||||
// is included in the response status.
|
||||
func throttleDelay(s *status.Status) time.Duration {
|
||||
// throttleDelay reports whether the status details include a RetryInfo
|
||||
// and, if so, the retry delay it specifies as the duration to wait.
|
||||
func throttleDelay(s *status.Status) (bool, time.Duration) {
|
||||
for _, detail := range s.Details() {
|
||||
if t, ok := detail.(*errdetails.RetryInfo); ok {
|
||||
return t.RetryDelay.AsDuration()
|
||||
return true, t.RetryDelay.AsDuration()
|
||||
}
|
||||
}
|
||||
return 0
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// MarshalLog is the marshaling function used by the logging system to represent this Client.
|
||||
|
||||
@@ -27,19 +27,21 @@ import (
|
||||
"google.golang.org/protobuf/types/known/durationpb"
|
||||
)
|
||||
|
||||
func TestThrottleDuration(t *testing.T) {
|
||||
func TestThrottleDelay(t *testing.T) {
|
||||
c := codes.ResourceExhausted
|
||||
testcases := []struct {
|
||||
status *status.Status
|
||||
expected time.Duration
|
||||
status *status.Status
|
||||
wantOK bool
|
||||
wantDuration time.Duration
|
||||
}{
|
||||
{
|
||||
status: status.New(c, "no retry info"),
|
||||
expected: 0,
|
||||
status: status.New(c, "NoRetryInfo"),
|
||||
wantOK: false,
|
||||
wantDuration: 0,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
s, err := status.New(c, "single retry info").WithDetails(
|
||||
s, err := status.New(c, "SingleRetryInfo").WithDetails(
|
||||
&errdetails.RetryInfo{
|
||||
RetryDelay: durationpb.New(15 * time.Millisecond),
|
||||
},
|
||||
@@ -47,21 +49,23 @@ func TestThrottleDuration(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 15 * time.Millisecond,
|
||||
wantOK: true,
|
||||
wantDuration: 15 * time.Millisecond,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
s, err := status.New(c, "error info").WithDetails(
|
||||
s, err := status.New(c, "ErrorInfo").WithDetails(
|
||||
&errdetails.ErrorInfo{Reason: "no throttle detail"},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 0,
|
||||
wantOK: false,
|
||||
wantDuration: 0,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
s, err := status.New(c, "error and retry info").WithDetails(
|
||||
s, err := status.New(c, "ErrorAndRetryInfo").WithDetails(
|
||||
&errdetails.ErrorInfo{Reason: "with throttle detail"},
|
||||
&errdetails.RetryInfo{
|
||||
RetryDelay: durationpb.New(13 * time.Minute),
|
||||
@@ -70,11 +74,12 @@ func TestThrottleDuration(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 13 * time.Minute,
|
||||
wantOK: true,
|
||||
wantDuration: 13 * time.Minute,
|
||||
},
|
||||
{
|
||||
status: func() *status.Status {
|
||||
s, err := status.New(c, "double retry info").WithDetails(
|
||||
s, err := status.New(c, "DoubleRetryInfo").WithDetails(
|
||||
&errdetails.RetryInfo{
|
||||
RetryDelay: durationpb.New(13 * time.Minute),
|
||||
},
|
||||
@@ -85,13 +90,16 @@ func TestThrottleDuration(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}(),
|
||||
expected: 13 * time.Minute,
|
||||
wantOK: true,
|
||||
wantDuration: 13 * time.Minute,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.status.Message(), func(t *testing.T) {
|
||||
require.Equal(t, tc.expected, throttleDelay(tc.status))
|
||||
ok, d := throttleDelay(tc.status)
|
||||
assert.Equal(t, tc.wantOK, ok)
|
||||
assert.Equal(t, tc.wantDuration, d)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -106,7 +114,7 @@ func TestRetryable(t *testing.T) {
|
||||
codes.NotFound: false,
|
||||
codes.AlreadyExists: false,
|
||||
codes.PermissionDenied: false,
|
||||
codes.ResourceExhausted: true,
|
||||
codes.ResourceExhausted: false,
|
||||
codes.FailedPrecondition: false,
|
||||
codes.Aborted: true,
|
||||
codes.OutOfRange: true,
|
||||
@@ -123,6 +131,20 @@ func TestRetryable(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
|
||||
delay := 15 * time.Millisecond
|
||||
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
|
||||
&errdetails.RetryInfo{
|
||||
RetryDelay: durationpb.New(delay),
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
ok, d := retryableGRPCStatus(s)
|
||||
assert.True(t, ok)
|
||||
assert.Equal(t, delay, d)
|
||||
}
|
||||
|
||||
func TestUnstartedStop(t *testing.T) {
|
||||
client := NewClient()
|
||||
assert.ErrorIs(t, client.Stop(context.Background()), errAlreadyStopped)
|
||||
|
||||
Reference in New Issue
Block a user