You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-07-15 01:04:25 +02:00
Propagate non-retryable error messages to client (#5929)
PR #5541 (and issue #5536) enhance error handling, returning body text as part of the error. However, this is only done for retryable errors; if non-retryable, error text still does not propagate to clients. This PR adds handling of non-retryable errors, ensuring any body text is part of the message returned to the user's code. There is no change to the circumstances under which errors are reported, just an enhancement of the content of such an error. --------- Co-authored-by: Damien Mathieu <42@dmathieu.com>
This commit is contained in:
@ -8,6 +8,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
- Propagate non-retryable error messages to client in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#5929)
|
||||||
|
- Propagate non-retryable error messages to client in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#5929)
|
||||||
|
- Propagate non-retryable error messages to client in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#5929)
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- Fix inconsistent request body closing in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#5954)
|
- Fix inconsistent request body closing in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#5954)
|
||||||
|
@ -157,9 +157,7 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
var rErr error
|
if sc := resp.StatusCode; sc >= 200 && sc <= 299 {
|
||||||
switch sc := resp.StatusCode; {
|
|
||||||
case sc >= 200 && sc <= 299:
|
|
||||||
// Success, do not retry.
|
// Success, do not retry.
|
||||||
|
|
||||||
// Read the partial success message, if any.
|
// Read the partial success message, if any.
|
||||||
@ -187,12 +185,8 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
case sc == http.StatusTooManyRequests,
|
}
|
||||||
sc == http.StatusBadGateway,
|
// Error cases.
|
||||||
sc == http.StatusServiceUnavailable,
|
|
||||||
sc == http.StatusGatewayTimeout:
|
|
||||||
// Retry-able failure.
|
|
||||||
rErr = newResponseError(resp.Header, nil)
|
|
||||||
|
|
||||||
// server may return a message with the response
|
// server may return a message with the response
|
||||||
// body, so we read it to include in the error
|
// body, so we read it to include in the error
|
||||||
@ -202,19 +196,23 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
|
|||||||
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
respStr := strings.TrimSpace(respData.String())
|
||||||
// overwrite the error message with the response body
|
if len(respStr) == 0 {
|
||||||
// if it is not empty
|
respStr = "(empty)"
|
||||||
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
|
|
||||||
// Include response for context.
|
|
||||||
e := errors.New(respStr)
|
|
||||||
rErr = newResponseError(resp.Header, e)
|
|
||||||
}
|
}
|
||||||
|
bodyErr := fmt.Errorf("body: %s", respStr)
|
||||||
|
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusTooManyRequests,
|
||||||
|
http.StatusBadGateway,
|
||||||
|
http.StatusServiceUnavailable,
|
||||||
|
http.StatusGatewayTimeout:
|
||||||
|
// Retryable failure.
|
||||||
|
return newResponseError(resp.Header, bodyErr)
|
||||||
default:
|
default:
|
||||||
rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status)
|
// Non-retryable failure.
|
||||||
|
return fmt.Errorf("failed to send logs to %s: %s (%w)", request.URL, resp.Status, bodyErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return rErr
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -778,4 +778,24 @@ func TestConfig(t *testing.T) {
|
|||||||
require.Contains(t, got, headerKeySetInProxy)
|
require.Contains(t, got, headerKeySetInProxy)
|
||||||
assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy])
|
assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("non-retryable errors are propagated", func(t *testing.T) {
|
||||||
|
exporterErr := errors.New("missing required attribute aaaa")
|
||||||
|
rCh := make(chan exportResult, 1)
|
||||||
|
rCh <- exportResult{Err: &httpResponseError{
|
||||||
|
Status: http.StatusBadRequest,
|
||||||
|
Err: exporterErr,
|
||||||
|
}}
|
||||||
|
|
||||||
|
exp, coll := factoryFunc("", rCh, WithRetry(RetryConfig{
|
||||||
|
Enabled: false,
|
||||||
|
}))
|
||||||
|
ctx := context.Background()
|
||||||
|
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||||
|
// Push this after Shutdown so the HTTP server doesn't hang.
|
||||||
|
t.Cleanup(func() { close(rCh) })
|
||||||
|
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||||
|
err := exp.Export(ctx, make([]log.Record, 1))
|
||||||
|
assert.ErrorContains(t, err, exporterErr.Error())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
@ -160,9 +160,7 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
var rErr error
|
if sc := resp.StatusCode; sc >= 200 && sc <= 299 {
|
||||||
switch sc := resp.StatusCode; {
|
|
||||||
case sc >= 200 && sc <= 299:
|
|
||||||
// Success, do not retry.
|
// Success, do not retry.
|
||||||
|
|
||||||
// Read the partial success message, if any.
|
// Read the partial success message, if any.
|
||||||
@ -190,12 +188,8 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
case sc == http.StatusTooManyRequests,
|
}
|
||||||
sc == http.StatusBadGateway,
|
// Error cases.
|
||||||
sc == http.StatusServiceUnavailable,
|
|
||||||
sc == http.StatusGatewayTimeout:
|
|
||||||
// Retry-able failure.
|
|
||||||
rErr = newResponseError(resp.Header, nil)
|
|
||||||
|
|
||||||
// server may return a message with the response
|
// server may return a message with the response
|
||||||
// body, so we read it to include in the error
|
// body, so we read it to include in the error
|
||||||
@ -205,19 +199,23 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
|
|||||||
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
respStr := strings.TrimSpace(respData.String())
|
||||||
// overwrite the error message with the response body
|
if len(respStr) == 0 {
|
||||||
// if it is not empty
|
respStr = "(empty)"
|
||||||
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
|
|
||||||
// Include response for context.
|
|
||||||
e := errors.New(respStr)
|
|
||||||
rErr = newResponseError(resp.Header, e)
|
|
||||||
}
|
}
|
||||||
|
bodyErr := fmt.Errorf("body: %s", respStr)
|
||||||
|
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusTooManyRequests,
|
||||||
|
http.StatusBadGateway,
|
||||||
|
http.StatusServiceUnavailable,
|
||||||
|
http.StatusGatewayTimeout:
|
||||||
|
// Retryable failure.
|
||||||
|
return newResponseError(resp.Header, bodyErr)
|
||||||
default:
|
default:
|
||||||
rErr = fmt.Errorf("failed to send metrics to %s: %s", request.URL, resp.Status)
|
// Non-retryable failure.
|
||||||
|
return fmt.Errorf("failed to send metrics to %s: %s (%w)", request.URL, resp.Status, bodyErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return rErr
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,4 +270,25 @@ func TestConfig(t *testing.T) {
|
|||||||
require.Contains(t, got, headerKeySetInProxy)
|
require.Contains(t, got, headerKeySetInProxy)
|
||||||
assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy])
|
assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("non-retryable errors are propagated", func(t *testing.T) {
|
||||||
|
exporterErr := errors.New("missing required attribute aaa")
|
||||||
|
rCh := make(chan otest.ExportResult, 1)
|
||||||
|
rCh <- otest.ExportResult{Err: &otest.HTTPResponseError{
|
||||||
|
Status: http.StatusBadRequest,
|
||||||
|
Err: exporterErr,
|
||||||
|
}}
|
||||||
|
exp, coll := factoryFunc("", rCh)
|
||||||
|
ctx := context.Background()
|
||||||
|
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
|
||||||
|
// Push this after Shutdown so the HTTP server doesn't hang.
|
||||||
|
t.Cleanup(func() { close(rCh) })
|
||||||
|
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||||
|
exCtx, cancel := context.WithTimeout(ctx, time.Second)
|
||||||
|
defer cancel()
|
||||||
|
err := exp.Export(exCtx, &metricdata.ResourceMetrics{})
|
||||||
|
assert.ErrorContains(t, err, exporterErr.Error())
|
||||||
|
|
||||||
|
assert.NoError(t, exCtx.Err())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
@ -166,8 +166,7 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
switch sc := resp.StatusCode; {
|
if sc := resp.StatusCode; sc >= 200 && sc <= 299 {
|
||||||
case sc >= 200 && sc <= 299:
|
|
||||||
// Success, do not retry.
|
// Success, do not retry.
|
||||||
// Read the partial success message, if any.
|
// Read the partial success message, if any.
|
||||||
var respData bytes.Buffer
|
var respData bytes.Buffer
|
||||||
@ -194,13 +193,8 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
case sc == http.StatusTooManyRequests,
|
// Error cases.
|
||||||
sc == http.StatusBadGateway,
|
|
||||||
sc == http.StatusServiceUnavailable,
|
|
||||||
sc == http.StatusGatewayTimeout:
|
|
||||||
// Retry-able failures.
|
|
||||||
rErr := newResponseError(resp.Header, nil)
|
|
||||||
|
|
||||||
// server may return a message with the response
|
// server may return a message with the response
|
||||||
// body, so we read it to include in the error
|
// body, so we read it to include in the error
|
||||||
@ -210,17 +204,22 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
|||||||
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
respStr := strings.TrimSpace(respData.String())
|
||||||
// overwrite the error message with the response body
|
if len(respStr) == 0 {
|
||||||
// if it is not empty
|
respStr = "(empty)"
|
||||||
if respStr := strings.TrimSpace(respData.String()); respStr != "" {
|
|
||||||
// Include response for context.
|
|
||||||
e := errors.New(respStr)
|
|
||||||
rErr = newResponseError(resp.Header, e)
|
|
||||||
}
|
}
|
||||||
return rErr
|
bodyErr := fmt.Errorf("body: %s", respStr)
|
||||||
|
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusTooManyRequests,
|
||||||
|
http.StatusBadGateway,
|
||||||
|
http.StatusServiceUnavailable,
|
||||||
|
http.StatusGatewayTimeout:
|
||||||
|
// Retryable failure.
|
||||||
|
return newResponseError(resp.Header, bodyErr)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
|
// Non-retryable failure.
|
||||||
|
return fmt.Errorf("failed to send to %s: %s (%w)", request.URL, resp.Status, bodyErr)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -244,6 +244,9 @@ func TestTimeout(t *testing.T) {
|
|||||||
func TestNoRetry(t *testing.T) {
|
func TestNoRetry(t *testing.T) {
|
||||||
mc := runMockCollector(t, mockCollectorConfig{
|
mc := runMockCollector(t, mockCollectorConfig{
|
||||||
InjectHTTPStatus: []int{http.StatusBadRequest},
|
InjectHTTPStatus: []int{http.StatusBadRequest},
|
||||||
|
Partial: &coltracepb.ExportTracePartialSuccess{
|
||||||
|
ErrorMessage: "missing required attribute aaa",
|
||||||
|
},
|
||||||
})
|
})
|
||||||
defer mc.MustStop(t)
|
defer mc.MustStop(t)
|
||||||
driver := otlptracehttp.NewClient(
|
driver := otlptracehttp.NewClient(
|
||||||
@ -265,9 +268,14 @@ func TestNoRetry(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
|
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
unwrapped := errors.Unwrap(err)
|
|
||||||
assert.Equal(t, fmt.Sprintf("failed to send to http://%s/v1/traces: 400 Bad Request", mc.endpoint), unwrapped.Error())
|
|
||||||
assert.True(t, strings.HasPrefix(err.Error(), "traces export: "))
|
assert.True(t, strings.HasPrefix(err.Error(), "traces export: "))
|
||||||
|
|
||||||
|
unwrapped := errors.Unwrap(err)
|
||||||
|
assert.Contains(t, unwrapped.Error(), fmt.Sprintf("failed to send to http://%s/v1/traces: 400 Bad Request", mc.endpoint))
|
||||||
|
|
||||||
|
unwrapped2 := errors.Unwrap(unwrapped)
|
||||||
|
assert.Contains(t, unwrapped2.Error(), "missing required attribute aaa")
|
||||||
|
|
||||||
assert.Empty(t, mc.GetSpans())
|
assert.Empty(t, mc.GetSpans())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user