mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-05 22:54:18 +02:00
Handle partial-success responses for OTLP trace (#3106)
* Handle partial-success responses for OTLP trace Co-authored-by: David Ashpole <dashpole@google.com> Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com> Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com>
This commit is contained in:
parent
9c2a0c2d69
commit
569f743072
@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
- Support Go 1.19.
|
||||
Include compatibility testing and document support. (#3077)
|
||||
- Support the OTLP ExportTracePartialSuccess response; these are passed to the registered error handler. (#3106)
|
||||
- Upgrade go.opentelemetry.io/proto/otlp from v0.18.0 to v0.19.0 (#3107)
|
||||
- Add an `Attribute` field to the `Scope` type in `go.opentelemetry.io/otel/sdk/instrumentation`. (#3131)
|
||||
- Add the `WithScopeAttributes` `TracerOption` to the `go.opentelemetry.io/otel/trace` package. (#3131)
|
||||
|
@ -26,6 +26,8 @@ import (
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
|
||||
@ -196,9 +198,16 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
||||
defer cancel()
|
||||
|
||||
return c.requestFunc(ctx, func(iCtx context.Context) error {
|
||||
_, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{
|
||||
resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{
|
||||
ResourceSpans: protoSpans,
|
||||
})
|
||||
if resp != nil && resp.PartialSuccess != nil {
|
||||
otel.Handle(otlp.PartialSuccessToError(
|
||||
otlp.TracingPartialSuccess,
|
||||
resp.PartialSuccess.RejectedSpans,
|
||||
resp.PartialSuccess.ErrorMessage,
|
||||
))
|
||||
}
|
||||
// nil is converted to OK.
|
||||
if status.Code(err) == codes.OK {
|
||||
// Success.
|
||||
|
@ -4,7 +4,7 @@
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
@ -30,12 +30,14 @@ import (
|
||||
"google.golang.org/grpc/encoding/gzip"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
||||
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
|
||||
)
|
||||
|
||||
@ -386,3 +388,26 @@ func TestEmptyData(t *testing.T) {
|
||||
|
||||
assert.NoError(t, exp.ExportSpans(ctx, nil))
|
||||
}
|
||||
|
||||
func TestPartialSuccess(t *testing.T) {
|
||||
mc := runMockCollectorWithConfig(t, &mockConfig{
|
||||
partial: &coltracepb.ExportTracePartialSuccess{
|
||||
RejectedSpans: 2,
|
||||
ErrorMessage: "partially successful",
|
||||
},
|
||||
})
|
||||
t.Cleanup(func() { require.NoError(t, mc.stop()) })
|
||||
|
||||
errors := []error{}
|
||||
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
|
||||
errors = append(errors, err)
|
||||
}))
|
||||
ctx := context.Background()
|
||||
exp := newGRPCExporter(t, ctx, mc.endpoint)
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
require.NoError(t, exp.ExportSpans(ctx, roSpans))
|
||||
|
||||
require.Equal(t, 1, len(errors))
|
||||
require.Contains(t, errors[0].Error(), "partially successful")
|
||||
require.Contains(t, errors[0].Error(), "2 spans rejected")
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector {
|
||||
traceSvc: &mockTraceService{
|
||||
storage: otlptracetest.NewSpansStorage(),
|
||||
errors: mockConfig.errors,
|
||||
partial: mockConfig.partial,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -44,6 +45,7 @@ type mockTraceService struct {
|
||||
collectortracepb.UnimplementedTraceServiceServer
|
||||
|
||||
errors []error
|
||||
partial *collectortracepb.ExportTracePartialSuccess
|
||||
requests int
|
||||
mu sync.RWMutex
|
||||
storage otlptracetest.SpansStorage
|
||||
@ -82,7 +84,9 @@ func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.E
|
||||
<-mts.exportBlock
|
||||
}
|
||||
|
||||
reply := &collectortracepb.ExportTraceServiceResponse{}
|
||||
reply := &collectortracepb.ExportTraceServiceResponse{
|
||||
PartialSuccess: mts.partial,
|
||||
}
|
||||
if mts.requests < len(mts.errors) {
|
||||
idx := mts.requests
|
||||
return reply, mts.errors[idx]
|
||||
@ -106,6 +110,7 @@ type mockCollector struct {
|
||||
type mockConfig struct {
|
||||
errors []error
|
||||
endpoint string
|
||||
partial *collectortracepb.ExportTracePartialSuccess
|
||||
}
|
||||
|
||||
var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil)
|
||||
|
@ -29,6 +29,8 @@ import (
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
|
||||
@ -154,28 +156,48 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
||||
return err
|
||||
}
|
||||
|
||||
var rErr error
|
||||
if resp != nil && resp.Body != nil {
|
||||
defer func() {
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
switch resp.StatusCode {
|
||||
case http.StatusOK:
|
||||
// Success, do not retry.
|
||||
case http.StatusTooManyRequests,
|
||||
http.StatusServiceUnavailable:
|
||||
// Retry-able failure.
|
||||
rErr = newResponseError(resp.Header)
|
||||
|
||||
// Going to retry, drain the body to reuse the connection.
|
||||
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
|
||||
_ = resp.Body.Close()
|
||||
// Read the partial success message, if any.
|
||||
var respData bytes.Buffer
|
||||
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status)
|
||||
}
|
||||
|
||||
if err := resp.Body.Close(); err != nil {
|
||||
return err
|
||||
if respData.Len() != 0 {
|
||||
var respProto coltracepb.ExportTraceServiceResponse
|
||||
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if respProto.PartialSuccess != nil {
|
||||
otel.Handle(otlp.PartialSuccessToError(
|
||||
otlp.TracingPartialSuccess,
|
||||
respProto.PartialSuccess.RejectedSpans,
|
||||
respProto.PartialSuccess.ErrorMessage,
|
||||
))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
case http.StatusTooManyRequests, http.StatusServiceUnavailable:
|
||||
// Retry-able failures. Drain the body to reuse the connection.
|
||||
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
|
||||
otel.Handle(err)
|
||||
}
|
||||
return newResponseError(resp.Header)
|
||||
default:
|
||||
return fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status)
|
||||
}
|
||||
return rErr
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -25,9 +25,11 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -348,3 +350,35 @@ func TestStopWhileExporting(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
<-doneCh
|
||||
}
|
||||
|
||||
func TestPartialSuccess(t *testing.T) {
|
||||
mcCfg := mockCollectorConfig{
|
||||
Partial: &coltracepb.ExportTracePartialSuccess{
|
||||
RejectedSpans: 2,
|
||||
ErrorMessage: "partially successful",
|
||||
},
|
||||
}
|
||||
mc := runMockCollector(t, mcCfg)
|
||||
defer mc.MustStop(t)
|
||||
driver := otlptracehttp.NewClient(
|
||||
otlptracehttp.WithEndpoint(mc.Endpoint()),
|
||||
otlptracehttp.WithInsecure(),
|
||||
)
|
||||
ctx := context.Background()
|
||||
exporter, err := otlptrace.New(ctx, driver)
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
assert.NoError(t, exporter.Shutdown(context.Background()))
|
||||
}()
|
||||
|
||||
errors := []error{}
|
||||
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
|
||||
errors = append(errors, err)
|
||||
}))
|
||||
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
|
||||
assert.NoError(t, err)
|
||||
|
||||
require.Equal(t, 1, len(errors))
|
||||
require.Contains(t, errors[0].Error(), "partially successful")
|
||||
require.Contains(t, errors[0].Error(), "2 spans rejected")
|
||||
}
|
||||
|
@ -46,6 +46,7 @@ type mockCollector struct {
|
||||
injectHTTPStatus []int
|
||||
injectResponseHeader []map[string]string
|
||||
injectContentType string
|
||||
partial *collectortracepb.ExportTracePartialSuccess
|
||||
delay <-chan struct{}
|
||||
|
||||
clientTLSConfig *tls.Config
|
||||
@ -93,7 +94,9 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
response := collectortracepb.ExportTraceServiceResponse{}
|
||||
response := collectortracepb.ExportTraceServiceResponse{
|
||||
PartialSuccess: c.partial,
|
||||
}
|
||||
rawResponse, err := proto.Marshal(&response)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
@ -207,6 +210,7 @@ type mockCollectorConfig struct {
|
||||
InjectHTTPStatus []int
|
||||
InjectContentType string
|
||||
InjectResponseHeader []map[string]string
|
||||
Partial *collectortracepb.ExportTracePartialSuccess
|
||||
Delay <-chan struct{}
|
||||
WithTLS bool
|
||||
ExpectedHeaders map[string]string
|
||||
@ -230,6 +234,7 @@ func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector {
|
||||
injectHTTPStatus: cfg.InjectHTTPStatus,
|
||||
injectResponseHeader: cfg.InjectResponseHeader,
|
||||
injectContentType: cfg.InjectContentType,
|
||||
partial: cfg.Partial,
|
||||
delay: cfg.Delay,
|
||||
expectedHeaders: cfg.ExpectedHeaders,
|
||||
}
|
||||
|
68
exporters/otlp/partialsuccess.go
Normal file
68
exporters/otlp/partialsuccess.go
Normal file
@ -0,0 +1,68 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
|
||||
|
||||
import "fmt"
|
||||
|
||||
// PartialSuccessDropKind indicates the kind of partial success error
|
||||
// received by an OTLP exporter, which corresponds with the signal
|
||||
// being exported.
|
||||
type PartialSuccessDropKind string
|
||||
|
||||
const (
|
||||
// TracingPartialSuccess indicates that some spans were rejected.
|
||||
TracingPartialSuccess PartialSuccessDropKind = "spans"
|
||||
|
||||
// MetricsPartialSuccess indicates that some metric data points were rejected.
|
||||
MetricsPartialSuccess PartialSuccessDropKind = "metric data points"
|
||||
)
|
||||
|
||||
// PartialSuccess represents the underlying error for all handling
|
||||
// OTLP partial success messages. Use `errors.Is(err,
|
||||
// PartialSuccess{})` to test whether an error passed to the OTel
|
||||
// error handler belongs to this category.
|
||||
type PartialSuccess struct {
|
||||
ErrorMessage string
|
||||
RejectedItems int64
|
||||
RejectedKind PartialSuccessDropKind
|
||||
}
|
||||
|
||||
var _ error = PartialSuccess{}
|
||||
|
||||
// Error implements the error interface.
|
||||
func (ps PartialSuccess) Error() string {
|
||||
msg := ps.ErrorMessage
|
||||
if msg == "" {
|
||||
msg = "empty message"
|
||||
}
|
||||
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
|
||||
}
|
||||
|
||||
// Is supports the errors.Is() interface.
|
||||
func (ps PartialSuccess) Is(err error) bool {
|
||||
_, ok := err.(PartialSuccess)
|
||||
return ok
|
||||
}
|
||||
|
||||
// PartialSuccessToError produces an error suitable for passing to
|
||||
// `otel.Handle()` out of the fields in a partial success response,
|
||||
// independent of which signal produced the outcome.
|
||||
func PartialSuccessToError(kind PartialSuccessDropKind, itemsRejected int64, errorMessage string) error {
|
||||
return PartialSuccess{
|
||||
ErrorMessage: errorMessage,
|
||||
RejectedItems: itemsRejected,
|
||||
RejectedKind: kind,
|
||||
}
|
||||
}
|
44
exporters/otlp/partialsuccess_test.go
Normal file
44
exporters/otlp/partialsuccess_test.go
Normal file
@ -0,0 +1,44 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func requireErrorString(t *testing.T, expect string, err error) {
|
||||
t.Helper()
|
||||
require.NotNil(t, err)
|
||||
require.Error(t, err)
|
||||
require.True(t, errors.Is(err, PartialSuccess{}))
|
||||
|
||||
const pfx = "OTLP partial success: "
|
||||
|
||||
msg := err.Error()
|
||||
require.True(t, strings.HasPrefix(msg, pfx))
|
||||
require.Equal(t, expect, msg[len(pfx):])
|
||||
}
|
||||
|
||||
func TestPartialSuccessFormat(t *testing.T) {
|
||||
requireErrorString(t, "empty message (0 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 0, ""))
|
||||
requireErrorString(t, "help help (0 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 0, "help help"))
|
||||
requireErrorString(t, "what happened (10 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 10, "what happened"))
|
||||
requireErrorString(t, "what happened (15 spans rejected)", PartialSuccessToError(TracingPartialSuccess, 15, "what happened"))
|
||||
requireErrorString(t, "empty message (7 log records rejected)", PartialSuccessToError("log records", 7, ""))
|
||||
}
|
Loading…
Reference in New Issue
Block a user