From 70bc9eb391965ba22e4d9f938dc300f7e9fbc128 Mon Sep 17 00:00:00 2001 From: Gustavo Silva Paiva Date: Tue, 20 Apr 2021 14:02:02 -0300 Subject: [PATCH] Adds support for timeout on the otlp/gRPC exporter (#1821) * initial support for timeout on otlp grpc exporter * fix tests * run make * update changelog * update changelog * apply suggestions Co-authored-by: Tyler Yahn --- CHANGELOG.md | 1 + exporters/otlp/otlpgrpc/driver.go | 4 + .../otlp/otlpgrpc/mock_collector_test.go | 8 ++ exporters/otlp/otlpgrpc/options.go | 18 ++++ .../otlp/otlpgrpc/otlp_integration_test.go | 87 +++++++++++++++++++ 5 files changed, 118 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2709723be..831ed541f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `Event` and `Link` struct types from the `go.opentelemetry.io/otel` package now include a `DroppedAttributeCount` field to record the number of attributes that were not recorded due to configured limits being reached. (#1771) - The Jaeger exporter now reports dropped attributes for a Span event in the exported log. (#1771) - Adds `k8s.node.name` and `k8s.node.uid` attribute keys to the `semconv` package. (#1789) +- Adds `otlpgrpc.WithTimeout` option for configuring timeout to the otlp/gRPC exporter. (#1821) ### Fixed diff --git a/exporters/otlp/otlpgrpc/driver.go b/exporters/otlp/otlpgrpc/driver.go index 66979a360..c5df20566 100644 --- a/exporters/otlp/otlpgrpc/driver.go +++ b/exporters/otlp/otlpgrpc/driver.go @@ -123,6 +123,8 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, } ctx, cancel := d.metricsDriver.connection.contextWithStop(ctx) defer cancel() + ctx, tCancel := context.WithTimeout(ctx, d.metricsDriver.connection.sCfg.Timeout) + defer tCancel() rms, err := transform.CheckpointSet(ctx, selector, cps, 1) if err != nil { @@ -162,6 +164,8 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) } ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx) defer cancel() + ctx, tCancel := context.WithTimeout(ctx, d.tracesDriver.connection.sCfg.Timeout) + defer tCancel() protoSpans := transform.SpanData(ss) if len(protoSpans) == 0 { diff --git a/exporters/otlp/otlpgrpc/mock_collector_test.go b/exporters/otlp/otlpgrpc/mock_collector_test.go index 21ae4f22e..7183b9511 100644 --- a/exporters/otlp/otlpgrpc/mock_collector_test.go +++ b/exporters/otlp/otlpgrpc/mock_collector_test.go @@ -52,6 +52,7 @@ type mockTraceService struct { mu sync.RWMutex storage otlptest.SpansStorage headers metadata.MD + delay time.Duration } func (mts *mockTraceService) getHeaders() metadata.MD { @@ -73,6 +74,9 @@ func (mts *mockTraceService) getResourceSpans() []*tracepb.ResourceSpans { } func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.ExportTraceServiceRequest) (*collectortracepb.ExportTraceServiceResponse, error) { + if mts.delay > 0 { + time.Sleep(mts.delay) + } reply := &collectortracepb.ExportTraceServiceResponse{} mts.mu.Lock() defer mts.mu.Unlock() @@ -86,6 +90,7 @@ type mockMetricService struct { mu sync.RWMutex storage otlptest.MetricsStorage + delay time.Duration } func (mms *mockMetricService) getMetrics() []*metricpb.Metric { @@ -95,6 +100,9 @@ func (mms *mockMetricService) getMetrics() []*metricpb.Metric { } func (mms *mockMetricService) Export(ctx context.Context, exp *collectormetricpb.ExportMetricsServiceRequest) (*collectormetricpb.ExportMetricsServiceResponse, error) { + if mms.delay > 0 { + time.Sleep(mms.delay) + } reply := &collectormetricpb.ExportMetricsServiceResponse{} mms.mu.Lock() defer mms.mu.Unlock() diff --git a/exporters/otlp/otlpgrpc/options.go b/exporters/otlp/otlpgrpc/options.go index 37d877bf2..dd7201f94 100644 --- a/exporters/otlp/otlpgrpc/options.go +++ b/exporters/otlp/otlpgrpc/options.go @@ -182,3 +182,21 @@ func WithDialOption(opts ...grpc.DialOption) Option { cfg.DialOptions = opts }) } + +// WithTimeout tells the driver the max waiting time for the backend to process +// each spans or metrics batch. If unset, the default will be 10 seconds. +func WithTimeout(duration time.Duration) Option { + return otlpconfig.WithTimeout(duration) +} + +// WithTracesTimeout tells the driver the max waiting time for the backend to process +// each spans batch. If unset, the default will be 10 seconds. +func WithTracesTimeout(duration time.Duration) Option { + return otlpconfig.WithTracesTimeout(duration) +} + +// WithMetricsTimeout tells the driver the max waiting time for the backend to process +// each metrics batch. If unset, the default will be 10 seconds. +func WithMetricsTimeout(duration time.Duration) Option { + return otlpconfig.WithMetricsTimeout(duration) +} diff --git a/exporters/otlp/otlpgrpc/otlp_integration_test.go b/exporters/otlp/otlpgrpc/otlp_integration_test.go index b598b9dd5..153f45dae 100644 --- a/exporters/otlp/otlpgrpc/otlp_integration_test.go +++ b/exporters/otlp/otlpgrpc/otlp_integration_test.go @@ -22,6 +22,9 @@ import ( "testing" "time" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -302,6 +305,90 @@ func TestNewExporter_withHeaders(t *testing.T) { assert.Equal(t, "value1", headers.Get("header1")[0]) } +func TestNewExporter_WithTimeout(t *testing.T) { + tts := []struct { + name string + fn func(exp *otlp.Exporter) error + timeout time.Duration + metrics int + spans int + code codes.Code + delay bool + }{ + { + name: "Timeout Spans", + fn: func(exp *otlp.Exporter) error { + return exp.ExportSpans(context.Background(), []*sdktrace.SpanSnapshot{{Name: "timed out"}}) + }, + timeout: time.Millisecond * 100, + code: codes.DeadlineExceeded, + delay: true, + }, + { + name: "Timeout Metrics", + fn: func(exp *otlp.Exporter) error { + return exp.Export(context.Background(), otlptest.OneRecordCheckpointSet{}) + }, + timeout: time.Millisecond * 100, + code: codes.DeadlineExceeded, + delay: true, + }, + + { + name: "No Timeout Spans", + fn: func(exp *otlp.Exporter) error { + return exp.ExportSpans(context.Background(), []*sdktrace.SpanSnapshot{{Name: "timed out"}}) + }, + timeout: time.Minute, + spans: 1, + code: codes.OK, + }, + { + name: "No Timeout Metrics", + fn: func(exp *otlp.Exporter) error { + return exp.Export(context.Background(), otlptest.OneRecordCheckpointSet{}) + }, + timeout: time.Minute, + metrics: 1, + code: codes.OK, + }, + } + + for _, tt := range tts { + t.Run(tt.name, func(t *testing.T) { + + mc := runMockCollector(t) + if tt.delay { + mc.traceSvc.delay = time.Second * 10 + mc.metricSvc.delay = time.Second * 10 + } + defer func() { + _ = mc.stop() + }() + + ctx := context.Background() + exp := newGRPCExporter(t, ctx, mc.endpoint, otlpgrpc.WithTimeout(tt.timeout)) + defer func() { + _ = exp.Shutdown(ctx) + }() + + err := tt.fn(exp) + + if tt.code == codes.OK { + require.NoError(t, err) + } else { + require.Error(t, err) + } + + s := status.Convert(err) + require.Equal(t, tt.code, s.Code()) + + require.Len(t, mc.getSpans(), tt.spans) + require.Len(t, mc.getMetrics(), tt.metrics) + }) + } +} + func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) { mc := runMockCollector(t) defer func() {