From b4e15cd1c7db7aa9d1268134e387c4823a966bda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20Paj=C4=85k?= Date: Thu, 23 Apr 2026 22:12:44 +0200 Subject: [PATCH] Add MaxRequestSize option to OTLP exporters (#8157) Per https://github.com/open-telemetry/opentelemetry-proto/pull/782 - Introduce WithMaxRequestSize option for OTLP exporters to set a limit on the size of serialized export requests. - Implement logic in the HTTP and gRPC clients to check the request size against the configured maximum before compression and sending. - The default configuration is that the maximum request size is 32 MiB. --- CHANGELOG.md | 18 +++++++ exporters/otlp/otlplog/otlploggrpc/client.go | 25 ++++++---- .../otlp/otlplog/otlploggrpc/client_test.go | 30 ++++++++++++ exporters/otlp/otlplog/otlploggrpc/config.go | 38 +++++++++++---- .../otlp/otlplog/otlploggrpc/config_test.go | 5 ++ exporters/otlp/otlplog/otlploghttp/client.go | 30 +++++++----- .../otlp/otlplog/otlploghttp/client_test.go | 23 +++++++++ exporters/otlp/otlplog/otlploghttp/config.go | 48 +++++++++++++------ .../otlp/otlplog/otlploghttp/config_test.go | 21 ++++---- .../otlp/otlpmetric/otlpmetricgrpc/client.go | 27 +++++++---- .../otlpmetric/otlpmetricgrpc/client_test.go | 16 +++++++ .../otlp/otlpmetric/otlpmetricgrpc/config.go | 10 ++++ .../otlpmetricgrpc/internal/oconf/options.go | 43 +++++++++++------ .../internal/oconf/options_test.go | 10 ++++ .../otlp/otlpmetric/otlpmetrichttp/client.go | 23 +++++---- .../otlpmetric/otlpmetrichttp/client_test.go | 24 ++++++++++ .../otlp/otlpmetric/otlpmetrichttp/config.go | 10 ++++ .../otlpmetrichttp/internal/oconf/options.go | 43 +++++++++++------ .../internal/oconf/options_test.go | 10 ++++ .../otlp/otlptrace/otlptracegrpc/client.go | 44 ++++++++++------- .../otlptrace/otlptracegrpc/client_test.go | 23 +++++++++ .../internal/otlpconfig/options.go | 43 +++++++++++------ .../internal/otlpconfig/options_test.go | 10 ++++ .../otlp/otlptrace/otlptracegrpc/options.go | 10 ++++ .../otlp/otlptrace/otlptracehttp/client.go | 4 ++ .../otlptrace/otlptracehttp/client_test.go | 23 +++++++++ .../internal/otlpconfig/options.go | 43 +++++++++++------ .../internal/otlpconfig/options_test.go | 10 ++++ .../otlp/otlptrace/otlptracehttp/options.go | 10 ++++ .../otlp/otlpmetric/oconf/options.go.tmpl | 43 +++++++++++------ .../otlpmetric/oconf/options_test.go.tmpl | 10 ++++ .../otlp/otlptrace/otlpconfig/options.go.tmpl | 43 +++++++++++------ .../otlptrace/otlpconfig/options_test.go.tmpl | 10 ++++ 33 files changed, 601 insertions(+), 179 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d9ab0a0f..17d786a7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Support `SLICE` attributes in `go.opentelemetry.io/otel/exporters/zipkin`. (#8216) - Apply `AttributeValueLengthLimit` to `attribute.SLICE` type attribute values in `go.opentelemetry.io/otel/sdk/trace`, recursively truncating contained string values. (#8217) - Add `Error` field on `Record` type in `go.opentelemetry.io/otel/log/logtest`. (#8148) +- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#8157) +- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#8157) +- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#8157) +- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8157) +- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#8157) +- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8157) - Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`. Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=` to enable for all periodic readers. See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071) @@ -50,6 +56,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Fixed +- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. + The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157) +- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. + The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157) +- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. + The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157) +- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. + The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157) +- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. + The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157) +- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. + The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157) - Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8135) - Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8152) - `go.opentelemetry.io/otel/exporters/prometheus` now uses `Value.String` formatting for label values following the [OpenTelemetry AnyValue representation for non-OTLP protocols](https://opentelemetry.io/docs/specs/otel/common/#anyvalue). (#8170) diff --git a/exporters/otlp/otlplog/otlploggrpc/client.go b/exporters/otlp/otlplog/otlploggrpc/client.go index 320d803e9..52c36e77a 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client.go +++ b/exporters/otlp/otlplog/otlploggrpc/client.go @@ -6,6 +6,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o import ( "context" "errors" + "fmt" "sync/atomic" "time" @@ -20,6 +21,7 @@ import ( "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ" @@ -28,9 +30,10 @@ import ( // The methods of this type are not expected to be called concurrently. type client struct { - metadata metadata.MD - exportTimeout time.Duration - requestFunc retry.RequestFunc + metadata metadata.MD + exportTimeout time.Duration + maxRequestSize int + requestFunc retry.RequestFunc // ourConn keeps track of where conn was created: true if created here in // NewClient, or false if passed with an option. This is important on @@ -49,9 +52,10 @@ var newGRPCClientFn = grpc.NewClient // newClient creates a new gRPC log client. func newClient(cfg config) (*client, error) { c := &client{ - exportTimeout: cfg.timeout.Value, - requestFunc: cfg.retryCfg.Value.RequestFunc(retryable), - conn: cfg.gRPCConn.Value, + exportTimeout: cfg.timeout.Value, + maxRequestSize: cfg.maxRequestSize.Value, + requestFunc: cfg.retryCfg.Value.RequestFunc(retryable), + conn: cfg.gRPCConn.Value, } if len(cfg.headers.Value) > 0 { @@ -146,6 +150,7 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo ctx, cancel := c.exportContext(ctx) defer cancel() + pbRequest := &collogpb.ExportLogsServiceRequest{ResourceLogs: rl} count := int64(len(rl)) if c.instrumentation != nil { eo := c.instrumentation.ExportLogs(ctx, count) @@ -154,10 +159,12 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo }() } + if maxSize := c.maxRequestSize; maxSize > 0 && proto.Size(pbRequest) > maxSize { + return fmt.Errorf("request message too large: exceeded %d bytes", maxSize) + } + return errors.Join(uploadErr, c.requestFunc(ctx, func(ctx context.Context) error { - resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{ - ResourceLogs: rl, - }) + resp, err := c.lsc.Export(ctx, pbRequest) if resp != nil && resp.PartialSuccess != nil { msg := resp.PartialSuccess.GetErrorMessage() n := resp.PartialSuccess.GetRejectedLogRecords() diff --git a/exporters/otlp/otlplog/otlploggrpc/client_test.go b/exporters/otlp/otlplog/otlploggrpc/client_test.go index b1f753b6e..9b1e2d357 100644 --- a/exporters/otlp/otlplog/otlploggrpc/client_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/client_test.go @@ -149,6 +149,16 @@ var ( }} ) +type logsServiceClientFunc func(context.Context, *collogpb.ExportLogsServiceRequest, ...grpc.CallOption) (*collogpb.ExportLogsServiceResponse, error) + +func (f logsServiceClientFunc) Export( + ctx context.Context, + req *collogpb.ExportLogsServiceRequest, + opts ...grpc.CallOption, +) (*collogpb.ExportLogsServiceResponse, error) { + return f(ctx, req, opts...) +} + func TestThrottleDelay(t *testing.T) { c := codes.ResourceExhausted testcases := []struct { @@ -267,6 +277,26 @@ func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) { assert.Equal(t, delay, d) } +func TestUploadLogsRequestSizeLimit(t *testing.T) { + var calls int + c := &client{ + maxRequestSize: 1, + requestFunc: func(ctx context.Context, fn func(context.Context) error) error { + return fn(ctx) + }, + lsc: logsServiceClientFunc( + func(context.Context, *collogpb.ExportLogsServiceRequest, ...grpc.CallOption) (*collogpb.ExportLogsServiceResponse, error) { + calls++ + return &collogpb.ExportLogsServiceResponse{}, nil + }, + ), + } + + err := c.UploadLogs(t.Context(), []*lpb.ResourceLogs{{}}) + assert.ErrorContains(t, err, "request message too large") + assert.Equal(t, 0, calls, "oversized request must fail before sending") +} + func TestNewClient(t *testing.T) { newGRPCClientFnSwap := newGRPCClientFn t.Cleanup(func() { diff --git a/exporters/otlp/otlplog/otlploggrpc/config.go b/exporters/otlp/otlplog/otlploggrpc/config.go index f2a3c552d..b878fefd8 100644 --- a/exporters/otlp/otlplog/otlploggrpc/config.go +++ b/exporters/otlp/otlplog/otlploggrpc/config.go @@ -25,9 +25,10 @@ import ( // Default values. var ( - defaultEndpoint = "localhost:4317" - defaultTimeout = 10 * time.Second - defaultRetryCfg = retry.DefaultConfig + defaultEndpoint = "localhost:4317" + defaultTimeout = 10 * time.Second + defaultMaxRequestSize = 32 * 1024 * 1024 + defaultRetryCfg = retry.DefaultConfig ) // Environment variable keys. @@ -85,13 +86,14 @@ type Option interface { } type config struct { - endpoint setting[string] - insecure setting[bool] - tlsCfg setting[*tls.Config] - headers setting[map[string]string] - compression setting[Compression] - timeout setting[time.Duration] - retryCfg setting[retry.Config] + endpoint setting[string] + insecure setting[bool] + tlsCfg setting[*tls.Config] + headers setting[map[string]string] + compression setting[Compression] + maxRequestSize setting[int] + timeout setting[time.Duration] + retryCfg setting[retry.Config] // gRPC configurations gRPCCredentials setting[credentials.TransportCredentials] @@ -129,6 +131,9 @@ func newConfig(options []Option) config { getEnv[time.Duration](envTimeout, convDuration), fallback[time.Duration](defaultTimeout), ) + c.maxRequestSize = c.maxRequestSize.Resolve( + fallback[int](defaultMaxRequestSize), + ) c.retryCfg = c.retryCfg.Resolve( fallback[retry.Config](defaultRetryCfg), ) @@ -353,6 +358,19 @@ func WithTimeout(duration time.Duration) Option { }) } +// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export +// request, before compression, that the exporter will send. +// +// If size is less than or equal to zero, no request-size limit is applied. +// Disabling the limit is not recommended because it can lead to excessive +// resource consumption or abuse. +func WithMaxRequestSize(size int) Option { + return fnOpt(func(c config) config { + c.maxRequestSize = newSetting(size) + return c + }) +} + // WithRetry sets the retry policy for transient retryable errors that are // returned by the target endpoint. // diff --git a/exporters/otlp/otlplog/otlploggrpc/config_test.go b/exporters/otlp/otlplog/otlploggrpc/config_test.go index 5005d9cca..e08de4ff5 100644 --- a/exporters/otlp/otlplog/otlploggrpc/config_test.go +++ b/exporters/otlp/otlplog/otlploggrpc/config_test.go @@ -112,6 +112,7 @@ func TestNewConfig(t *testing.T) { WithServiceConfig("{}"), WithDialOption(dialOptions...), WithGRPCConn(&grpc.ClientConn{}), + WithMaxRequestSize(1), WithTimeout(2 * time.Second), WithRetry(RetryConfig(rc)), }, @@ -120,6 +121,7 @@ func TestNewConfig(t *testing.T) { insecure: newSetting(true), headers: newSetting(headers), compression: newSetting(GzipCompression), + maxRequestSize: newSetting(1), timeout: newSetting(2 * time.Second), retryCfg: newSetting(rc), gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)), @@ -494,6 +496,9 @@ func TestNewConfig(t *testing.T) { return func() { otel.SetErrorHandler(orig) } }(otel.GetErrorHandler())) c := newConfig(tc.options) + if !tc.want.maxRequestSize.Set { + tc.want.maxRequestSize = newSetting(defaultMaxRequestSize) + } // Do not compare pointer values. assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg) diff --git a/exporters/otlp/otlplog/otlploghttp/client.go b/exporters/otlp/otlplog/otlploghttp/client.go index fbe43167d..f0c51ae6c 100644 --- a/exporters/otlp/otlplog/otlploghttp/client.go +++ b/exporters/otlp/otlplog/otlploghttp/client.go @@ -111,10 +111,11 @@ func newHTTPClient(ctx context.Context, cfg config) (*client, error) { req.Header.Set("Content-Type", "application/x-protobuf") c := &httpClient{ - compression: cfg.compression.Value, - req: req, - requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate), - client: hc, + compression: cfg.compression.Value, + maxRequestSize: cfg.maxRequestSize.Value, + req: req, + requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate), + client: hc, } id := nextExporterID() @@ -125,10 +126,11 @@ func newHTTPClient(ctx context.Context, cfg config) (*client, error) { type httpClient struct { // req is cloned for every upload the client makes. - req *http.Request - compression Compression - requestFunc retry.RequestFunc - client *http.Client + req *http.Request + compression Compression + maxRequestSize int + requestFunc retry.RequestFunc + client *http.Client inst *observ.Instrumentation } @@ -159,10 +161,6 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) if err != nil { return err } - request, err := c.newRequest(ctx, body) - if err != nil { - return err - } var statusCode int if c.inst != nil { @@ -170,6 +168,14 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) defer func() { op.End(uploadErr, statusCode) }() } + if maxSize := c.maxRequestSize; maxSize > 0 && len(body) > maxSize { + return fmt.Errorf("request body too large: exceeded %d bytes", maxSize) + } + request, err := c.newRequest(ctx, body) + if err != nil { + return err + } + return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error { select { case <-iCtx.Done(): diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go index b732da781..318ff7972 100644 --- a/exporters/otlp/otlplog/otlploghttp/client_test.go +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -1138,6 +1138,29 @@ func TestResponseBodySizeLimit(t *testing.T) { } } +func TestRequestBodySizeLimit(t *testing.T) { + var calls int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + calls++ + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + + opts := []Option{ + WithEndpoint(srv.Listener.Addr().String()), + WithInsecure(), + WithMaxRequestSize(1), + WithRetry(RetryConfig{Enabled: false}), + } + cfg := newConfig(opts) + c, err := newHTTPClient(t.Context(), cfg) + require.NoError(t, err) + + err = c.UploadLogs(t.Context(), []*lpb.ResourceLogs{{}}) + assert.ErrorContains(t, err, "request body too large") + assert.Equal(t, 0, calls, "oversized request must fail before sending") +} + func BenchmarkExporterExportLogs(b *testing.B) { const n = 10 diff --git a/exporters/otlp/otlplog/otlploghttp/config.go b/exporters/otlp/otlplog/otlploghttp/config.go index 21e700769..480ee92a0 100644 --- a/exporters/otlp/otlplog/otlploghttp/config.go +++ b/exporters/otlp/otlplog/otlploghttp/config.go @@ -23,11 +23,12 @@ import ( // Default values. var ( - defaultEndpoint = "localhost:4318" - defaultPath = "/v1/logs" - defaultTimeout = 10 * time.Second - defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment - defaultRetryCfg = retry.DefaultConfig + defaultEndpoint = "localhost:4318" + defaultPath = "/v1/logs" + defaultTimeout = 10 * time.Second + defaultMaxRequestSize = 32 * 1024 * 1024 + defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment + defaultRetryCfg = retry.DefaultConfig ) // Environment variable keys. @@ -89,16 +90,17 @@ type fnOpt func(config) config func (f fnOpt) applyHTTPOption(c config) config { return f(c) } type config struct { - endpoint setting[string] - path setting[string] - insecure setting[bool] - tlsCfg setting[*tls.Config] - headers setting[map[string]string] - compression setting[Compression] - timeout setting[time.Duration] - proxy setting[HTTPTransportProxyFunc] - retryCfg setting[retry.Config] - httpClient *http.Client + endpoint setting[string] + path setting[string] + insecure setting[bool] + tlsCfg setting[*tls.Config] + headers setting[map[string]string] + compression setting[Compression] + maxRequestSize setting[int] + timeout setting[time.Duration] + proxy setting[HTTPTransportProxyFunc] + retryCfg setting[retry.Config] + httpClient *http.Client } func newConfig(options []Option) config { @@ -133,6 +135,9 @@ func newConfig(options []Option) config { getenv[time.Duration](envTimeout, convDuration), fallback[time.Duration](defaultTimeout), ) + c.maxRequestSize = c.maxRequestSize.Resolve( + fallback[int](defaultMaxRequestSize), + ) c.proxy = c.proxy.Resolve( fallback[HTTPTransportProxyFunc](defaultProxy), ) @@ -313,6 +318,19 @@ func WithTimeout(duration time.Duration) Option { }) } +// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export +// request, before compression, that the exporter will send. +// +// If size is less than or equal to zero, no request-size limit is applied. +// Disabling the limit is not recommended because it can lead to excessive +// resource consumption or abuse. +func WithMaxRequestSize(size int) Option { + return fnOpt(func(c config) config { + c.maxRequestSize = newSetting(size) + return c + }) +} + // RetryConfig defines configuration for retrying the export of log data that // failed. type RetryConfig retry.Config diff --git a/exporters/otlp/otlplog/otlploghttp/config_test.go b/exporters/otlp/otlplog/otlploghttp/config_test.go index ac2586b27..3f1008c0f 100644 --- a/exporters/otlp/otlplog/otlploghttp/config_test.go +++ b/exporters/otlp/otlplog/otlploghttp/config_test.go @@ -107,19 +107,21 @@ func TestNewConfig(t *testing.T) { WithTLSClientConfig(tlsCfg), WithCompression(GzipCompression), WithHeaders(headers), + WithMaxRequestSize(1), WithTimeout(time.Second), WithRetry(RetryConfig(rc)), // Do not test WithProxy. Requires func comparison. }, want: config{ - endpoint: newSetting("test"), - path: newSetting("/path"), - insecure: newSetting(true), - tlsCfg: newSetting(tlsCfg), - headers: newSetting(headers), - compression: newSetting(GzipCompression), - timeout: newSetting(time.Second), - retryCfg: newSetting(rc), + endpoint: newSetting("test"), + path: newSetting("/path"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + maxRequestSize: newSetting(1), + timeout: newSetting(time.Second), + retryCfg: newSetting(rc), }, }, { @@ -509,6 +511,9 @@ func TestNewConfig(t *testing.T) { return func() { otel.SetErrorHandler(orig) } }(otel.GetErrorHandler())) c := newConfig(tc.options) + if !tc.want.maxRequestSize.Set { + tc.want.maxRequestSize = newSetting(defaultMaxRequestSize) + } // Do not compare pointer values. assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg) diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go index 7b7f90729..22b845053 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client.go @@ -6,6 +6,7 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme import ( "context" "errors" + "fmt" "time" colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" @@ -15,6 +16,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf" @@ -22,9 +24,10 @@ import ( ) type client struct { - metadata metadata.MD - exportTimeout time.Duration - requestFunc retry.RequestFunc + metadata metadata.MD + exportTimeout time.Duration + maxRequestSize int + requestFunc retry.RequestFunc // ourConn keeps track of where conn was created: true if created here in // NewClient, or false if passed with an option. This is important on @@ -38,9 +41,10 @@ type client struct { // newClient creates a new gRPC metric client. func newClient(_ context.Context, cfg oconf.Config) (*client, error) { c := &client{ - exportTimeout: cfg.Metrics.Timeout, - requestFunc: cfg.RetryConfig.RequestFunc(retryable), - conn: cfg.GRPCConn, + exportTimeout: cfg.Metrics.Timeout, + maxRequestSize: cfg.Metrics.MaxRequestSize, + requestFunc: cfg.RetryConfig.RequestFunc(retryable), + conn: cfg.GRPCConn, } if len(cfg.Metrics.Headers) > 0 { @@ -115,10 +119,15 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou ctx, cancel := c.exportContext(ctx) defer cancel() + pbRequest := &colmetricpb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, + } + if maxSize := c.maxRequestSize; maxSize > 0 && proto.Size(pbRequest) > maxSize { + return fmt.Errorf("request message too large: exceeded %d bytes", maxSize) + } + return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error { - resp, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{ - ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics}, - }) + resp, err := c.msc.Export(iCtx, pbRequest) if resp != nil && resp.PartialSuccess != nil { msg := resp.PartialSuccess.GetErrorMessage() n := resp.PartialSuccess.GetRejectedDataPoints() diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go index b756f9774..b48280d34 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go @@ -252,4 +252,20 @@ func TestConfig(t *testing.T) { got := coll.Headers() assert.Contains(t, got[key][0], customerUserAgent) }) + + t.Run("WithMaxRequestSize", func(t *testing.T) { + exp, coll := factoryFunc( + nil, + WithMaxRequestSize(1), + WithRetry(RetryConfig{Enabled: false}), + ) + t.Cleanup(coll.Shutdown) + + ctx := context.Background() //nolint:usetesting // required to avoid getting a canceled context at cleanup. + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + + err := exp.Export(ctx, &metricdata.ResourceMetrics{}) + assert.ErrorContains(t, err, "request message too large") + assert.Empty(t, coll.Collect().Dump(), "oversized request must fail before sending") + }) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go index c831bb60b..47eb85e9f 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/config.go @@ -231,6 +231,16 @@ func WithTimeout(duration time.Duration) Option { return wrappedOption{oconf.WithTimeout(duration)} } +// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export +// request, before compression, that the exporter will send. +// +// If size is less than or equal to zero, no request-size limit is applied. +// Disabling the limit is not recommended because it can lead to excessive +// resource consumption or abuse. +func WithMaxRequestSize(size int) Option { + return wrappedOption{oconf.WithMaxRequestSize(size)} +} + // WithRetry sets the retry policy for transient retryable errors that are // returned by the target endpoint. // diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options.go index 758d1ea32..0ca7074d4 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options.go @@ -35,6 +35,9 @@ const ( // DefaultMetricsPath is a default URL path for endpoint that // receives metrics. DefaultMetricsPath string = "/v1/metrics" + // DefaultMaxRequestSize is the default maximum size of a serialized export + // request, before compression. + DefaultMaxRequestSize int = 32 * 1024 * 1024 // DefaultBackoff is a default base backoff time used in the // exponential backoff strategy. DefaultBackoff time.Duration = 300 * time.Millisecond @@ -49,13 +52,14 @@ type ( HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + Compression Compression + MaxRequestSize int + Timeout time.Duration + URLPath string TemporalitySelector metric.TemporalitySelector AggregationSelector metric.AggregationSelector @@ -87,10 +91,11 @@ type ( func NewHTTPConfig(opts ...HTTPOption) Config { cfg := Config{ Metrics: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), - URLPath: DefaultMetricsPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), + URLPath: DefaultMetricsPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, TemporalitySelector: metric.DefaultTemporalitySelector, AggregationSelector: metric.DefaultAggregationSelector, @@ -123,10 +128,11 @@ func cleanPath(urlPath string, defaultPath string) string { func NewGRPCConfig(opts ...GRPCOption) Config { cfg := Config{ Metrics: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), - URLPath: DefaultMetricsPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), + URLPath: DefaultMetricsPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, TemporalitySelector: metric.DefaultTemporalitySelector, AggregationSelector: metric.DefaultAggregationSelector, @@ -354,6 +360,13 @@ func WithTimeout(duration time.Duration) GenericOption { }) } +func WithMaxRequestSize(size int) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.MaxRequestSize = size + return cfg + }) +} + func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Metrics.TemporalitySelector = selector diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options_test.go index 330467e23..9e590178f 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf/options_test.go @@ -79,9 +79,19 @@ func TestConfigs(t *testing.T) { } assert.Equal(t, NoCompression, c.Metrics.Compression) assert.Equal(t, map[string]string(nil), c.Metrics.Headers) + assert.Equal(t, DefaultMaxRequestSize, c.Metrics.MaxRequestSize) assert.Equal(t, 10*time.Second, c.Metrics.Timeout) }, }, + { + name: "Test With Max Request Size", + opts: []GenericOption{ + WithMaxRequestSize(1), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 1, c.Metrics.MaxRequestSize) + }, + }, // Endpoint Tests { diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index a3a131da9..b7ca17fe3 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -31,10 +31,11 @@ import ( type client struct { // req is cloned for every upload the client makes. - req *http.Request - compression Compression - requestFunc retry.RequestFunc - httpClient *http.Client + req *http.Request + compression Compression + maxRequestSize int + requestFunc retry.RequestFunc + httpClient *http.Client inst *observ.Instrumentation } @@ -119,11 +120,12 @@ func newClient(cfg oconf.Config) (*client, error) { inst, err := observ.NewInstrumentation(counter.NextExporterID(), cfg.Metrics.Endpoint) return &client{ - compression: Compression(cfg.Metrics.Compression), - req: req, - requestFunc: cfg.RetryConfig.RequestFunc(evaluate), - httpClient: httpClient, - inst: inst, + compression: Compression(cfg.Metrics.Compression), + maxRequestSize: cfg.Metrics.MaxRequestSize, + req: req, + requestFunc: cfg.RetryConfig.RequestFunc(evaluate), + httpClient: httpClient, + inst: inst, }, err } @@ -154,6 +156,9 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou if err != nil { return err } + if maxSize := c.maxRequestSize; maxSize > 0 && len(body) > maxSize { + return fmt.Errorf("request body too large: exceeded %d bytes", maxSize) + } request, err := c.newRequest(ctx, body) if err != nil { return err diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 54413a548..d2a325fe4 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -515,6 +515,30 @@ func TestResponseBodySizeLimit(t *testing.T) { } } +func TestRequestBodySizeLimit(t *testing.T) { + var calls int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + calls++ + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + + opts := []Option{ + WithEndpoint(srv.Listener.Addr().String()), + WithInsecure(), + WithMaxRequestSize(1), + WithRetry(RetryConfig{Enabled: false}), + } + cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...) + c, err := newClient(cfg) + require.NoError(t, err) + t.Cleanup(func() { _ = c.Shutdown(t.Context()) }) + + err = c.UploadMetrics(t.Context(), &mpb.ResourceMetrics{}) + assert.ErrorContains(t, err, "request body too large") + assert.Equal(t, 0, calls, "oversized request must fail before sending") +} + func TestClientInstrumentation(t *testing.T) { // Enable instrumentation for this test. t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go index 2b144f7eb..c6175507a 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go @@ -185,6 +185,16 @@ func WithTimeout(duration time.Duration) Option { return wrappedOption{oconf.WithTimeout(duration)} } +// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export +// request, before compression, that the exporter will send. +// +// If size is less than or equal to zero, no request-size limit is applied. +// Disabling the limit is not recommended because it can lead to excessive +// resource consumption or abuse. +func WithMaxRequestSize(size int) Option { + return wrappedOption{oconf.WithMaxRequestSize(size)} +} + // WithRetry sets the retry policy for transient retryable errors that are // returned by the target endpoint. // diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go index ed66bb068..323c709f2 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go @@ -35,6 +35,9 @@ const ( // DefaultMetricsPath is a default URL path for endpoint that // receives metrics. DefaultMetricsPath string = "/v1/metrics" + // DefaultMaxRequestSize is the default maximum size of a serialized export + // request, before compression. + DefaultMaxRequestSize int = 32 * 1024 * 1024 // DefaultBackoff is a default base backoff time used in the // exponential backoff strategy. DefaultBackoff time.Duration = 300 * time.Millisecond @@ -49,13 +52,14 @@ type ( HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + Compression Compression + MaxRequestSize int + Timeout time.Duration + URLPath string TemporalitySelector metric.TemporalitySelector AggregationSelector metric.AggregationSelector @@ -87,10 +91,11 @@ type ( func NewHTTPConfig(opts ...HTTPOption) Config { cfg := Config{ Metrics: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), - URLPath: DefaultMetricsPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), + URLPath: DefaultMetricsPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, TemporalitySelector: metric.DefaultTemporalitySelector, AggregationSelector: metric.DefaultAggregationSelector, @@ -123,10 +128,11 @@ func cleanPath(urlPath string, defaultPath string) string { func NewGRPCConfig(opts ...GRPCOption) Config { cfg := Config{ Metrics: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), - URLPath: DefaultMetricsPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), + URLPath: DefaultMetricsPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, TemporalitySelector: metric.DefaultTemporalitySelector, AggregationSelector: metric.DefaultAggregationSelector, @@ -354,6 +360,13 @@ func WithTimeout(duration time.Duration) GenericOption { }) } +func WithMaxRequestSize(size int) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.MaxRequestSize = size + return cfg + }) +} + func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Metrics.TemporalitySelector = selector diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go index 8b2448570..6849e07cb 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go @@ -79,9 +79,19 @@ func TestConfigs(t *testing.T) { } assert.Equal(t, NoCompression, c.Metrics.Compression) assert.Equal(t, map[string]string(nil), c.Metrics.Headers) + assert.Equal(t, DefaultMaxRequestSize, c.Metrics.MaxRequestSize) assert.Equal(t, 10*time.Second, c.Metrics.Timeout) }, }, + { + name: "Test With Max Request Size", + opts: []GenericOption{ + WithMaxRequestSize(1), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 1, c.Metrics.MaxRequestSize) + }, + }, // Endpoint Tests { diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client.go b/exporters/otlp/otlptrace/otlptracegrpc/client.go index 258d0ca6a..e66af1e16 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client.go @@ -6,6 +6,7 @@ package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptra import ( "context" "errors" + "fmt" "sync" "time" @@ -16,6 +17,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal" @@ -26,11 +28,12 @@ import ( ) type client struct { - endpoint string - dialOpts []grpc.DialOption - metadata metadata.MD - exportTimeout time.Duration - requestFunc retry.RequestFunc + endpoint string + dialOpts []grpc.DialOption + metadata metadata.MD + exportTimeout time.Duration + maxRequestSize int + requestFunc retry.RequestFunc // stopCtx is used as a parent context for all exports. Therefore, when it // is canceled with the stopFunc all exports are canceled. @@ -65,14 +68,15 @@ func newClient(opts ...Option) *client { ctx, cancel := context.WithCancel(context.Background()) //nolint:gosec // cancel called in client shutdown. c := &client{ - endpoint: cfg.Traces.Endpoint, - exportTimeout: cfg.Traces.Timeout, - requestFunc: cfg.RetryConfig.RequestFunc(retryable), - dialOpts: cfg.DialOptions, - stopCtx: ctx, - stopFunc: cancel, - conn: cfg.GRPCConn, - instID: counter.NextExporterID(), + endpoint: cfg.Traces.Endpoint, + exportTimeout: cfg.Traces.Timeout, + maxRequestSize: cfg.Traces.MaxRequestSize, + requestFunc: cfg.RetryConfig.RequestFunc(retryable), + dialOpts: cfg.DialOptions, + stopCtx: ctx, + stopFunc: cancel, + conn: cfg.GRPCConn, + instID: counter.NextExporterID(), } if len(cfg.Traces.Headers) > 0 { @@ -205,16 +209,22 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc ctx, cancel := c.exportContext(ctx) defer cancel() - var code codes.Code + pbRequest := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: protoSpans, + } + + code := codes.Unknown if c.inst != nil { op := c.inst.ExportSpans(ctx, len(protoSpans)) defer func() { op.End(uploadErr, code) }() } + if maxSize := c.maxRequestSize; maxSize > 0 && proto.Size(pbRequest) > maxSize { + return fmt.Errorf("request message too large: exceeded %d bytes", maxSize) + } + return c.requestFunc(ctx, func(iCtx context.Context) error { - resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{ - ResourceSpans: protoSpans, - }) + resp, err := c.tsc.Export(iCtx, pbRequest) if resp != nil && resp.PartialSuccess != nil { msg := resp.PartialSuccess.GetErrorMessage() n := resp.PartialSuccess.GetRejectedSpans() diff --git a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go index fc552aa95..0c5d05bfc 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/client_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/client_test.go @@ -277,6 +277,29 @@ func TestExportSpansTimeoutHonored(t *testing.T) { require.True(t, strings.HasPrefix(err.Error(), "traces export: "), "%+v", err) } +func TestExportSpansRequestSizeLimit(t *testing.T) { + mc := runMockCollector(t) + t.Cleanup(func() { require.NoError(t, mc.stop()) }) + + ctx := t.Context() + exp := newGRPCExporter( + t, + ctx, + mc.endpoint, + otlptracegrpc.WithMaxRequestSize(1), + otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}), + ) + t.Cleanup(func() { + ctx, cancel := contextWithTimeout(context.WithoutCancel(t.Context()), t, 10*time.Second) + defer cancel() + require.NoError(t, exp.Shutdown(ctx)) + }) + + err := exp.ExportSpans(ctx, roSpans) + assert.ErrorContains(t, err, "request message too large") + assert.Empty(t, mc.getResourceSpans(), "oversized request must fail before sending") +} + func TestNewWithMultipleAttributeTypes(t *testing.T) { mc := runMockCollector(t) diff --git a/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options.go b/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options.go index 4f47117a5..bddc9e870 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options.go @@ -31,6 +31,9 @@ const ( // DefaultTracesPath is a default URL path for endpoint that // receives spans. DefaultTracesPath string = "/v1/traces" + // DefaultMaxRequestSize is the default maximum size of a serialized export + // request, before compression. + DefaultMaxRequestSize int = 32 * 1024 * 1024 // DefaultTimeout is a default max waiting time for the backend to process // each span batch. DefaultTimeout time.Duration = 10 * time.Second @@ -42,13 +45,14 @@ type ( HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + Compression Compression + MaxRequestSize int + Timeout time.Duration + URLPath string // gRPC configurations GRPCCredentials credentials.TransportCredentials @@ -77,10 +81,11 @@ type ( func NewHTTPConfig(opts ...HTTPOption) Config { cfg := Config{ Traces: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), - URLPath: DefaultTracesPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), + URLPath: DefaultTracesPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, }, RetryConfig: retry.DefaultConfig, } @@ -111,10 +116,11 @@ func NewGRPCConfig(opts ...GRPCOption) Config { userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version() cfg := Config{ Traces: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), - URLPath: DefaultTracesPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), + URLPath: DefaultTracesPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, }, RetryConfig: retry.DefaultConfig, DialOptions: []grpc.DialOption{grpc.WithUserAgent(userAgent)}, @@ -345,6 +351,13 @@ func WithTimeout(duration time.Duration) GenericOption { }) } +func WithMaxRequestSize(size int) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Traces.MaxRequestSize = size + return cfg + }) +} + func WithProxy(pf HTTPTransportProxyFunc) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Traces.Proxy = pf diff --git a/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options_test.go b/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options_test.go index 4ad02534b..c9ddee11a 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options_test.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig/options_test.go @@ -77,9 +77,19 @@ func TestConfigs(t *testing.T) { } assert.Equal(t, NoCompression, c.Traces.Compression) assert.Equal(t, map[string]string(nil), c.Traces.Headers) + assert.Equal(t, DefaultMaxRequestSize, c.Traces.MaxRequestSize) assert.Equal(t, 10*time.Second, c.Traces.Timeout) }, }, + { + name: "Test With Max Request Size", + opts: []GenericOption{ + WithMaxRequestSize(1), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 1, c.Traces.MaxRequestSize) + }, + }, // Endpoint Tests { diff --git a/exporters/otlp/otlptrace/otlptracegrpc/options.go b/exporters/otlp/otlptrace/otlptracegrpc/options.go index 2da229870..b32006052 100644 --- a/exporters/otlp/otlptrace/otlptracegrpc/options.go +++ b/exporters/otlp/otlptrace/otlptracegrpc/options.go @@ -192,6 +192,16 @@ func WithTimeout(duration time.Duration) Option { return wrappedOption{otlpconfig.WithTimeout(duration)} } +// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export +// request, before compression, that the exporter will send. +// +// If size is less than or equal to zero, no request-size limit is applied. +// Disabling the limit is not recommended because it can lead to excessive +// resource consumption or abuse. +func WithMaxRequestSize(size int) Option { + return wrappedOption{otlpconfig.WithMaxRequestSize(size)} +} + // WithRetry sets the retry policy for transient retryable errors that may be // returned by the target endpoint when exporting a batch of spans. // diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index f366f0685..ced494577 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -168,6 +168,10 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc ctx, cancel := d.contextWithStop(ctx) defer cancel() + if maxSize := d.cfg.MaxRequestSize; maxSize > 0 && len(rawRequest) > maxSize { + return fmt.Errorf("request body too large: exceeded %d bytes", maxSize) + } + request, err := d.newRequest(rawRequest) if err != nil { return err diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index 212c0e73c..707ee7849 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -652,6 +652,29 @@ func TestResponseBodySizeLimit(t *testing.T) { } } +func TestRequestBodySizeLimit(t *testing.T) { + var calls int + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + calls++ + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + + client := otlptracehttp.NewClient( + otlptracehttp.WithEndpointURL(srv.URL), + otlptracehttp.WithInsecure(), + otlptracehttp.WithMaxRequestSize(1), + otlptracehttp.WithRetry(otlptracehttp.RetryConfig{Enabled: false}), + ) + exporter, err := otlptrace.New(t.Context(), client) + require.NoError(t, err) + t.Cleanup(func() { _ = exporter.Shutdown(t.Context()) }) + + err = exporter.ExportSpans(t.Context(), otlptracetest.SingleReadOnlySpan()) + assert.ErrorContains(t, err, "request body too large") + assert.Equal(t, 0, calls, "oversized request must fail before sending") +} + func TestGetBodyCalledOnRedirect(t *testing.T) { // Test that req.GetBody is set correctly, allowing the HTTP transport // to re-send the body on 307 redirects. diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go index e415feea6..9b1c3e8c5 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go @@ -31,6 +31,9 @@ const ( // DefaultTracesPath is a default URL path for endpoint that // receives spans. DefaultTracesPath string = "/v1/traces" + // DefaultMaxRequestSize is the default maximum size of a serialized export + // request, before compression. + DefaultMaxRequestSize int = 32 * 1024 * 1024 // DefaultTimeout is a default max waiting time for the backend to process // each span batch. DefaultTimeout time.Duration = 10 * time.Second @@ -42,13 +45,14 @@ type ( HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + Compression Compression + MaxRequestSize int + Timeout time.Duration + URLPath string // gRPC configurations GRPCCredentials credentials.TransportCredentials @@ -77,10 +81,11 @@ type ( func NewHTTPConfig(opts ...HTTPOption) Config { cfg := Config{ Traces: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), - URLPath: DefaultTracesPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), + URLPath: DefaultTracesPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, }, RetryConfig: retry.DefaultConfig, } @@ -111,10 +116,11 @@ func NewGRPCConfig(opts ...GRPCOption) Config { userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version() cfg := Config{ Traces: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), - URLPath: DefaultTracesPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), + URLPath: DefaultTracesPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, }, RetryConfig: retry.DefaultConfig, DialOptions: []grpc.DialOption{grpc.WithUserAgent(userAgent)}, @@ -345,6 +351,13 @@ func WithTimeout(duration time.Duration) GenericOption { }) } +func WithMaxRequestSize(size int) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Traces.MaxRequestSize = size + return cfg + }) +} + func WithProxy(pf HTTPTransportProxyFunc) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Traces.Proxy = pf diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go index f16ca699d..7f9036fd2 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go @@ -77,9 +77,19 @@ func TestConfigs(t *testing.T) { } assert.Equal(t, NoCompression, c.Traces.Compression) assert.Equal(t, map[string]string(nil), c.Traces.Headers) + assert.Equal(t, DefaultMaxRequestSize, c.Traces.MaxRequestSize) assert.Equal(t, 10*time.Second, c.Traces.Timeout) }, }, + { + name: "Test With Max Request Size", + opts: []GenericOption{ + WithMaxRequestSize(1), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 1, c.Traces.MaxRequestSize) + }, + }, // Endpoint Tests { diff --git a/exporters/otlp/otlptrace/otlptracehttp/options.go b/exporters/otlp/otlptrace/otlptracehttp/options.go index cfe21dbfb..291b5f9b6 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/options.go +++ b/exporters/otlp/otlptrace/otlptracehttp/options.go @@ -138,6 +138,16 @@ func WithTimeout(duration time.Duration) Option { return wrappedOption{otlpconfig.WithTimeout(duration)} } +// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export +// request, before compression, that the exporter will send. +// +// If size is less than or equal to zero, no request-size limit is applied. +// Disabling the limit is not recommended because it can lead to excessive +// resource consumption or abuse. +func WithMaxRequestSize(size int) Option { + return wrappedOption{otlpconfig.WithMaxRequestSize(size)} +} + // WithRetry configures the retry policy for transient errors that may occurs // when exporting traces. An exponential back-off algorithm is used to ensure // endpoints are not overwhelmed with retries. If unset, the default retry diff --git a/internal/shared/otlp/otlpmetric/oconf/options.go.tmpl b/internal/shared/otlp/otlpmetric/oconf/options.go.tmpl index 3d0373110..629ec4a14 100644 --- a/internal/shared/otlp/otlpmetric/oconf/options.go.tmpl +++ b/internal/shared/otlp/otlpmetric/oconf/options.go.tmpl @@ -35,6 +35,9 @@ const ( // DefaultMetricsPath is a default URL path for endpoint that // receives metrics. DefaultMetricsPath string = "/v1/metrics" + // DefaultMaxRequestSize is the default maximum size of a serialized export + // request, before compression. + DefaultMaxRequestSize int = 32 * 1024 * 1024 // DefaultBackoff is a default base backoff time used in the // exponential backoff strategy. DefaultBackoff time.Duration = 300 * time.Millisecond @@ -49,13 +52,14 @@ type ( HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + Compression Compression + MaxRequestSize int + Timeout time.Duration + URLPath string TemporalitySelector metric.TemporalitySelector AggregationSelector metric.AggregationSelector @@ -87,10 +91,11 @@ type ( func NewHTTPConfig(opts ...HTTPOption) Config { cfg := Config{ Metrics: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), - URLPath: DefaultMetricsPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), + URLPath: DefaultMetricsPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, TemporalitySelector: metric.DefaultTemporalitySelector, AggregationSelector: metric.DefaultAggregationSelector, @@ -123,10 +128,11 @@ func cleanPath(urlPath string, defaultPath string) string { func NewGRPCConfig(opts ...GRPCOption) Config { cfg := Config{ Metrics: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), - URLPath: DefaultMetricsPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), + URLPath: DefaultMetricsPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, TemporalitySelector: metric.DefaultTemporalitySelector, AggregationSelector: metric.DefaultAggregationSelector, @@ -354,6 +360,13 @@ func WithTimeout(duration time.Duration) GenericOption { }) } +func WithMaxRequestSize(size int) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.MaxRequestSize = size + return cfg + }) +} + func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Metrics.TemporalitySelector = selector diff --git a/internal/shared/otlp/otlpmetric/oconf/options_test.go.tmpl b/internal/shared/otlp/otlpmetric/oconf/options_test.go.tmpl index ba4a30912..353cf72fb 100644 --- a/internal/shared/otlp/otlpmetric/oconf/options_test.go.tmpl +++ b/internal/shared/otlp/otlpmetric/oconf/options_test.go.tmpl @@ -79,9 +79,19 @@ func TestConfigs(t *testing.T) { } assert.Equal(t, NoCompression, c.Metrics.Compression) assert.Equal(t, map[string]string(nil), c.Metrics.Headers) + assert.Equal(t, DefaultMaxRequestSize, c.Metrics.MaxRequestSize) assert.Equal(t, 10*time.Second, c.Metrics.Timeout) }, }, + { + name: "Test With Max Request Size", + opts: []GenericOption{ + WithMaxRequestSize(1), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 1, c.Metrics.MaxRequestSize) + }, + }, // Endpoint Tests { diff --git a/internal/shared/otlp/otlptrace/otlpconfig/options.go.tmpl b/internal/shared/otlp/otlptrace/otlpconfig/options.go.tmpl index 6128b2c04..1b187ad1c 100644 --- a/internal/shared/otlp/otlptrace/otlpconfig/options.go.tmpl +++ b/internal/shared/otlp/otlptrace/otlpconfig/options.go.tmpl @@ -31,6 +31,9 @@ const ( // DefaultTracesPath is a default URL path for endpoint that // receives spans. DefaultTracesPath string = "/v1/traces" + // DefaultMaxRequestSize is the default maximum size of a serialized export + // request, before compression. + DefaultMaxRequestSize int = 32 * 1024 * 1024 // DefaultTimeout is a default max waiting time for the backend to process // each span batch. DefaultTimeout time.Duration = 10 * time.Second @@ -42,13 +45,14 @@ type ( HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + Compression Compression + MaxRequestSize int + Timeout time.Duration + URLPath string // gRPC configurations GRPCCredentials credentials.TransportCredentials @@ -77,10 +81,11 @@ type ( func NewHTTPConfig(opts ...HTTPOption) Config { cfg := Config{ Traces: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), - URLPath: DefaultTracesPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort), + URLPath: DefaultTracesPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, }, RetryConfig: retry.DefaultConfig, } @@ -111,10 +116,11 @@ func NewGRPCConfig(opts ...GRPCOption) Config { userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version() cfg := Config{ Traces: SignalConfig{ - Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), - URLPath: DefaultTracesPath, - Compression: NoCompression, - Timeout: DefaultTimeout, + Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort), + URLPath: DefaultTracesPath, + Compression: NoCompression, + MaxRequestSize: DefaultMaxRequestSize, + Timeout: DefaultTimeout, }, RetryConfig: retry.DefaultConfig, DialOptions: []grpc.DialOption{grpc.WithUserAgent(userAgent)}, @@ -345,6 +351,13 @@ func WithTimeout(duration time.Duration) GenericOption { }) } +func WithMaxRequestSize(size int) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Traces.MaxRequestSize = size + return cfg + }) +} + func WithProxy(pf HTTPTransportProxyFunc) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Traces.Proxy = pf diff --git a/internal/shared/otlp/otlptrace/otlpconfig/options_test.go.tmpl b/internal/shared/otlp/otlptrace/otlpconfig/options_test.go.tmpl index c71b0e740..cf262b49d 100644 --- a/internal/shared/otlp/otlptrace/otlpconfig/options_test.go.tmpl +++ b/internal/shared/otlp/otlptrace/otlpconfig/options_test.go.tmpl @@ -77,9 +77,19 @@ func TestConfigs(t *testing.T) { } assert.Equal(t, NoCompression, c.Traces.Compression) assert.Equal(t, map[string]string(nil), c.Traces.Headers) + assert.Equal(t, DefaultMaxRequestSize, c.Traces.MaxRequestSize) assert.Equal(t, 10*time.Second, c.Traces.Timeout) }, }, + { + name: "Test With Max Request Size", + opts: []GenericOption{ + WithMaxRequestSize(1), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Equal(t, 1, c.Traces.MaxRequestSize) + }, + }, // Endpoint Tests {