From 569f7430726278f40e562f81e64d21156cc3edb9 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 6 Sep 2022 12:20:19 -0700 Subject: [PATCH] Handle partial-success responses for OTLP trace (#3106) * Handle partial-success responses for OTLP trace Co-authored-by: David Ashpole Co-authored-by: Chester Cheung Co-authored-by: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> --- CHANGELOG.md | 1 + .../otlp/otlptrace/otlptracegrpc/client.go | 11 ++- .../otlptrace/otlptracegrpc/client_test.go | 27 +++++++- .../otlptracegrpc/mock_collector_test.go | 7 +- .../otlp/otlptrace/otlptracehttp/client.go | 52 ++++++++++---- .../otlptrace/otlptracehttp/client_test.go | 34 ++++++++++ .../otlptracehttp/mock_collector_test.go | 7 +- exporters/otlp/partialsuccess.go | 68 +++++++++++++++++++ exporters/otlp/partialsuccess_test.go | 44 ++++++++++++ 9 files changed, 232 insertions(+), 19 deletions(-) create mode 100644 exporters/otlp/partialsuccess.go create mode 100644 exporters/otlp/partialsuccess_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e4a8df64..4dec154c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Support Go 1.19. Include compatibility testing and document support. (#3077) +- Support the OTLP ExportTracePartialSuccess response; these are passed to the registered error handler. (#3106) - Upgrade go.opentelemetry.io/proto/otlp from v0.18.0 to v0.19.0 (#3107) - Add an `Attribute` field to the `Scope` type in `go.opentelemetry.io/otel/sdk/instrumentation`. (#3131) - Add the `WithScopeAttributes` `TracerOption` to the `go.opentelemetry.io/otel/trace` package. (#3131) diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client.go b/exporters/otlp/otlptrace/otlptracegrpc/client.go index 6be3fec86..4a139fc69 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client.go @@ -26,6 +26,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" @@ -196,9 +198,16 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc defer cancel() return c.requestFunc(ctx, func(iCtx context.Context) error { - _, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{ + resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) + if resp != nil && resp.PartialSuccess != nil { + otel.Handle(otlp.PartialSuccessToError( + otlp.TracingPartialSuccess, + resp.PartialSuccess.RejectedSpans, + resp.PartialSuccess.ErrorMessage, + )) + } // nil is converted to OK. if status.Code(err) == codes.OK { // Success. diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go index 84e7da801..d11111ed1 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go @@ -4,7 +4,7 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, @@ -30,12 +30,14 @@ import ( "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/status" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" + coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" commonpb "go.opentelemetry.io/proto/otlp/common/v1" ) @@ -386,3 +388,26 @@ func TestEmptyData(t *testing.T) { assert.NoError(t, exp.ExportSpans(ctx, nil)) } + +func TestPartialSuccess(t *testing.T) { + mc := runMockCollectorWithConfig(t, &mockConfig{ + partial: &coltracepb.ExportTracePartialSuccess{ + RejectedSpans: 2, + ErrorMessage: "partially successful", + }, + }) + t.Cleanup(func() { require.NoError(t, mc.stop()) }) + + errors := []error{} + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + errors = append(errors, err) + })) + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint) + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + require.NoError(t, exp.ExportSpans(ctx, roSpans)) + + require.Equal(t, 1, len(errors)) + require.Contains(t, errors[0].Error(), "partially successful") + require.Contains(t, errors[0].Error(), "2 spans rejected") +} diff --git a/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go b/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go index d3ebd8357..56b65a5d6 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/mock_collector_test.go @@ -36,6 +36,7 @@ func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector { traceSvc: &mockTraceService{ storage: otlptracetest.NewSpansStorage(), errors: mockConfig.errors, + partial: mockConfig.partial, }, } } @@ -44,6 +45,7 @@ type mockTraceService struct { collectortracepb.UnimplementedTraceServiceServer errors []error + partial *collectortracepb.ExportTracePartialSuccess requests int mu sync.RWMutex storage otlptracetest.SpansStorage @@ -82,7 +84,9 @@ func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.E <-mts.exportBlock } - reply := &collectortracepb.ExportTraceServiceResponse{} + reply := &collectortracepb.ExportTraceServiceResponse{ + PartialSuccess: mts.partial, + } if mts.requests < len(mts.errors) { idx := mts.requests return reply, mts.errors[idx] @@ -106,6 +110,7 @@ type mockCollector struct { type mockConfig struct { errors []error endpoint string + partial *collectortracepb.ExportTracePartialSuccess } var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil) diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index 0c050eb2f..59b7e2092 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -29,6 +29,8 @@ import ( "google.golang.org/protobuf/proto" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp" "go.opentelemetry.io/otel/exporters/otlp/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig" @@ -154,28 +156,48 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc return err } - var rErr error + if resp != nil && resp.Body != nil { + defer func() { + if err := resp.Body.Close(); err != nil { + otel.Handle(err) + } + }() + } + switch resp.StatusCode { case http.StatusOK: // Success, do not retry. - case http.StatusTooManyRequests, - http.StatusServiceUnavailable: - // Retry-able failure. - rErr = newResponseError(resp.Header) - - // Going to retry, drain the body to reuse the connection. - if _, err := io.Copy(io.Discard, resp.Body); err != nil { - _ = resp.Body.Close() + // Read the partial success message, if any. + var respData bytes.Buffer + if _, err := io.Copy(&respData, resp.Body); err != nil { return err } - default: - rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status) - } - if err := resp.Body.Close(); err != nil { - return err + if respData.Len() != 0 { + var respProto coltracepb.ExportTraceServiceResponse + if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { + return err + } + + if respProto.PartialSuccess != nil { + otel.Handle(otlp.PartialSuccessToError( + otlp.TracingPartialSuccess, + respProto.PartialSuccess.RejectedSpans, + respProto.PartialSuccess.ErrorMessage, + )) + } + } + return nil + + case http.StatusTooManyRequests, http.StatusServiceUnavailable: + // Retry-able failures. Drain the body to reuse the connection. + if _, err := io.Copy(io.Discard, resp.Body); err != nil { + otel.Handle(err) + } + return newResponseError(resp.Header) + default: + return fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status) } - return rErr }) } diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index d14a6ce80..bf497bad4 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -25,9 +25,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" ) const ( @@ -348,3 +350,35 @@ func TestStopWhileExporting(t *testing.T) { assert.NoError(t, err) <-doneCh } + +func TestPartialSuccess(t *testing.T) { + mcCfg := mockCollectorConfig{ + Partial: &coltracepb.ExportTracePartialSuccess{ + RejectedSpans: 2, + ErrorMessage: "partially successful", + }, + } + mc := runMockCollector(t, mcCfg) + defer mc.MustStop(t) + driver := otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(mc.Endpoint()), + otlptracehttp.WithInsecure(), + ) + ctx := context.Background() + exporter, err := otlptrace.New(ctx, driver) + require.NoError(t, err) + defer func() { + assert.NoError(t, exporter.Shutdown(context.Background())) + }() + + errors := []error{} + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) { + errors = append(errors, err) + })) + err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan()) + assert.NoError(t, err) + + require.Equal(t, 1, len(errors)) + require.Contains(t, errors[0].Error(), "partially successful") + require.Contains(t, errors[0].Error(), "2 spans rejected") +} diff --git a/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go b/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go index 895b34af4..d999c1c89 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go @@ -46,6 +46,7 @@ type mockCollector struct { injectHTTPStatus []int injectResponseHeader []map[string]string injectContentType string + partial *collectortracepb.ExportTracePartialSuccess delay <-chan struct{} clientTLSConfig *tls.Config @@ -93,7 +94,9 @@ func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } - response := collectortracepb.ExportTraceServiceResponse{} + response := collectortracepb.ExportTraceServiceResponse{ + PartialSuccess: c.partial, + } rawResponse, err := proto.Marshal(&response) if err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -207,6 +210,7 @@ type mockCollectorConfig struct { InjectHTTPStatus []int InjectContentType string InjectResponseHeader []map[string]string + Partial *collectortracepb.ExportTracePartialSuccess Delay <-chan struct{} WithTLS bool ExpectedHeaders map[string]string @@ -230,6 +234,7 @@ func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector { injectHTTPStatus: cfg.InjectHTTPStatus, injectResponseHeader: cfg.InjectResponseHeader, injectContentType: cfg.InjectContentType, + partial: cfg.Partial, delay: cfg.Delay, expectedHeaders: cfg.ExpectedHeaders, } diff --git a/exporters/otlp/partialsuccess.go b/exporters/otlp/partialsuccess.go new file mode 100644 index 000000000..2652304e9 --- /dev/null +++ b/exporters/otlp/partialsuccess.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp // import "go.opentelemetry.io/otel/exporters/otlp" + +import "fmt" + +// PartialSuccessDropKind indicates the kind of partial success error +// received by an OTLP exporter, which corresponds with the signal +// being exported. +type PartialSuccessDropKind string + +const ( + // TracingPartialSuccess indicates that some spans were rejected. + TracingPartialSuccess PartialSuccessDropKind = "spans" + + // MetricsPartialSuccess indicates that some metric data points were rejected. + MetricsPartialSuccess PartialSuccessDropKind = "metric data points" +) + +// PartialSuccess represents the underlying error for all handling +// OTLP partial success messages. Use `errors.Is(err, +// PartialSuccess{})` to test whether an error passed to the OTel +// error handler belongs to this category. +type PartialSuccess struct { + ErrorMessage string + RejectedItems int64 + RejectedKind PartialSuccessDropKind +} + +var _ error = PartialSuccess{} + +// Error implements the error interface. +func (ps PartialSuccess) Error() string { + msg := ps.ErrorMessage + if msg == "" { + msg = "empty message" + } + return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind) +} + +// Is supports the errors.Is() interface. +func (ps PartialSuccess) Is(err error) bool { + _, ok := err.(PartialSuccess) + return ok +} + +// PartialSuccessToError produces an error suitable for passing to +// `otel.Handle()` out of the fields in a partial success response, +// independent of which signal produced the outcome. +func PartialSuccessToError(kind PartialSuccessDropKind, itemsRejected int64, errorMessage string) error { + return PartialSuccess{ + ErrorMessage: errorMessage, + RejectedItems: itemsRejected, + RejectedKind: kind, + } +} diff --git a/exporters/otlp/partialsuccess_test.go b/exporters/otlp/partialsuccess_test.go new file mode 100644 index 000000000..1a6a350a2 --- /dev/null +++ b/exporters/otlp/partialsuccess_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package otlp // import "go.opentelemetry.io/otel/exporters/otlp" + +import ( + "errors" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func requireErrorString(t *testing.T, expect string, err error) { + t.Helper() + require.NotNil(t, err) + require.Error(t, err) + require.True(t, errors.Is(err, PartialSuccess{})) + + const pfx = "OTLP partial success: " + + msg := err.Error() + require.True(t, strings.HasPrefix(msg, pfx)) + require.Equal(t, expect, msg[len(pfx):]) +} + +func TestPartialSuccessFormat(t *testing.T) { + requireErrorString(t, "empty message (0 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 0, "")) + requireErrorString(t, "help help (0 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 0, "help help")) + requireErrorString(t, "what happened (10 metric data points rejected)", PartialSuccessToError(MetricsPartialSuccess, 10, "what happened")) + requireErrorString(t, "what happened (15 spans rejected)", PartialSuccessToError(TracingPartialSuccess, 15, "what happened")) + requireErrorString(t, "empty message (7 log records rejected)", PartialSuccessToError("log records", 7, "")) +}