You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
Add MaxRequestSize option to OTLP exporters (#8157)
Per https://github.com/open-telemetry/opentelemetry-proto/pull/782 - Introduce WithMaxRequestSize option for OTLP exporters to set a limit on the size of serialized export requests. - Implement logic in the HTTP and gRPC clients to check the request size against the configured maximum before compression and sending. - The default configuration is that the maximum request size is 32 MiB.
This commit is contained in:
@@ -24,6 +24,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Support `SLICE` attributes in `go.opentelemetry.io/otel/exporters/zipkin`. (#8216)
|
||||
- Apply `AttributeValueLengthLimit` to `attribute.SLICE` type attribute values in `go.opentelemetry.io/otel/sdk/trace`, recursively truncating contained string values. (#8217)
|
||||
- Add `Error` field on `Record` type in `go.opentelemetry.io/otel/log/logtest`. (#8148)
|
||||
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#8157)
|
||||
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#8157)
|
||||
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#8157)
|
||||
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8157)
|
||||
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#8157)
|
||||
- Add `WithMaxRequestSize` option in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8157)
|
||||
- Add experimental support for splitting metric data across multiple batches in `go.opentelemetry.io/otel/sdk/metric`.
|
||||
Set `OTEL_GO_X_METRIC_EXPORT_BATCH_SIZE=<max_size>` to enable for all periodic readers.
|
||||
See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071)
|
||||
@@ -50,6 +56,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
### Fixed
|
||||
|
||||
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`.
|
||||
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
|
||||
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`.
|
||||
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
|
||||
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`.
|
||||
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
|
||||
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`.
|
||||
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
|
||||
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`.
|
||||
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
|
||||
- Limit OTLP request size to 32 MiB by default in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`.
|
||||
The limit applies before compression, oversized requests are treated as non-retryable errors, and the limit can be configured with the new `WithMaxRequestSize` option. (#8157)
|
||||
- Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8135)
|
||||
- Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8152)
|
||||
- `go.opentelemetry.io/otel/exporters/prometheus` now uses `Value.String` formatting for label values following the [OpenTelemetry AnyValue representation for non-OTLP protocols](https://opentelemetry.io/docs/specs/otel/common/#anyvalue). (#8170)
|
||||
|
||||
@@ -6,6 +6,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
"google.golang.org/grpc/encoding/gzip"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
|
||||
@@ -28,9 +30,10 @@ import (
|
||||
|
||||
// The methods of this type are not expected to be called concurrently.
|
||||
type client struct {
|
||||
metadata metadata.MD
|
||||
exportTimeout time.Duration
|
||||
requestFunc retry.RequestFunc
|
||||
metadata metadata.MD
|
||||
exportTimeout time.Duration
|
||||
maxRequestSize int
|
||||
requestFunc retry.RequestFunc
|
||||
|
||||
// ourConn keeps track of where conn was created: true if created here in
|
||||
// NewClient, or false if passed with an option. This is important on
|
||||
@@ -49,9 +52,10 @@ var newGRPCClientFn = grpc.NewClient
|
||||
// newClient creates a new gRPC log client.
|
||||
func newClient(cfg config) (*client, error) {
|
||||
c := &client{
|
||||
exportTimeout: cfg.timeout.Value,
|
||||
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
|
||||
conn: cfg.gRPCConn.Value,
|
||||
exportTimeout: cfg.timeout.Value,
|
||||
maxRequestSize: cfg.maxRequestSize.Value,
|
||||
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
|
||||
conn: cfg.gRPCConn.Value,
|
||||
}
|
||||
|
||||
if len(cfg.headers.Value) > 0 {
|
||||
@@ -146,6 +150,7 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
|
||||
ctx, cancel := c.exportContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
pbRequest := &collogpb.ExportLogsServiceRequest{ResourceLogs: rl}
|
||||
count := int64(len(rl))
|
||||
if c.instrumentation != nil {
|
||||
eo := c.instrumentation.ExportLogs(ctx, count)
|
||||
@@ -154,10 +159,12 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
|
||||
}()
|
||||
}
|
||||
|
||||
if maxSize := c.maxRequestSize; maxSize > 0 && proto.Size(pbRequest) > maxSize {
|
||||
return fmt.Errorf("request message too large: exceeded %d bytes", maxSize)
|
||||
}
|
||||
|
||||
return errors.Join(uploadErr, c.requestFunc(ctx, func(ctx context.Context) error {
|
||||
resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{
|
||||
ResourceLogs: rl,
|
||||
})
|
||||
resp, err := c.lsc.Export(ctx, pbRequest)
|
||||
if resp != nil && resp.PartialSuccess != nil {
|
||||
msg := resp.PartialSuccess.GetErrorMessage()
|
||||
n := resp.PartialSuccess.GetRejectedLogRecords()
|
||||
|
||||
@@ -149,6 +149,16 @@ var (
|
||||
}}
|
||||
)
|
||||
|
||||
type logsServiceClientFunc func(context.Context, *collogpb.ExportLogsServiceRequest, ...grpc.CallOption) (*collogpb.ExportLogsServiceResponse, error)
|
||||
|
||||
func (f logsServiceClientFunc) Export(
|
||||
ctx context.Context,
|
||||
req *collogpb.ExportLogsServiceRequest,
|
||||
opts ...grpc.CallOption,
|
||||
) (*collogpb.ExportLogsServiceResponse, error) {
|
||||
return f(ctx, req, opts...)
|
||||
}
|
||||
|
||||
func TestThrottleDelay(t *testing.T) {
|
||||
c := codes.ResourceExhausted
|
||||
testcases := []struct {
|
||||
@@ -267,6 +277,26 @@ func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
|
||||
assert.Equal(t, delay, d)
|
||||
}
|
||||
|
||||
func TestUploadLogsRequestSizeLimit(t *testing.T) {
|
||||
var calls int
|
||||
c := &client{
|
||||
maxRequestSize: 1,
|
||||
requestFunc: func(ctx context.Context, fn func(context.Context) error) error {
|
||||
return fn(ctx)
|
||||
},
|
||||
lsc: logsServiceClientFunc(
|
||||
func(context.Context, *collogpb.ExportLogsServiceRequest, ...grpc.CallOption) (*collogpb.ExportLogsServiceResponse, error) {
|
||||
calls++
|
||||
return &collogpb.ExportLogsServiceResponse{}, nil
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
err := c.UploadLogs(t.Context(), []*lpb.ResourceLogs{{}})
|
||||
assert.ErrorContains(t, err, "request message too large")
|
||||
assert.Equal(t, 0, calls, "oversized request must fail before sending")
|
||||
}
|
||||
|
||||
func TestNewClient(t *testing.T) {
|
||||
newGRPCClientFnSwap := newGRPCClientFn
|
||||
t.Cleanup(func() {
|
||||
|
||||
@@ -25,9 +25,10 @@ import (
|
||||
|
||||
// Default values.
|
||||
var (
|
||||
defaultEndpoint = "localhost:4317"
|
||||
defaultTimeout = 10 * time.Second
|
||||
defaultRetryCfg = retry.DefaultConfig
|
||||
defaultEndpoint = "localhost:4317"
|
||||
defaultTimeout = 10 * time.Second
|
||||
defaultMaxRequestSize = 32 * 1024 * 1024
|
||||
defaultRetryCfg = retry.DefaultConfig
|
||||
)
|
||||
|
||||
// Environment variable keys.
|
||||
@@ -85,13 +86,14 @@ type Option interface {
|
||||
}
|
||||
|
||||
type config struct {
|
||||
endpoint setting[string]
|
||||
insecure setting[bool]
|
||||
tlsCfg setting[*tls.Config]
|
||||
headers setting[map[string]string]
|
||||
compression setting[Compression]
|
||||
timeout setting[time.Duration]
|
||||
retryCfg setting[retry.Config]
|
||||
endpoint setting[string]
|
||||
insecure setting[bool]
|
||||
tlsCfg setting[*tls.Config]
|
||||
headers setting[map[string]string]
|
||||
compression setting[Compression]
|
||||
maxRequestSize setting[int]
|
||||
timeout setting[time.Duration]
|
||||
retryCfg setting[retry.Config]
|
||||
|
||||
// gRPC configurations
|
||||
gRPCCredentials setting[credentials.TransportCredentials]
|
||||
@@ -129,6 +131,9 @@ func newConfig(options []Option) config {
|
||||
getEnv[time.Duration](envTimeout, convDuration),
|
||||
fallback[time.Duration](defaultTimeout),
|
||||
)
|
||||
c.maxRequestSize = c.maxRequestSize.Resolve(
|
||||
fallback[int](defaultMaxRequestSize),
|
||||
)
|
||||
c.retryCfg = c.retryCfg.Resolve(
|
||||
fallback[retry.Config](defaultRetryCfg),
|
||||
)
|
||||
@@ -353,6 +358,19 @@ func WithTimeout(duration time.Duration) Option {
|
||||
})
|
||||
}
|
||||
|
||||
// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
|
||||
// request, before compression, that the exporter will send.
|
||||
//
|
||||
// If size is less than or equal to zero, no request-size limit is applied.
|
||||
// Disabling the limit is not recommended because it can lead to excessive
|
||||
// resource consumption or abuse.
|
||||
func WithMaxRequestSize(size int) Option {
|
||||
return fnOpt(func(c config) config {
|
||||
c.maxRequestSize = newSetting(size)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithRetry sets the retry policy for transient retryable errors that are
|
||||
// returned by the target endpoint.
|
||||
//
|
||||
|
||||
@@ -112,6 +112,7 @@ func TestNewConfig(t *testing.T) {
|
||||
WithServiceConfig("{}"),
|
||||
WithDialOption(dialOptions...),
|
||||
WithGRPCConn(&grpc.ClientConn{}),
|
||||
WithMaxRequestSize(1),
|
||||
WithTimeout(2 * time.Second),
|
||||
WithRetry(RetryConfig(rc)),
|
||||
},
|
||||
@@ -120,6 +121,7 @@ func TestNewConfig(t *testing.T) {
|
||||
insecure: newSetting(true),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(GzipCompression),
|
||||
maxRequestSize: newSetting(1),
|
||||
timeout: newSetting(2 * time.Second),
|
||||
retryCfg: newSetting(rc),
|
||||
gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)),
|
||||
@@ -494,6 +496,9 @@ func TestNewConfig(t *testing.T) {
|
||||
return func() { otel.SetErrorHandler(orig) }
|
||||
}(otel.GetErrorHandler()))
|
||||
c := newConfig(tc.options)
|
||||
if !tc.want.maxRequestSize.Set {
|
||||
tc.want.maxRequestSize = newSetting(defaultMaxRequestSize)
|
||||
}
|
||||
|
||||
// Do not compare pointer values.
|
||||
assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg)
|
||||
|
||||
@@ -111,10 +111,11 @@ func newHTTPClient(ctx context.Context, cfg config) (*client, error) {
|
||||
req.Header.Set("Content-Type", "application/x-protobuf")
|
||||
|
||||
c := &httpClient{
|
||||
compression: cfg.compression.Value,
|
||||
req: req,
|
||||
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
|
||||
client: hc,
|
||||
compression: cfg.compression.Value,
|
||||
maxRequestSize: cfg.maxRequestSize.Value,
|
||||
req: req,
|
||||
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
|
||||
client: hc,
|
||||
}
|
||||
|
||||
id := nextExporterID()
|
||||
@@ -125,10 +126,11 @@ func newHTTPClient(ctx context.Context, cfg config) (*client, error) {
|
||||
|
||||
type httpClient struct {
|
||||
// req is cloned for every upload the client makes.
|
||||
req *http.Request
|
||||
compression Compression
|
||||
requestFunc retry.RequestFunc
|
||||
client *http.Client
|
||||
req *http.Request
|
||||
compression Compression
|
||||
maxRequestSize int
|
||||
requestFunc retry.RequestFunc
|
||||
client *http.Client
|
||||
|
||||
inst *observ.Instrumentation
|
||||
}
|
||||
@@ -159,10 +161,6 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
request, err := c.newRequest(ctx, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var statusCode int
|
||||
if c.inst != nil {
|
||||
@@ -170,6 +168,14 @@ func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs)
|
||||
defer func() { op.End(uploadErr, statusCode) }()
|
||||
}
|
||||
|
||||
if maxSize := c.maxRequestSize; maxSize > 0 && len(body) > maxSize {
|
||||
return fmt.Errorf("request body too large: exceeded %d bytes", maxSize)
|
||||
}
|
||||
request, err := c.newRequest(ctx, body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error {
|
||||
select {
|
||||
case <-iCtx.Done():
|
||||
|
||||
@@ -1138,6 +1138,29 @@ func TestResponseBodySizeLimit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestBodySizeLimit(t *testing.T) {
|
||||
var calls int
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
calls++
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
opts := []Option{
|
||||
WithEndpoint(srv.Listener.Addr().String()),
|
||||
WithInsecure(),
|
||||
WithMaxRequestSize(1),
|
||||
WithRetry(RetryConfig{Enabled: false}),
|
||||
}
|
||||
cfg := newConfig(opts)
|
||||
c, err := newHTTPClient(t.Context(), cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = c.UploadLogs(t.Context(), []*lpb.ResourceLogs{{}})
|
||||
assert.ErrorContains(t, err, "request body too large")
|
||||
assert.Equal(t, 0, calls, "oversized request must fail before sending")
|
||||
}
|
||||
|
||||
func BenchmarkExporterExportLogs(b *testing.B) {
|
||||
const n = 10
|
||||
|
||||
|
||||
@@ -23,11 +23,12 @@ import (
|
||||
|
||||
// Default values.
|
||||
var (
|
||||
defaultEndpoint = "localhost:4318"
|
||||
defaultPath = "/v1/logs"
|
||||
defaultTimeout = 10 * time.Second
|
||||
defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment
|
||||
defaultRetryCfg = retry.DefaultConfig
|
||||
defaultEndpoint = "localhost:4318"
|
||||
defaultPath = "/v1/logs"
|
||||
defaultTimeout = 10 * time.Second
|
||||
defaultMaxRequestSize = 32 * 1024 * 1024
|
||||
defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment
|
||||
defaultRetryCfg = retry.DefaultConfig
|
||||
)
|
||||
|
||||
// Environment variable keys.
|
||||
@@ -89,16 +90,17 @@ type fnOpt func(config) config
|
||||
func (f fnOpt) applyHTTPOption(c config) config { return f(c) }
|
||||
|
||||
type config struct {
|
||||
endpoint setting[string]
|
||||
path setting[string]
|
||||
insecure setting[bool]
|
||||
tlsCfg setting[*tls.Config]
|
||||
headers setting[map[string]string]
|
||||
compression setting[Compression]
|
||||
timeout setting[time.Duration]
|
||||
proxy setting[HTTPTransportProxyFunc]
|
||||
retryCfg setting[retry.Config]
|
||||
httpClient *http.Client
|
||||
endpoint setting[string]
|
||||
path setting[string]
|
||||
insecure setting[bool]
|
||||
tlsCfg setting[*tls.Config]
|
||||
headers setting[map[string]string]
|
||||
compression setting[Compression]
|
||||
maxRequestSize setting[int]
|
||||
timeout setting[time.Duration]
|
||||
proxy setting[HTTPTransportProxyFunc]
|
||||
retryCfg setting[retry.Config]
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
func newConfig(options []Option) config {
|
||||
@@ -133,6 +135,9 @@ func newConfig(options []Option) config {
|
||||
getenv[time.Duration](envTimeout, convDuration),
|
||||
fallback[time.Duration](defaultTimeout),
|
||||
)
|
||||
c.maxRequestSize = c.maxRequestSize.Resolve(
|
||||
fallback[int](defaultMaxRequestSize),
|
||||
)
|
||||
c.proxy = c.proxy.Resolve(
|
||||
fallback[HTTPTransportProxyFunc](defaultProxy),
|
||||
)
|
||||
@@ -313,6 +318,19 @@ func WithTimeout(duration time.Duration) Option {
|
||||
})
|
||||
}
|
||||
|
||||
// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
|
||||
// request, before compression, that the exporter will send.
|
||||
//
|
||||
// If size is less than or equal to zero, no request-size limit is applied.
|
||||
// Disabling the limit is not recommended because it can lead to excessive
|
||||
// resource consumption or abuse.
|
||||
func WithMaxRequestSize(size int) Option {
|
||||
return fnOpt(func(c config) config {
|
||||
c.maxRequestSize = newSetting(size)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// RetryConfig defines configuration for retrying the export of log data that
|
||||
// failed.
|
||||
type RetryConfig retry.Config
|
||||
|
||||
@@ -107,19 +107,21 @@ func TestNewConfig(t *testing.T) {
|
||||
WithTLSClientConfig(tlsCfg),
|
||||
WithCompression(GzipCompression),
|
||||
WithHeaders(headers),
|
||||
WithMaxRequestSize(1),
|
||||
WithTimeout(time.Second),
|
||||
WithRetry(RetryConfig(rc)),
|
||||
// Do not test WithProxy. Requires func comparison.
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("test"),
|
||||
path: newSetting("/path"),
|
||||
insecure: newSetting(true),
|
||||
tlsCfg: newSetting(tlsCfg),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(GzipCompression),
|
||||
timeout: newSetting(time.Second),
|
||||
retryCfg: newSetting(rc),
|
||||
endpoint: newSetting("test"),
|
||||
path: newSetting("/path"),
|
||||
insecure: newSetting(true),
|
||||
tlsCfg: newSetting(tlsCfg),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(GzipCompression),
|
||||
maxRequestSize: newSetting(1),
|
||||
timeout: newSetting(time.Second),
|
||||
retryCfg: newSetting(rc),
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -509,6 +511,9 @@ func TestNewConfig(t *testing.T) {
|
||||
return func() { otel.SetErrorHandler(orig) }
|
||||
}(otel.GetErrorHandler()))
|
||||
c := newConfig(tc.options)
|
||||
if !tc.want.maxRequestSize.Set {
|
||||
tc.want.maxRequestSize = newSetting(defaultMaxRequestSize)
|
||||
}
|
||||
|
||||
// Do not compare pointer values.
|
||||
assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg)
|
||||
|
||||
@@ -6,6 +6,7 @@ package otlpmetricgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpme
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
|
||||
@@ -15,6 +16,7 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc/internal/oconf"
|
||||
@@ -22,9 +24,10 @@ import (
|
||||
)
|
||||
|
||||
type client struct {
|
||||
metadata metadata.MD
|
||||
exportTimeout time.Duration
|
||||
requestFunc retry.RequestFunc
|
||||
metadata metadata.MD
|
||||
exportTimeout time.Duration
|
||||
maxRequestSize int
|
||||
requestFunc retry.RequestFunc
|
||||
|
||||
// ourConn keeps track of where conn was created: true if created here in
|
||||
// NewClient, or false if passed with an option. This is important on
|
||||
@@ -38,9 +41,10 @@ type client struct {
|
||||
// newClient creates a new gRPC metric client.
|
||||
func newClient(_ context.Context, cfg oconf.Config) (*client, error) {
|
||||
c := &client{
|
||||
exportTimeout: cfg.Metrics.Timeout,
|
||||
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
|
||||
conn: cfg.GRPCConn,
|
||||
exportTimeout: cfg.Metrics.Timeout,
|
||||
maxRequestSize: cfg.Metrics.MaxRequestSize,
|
||||
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
|
||||
conn: cfg.GRPCConn,
|
||||
}
|
||||
|
||||
if len(cfg.Metrics.Headers) > 0 {
|
||||
@@ -115,10 +119,15 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
|
||||
ctx, cancel := c.exportContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
pbRequest := &colmetricpb.ExportMetricsServiceRequest{
|
||||
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
|
||||
}
|
||||
if maxSize := c.maxRequestSize; maxSize > 0 && proto.Size(pbRequest) > maxSize {
|
||||
return fmt.Errorf("request message too large: exceeded %d bytes", maxSize)
|
||||
}
|
||||
|
||||
return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error {
|
||||
resp, err := c.msc.Export(iCtx, &colmetricpb.ExportMetricsServiceRequest{
|
||||
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
|
||||
})
|
||||
resp, err := c.msc.Export(iCtx, pbRequest)
|
||||
if resp != nil && resp.PartialSuccess != nil {
|
||||
msg := resp.PartialSuccess.GetErrorMessage()
|
||||
n := resp.PartialSuccess.GetRejectedDataPoints()
|
||||
|
||||
@@ -252,4 +252,20 @@ func TestConfig(t *testing.T) {
|
||||
got := coll.Headers()
|
||||
assert.Contains(t, got[key][0], customerUserAgent)
|
||||
})
|
||||
|
||||
t.Run("WithMaxRequestSize", func(t *testing.T) {
|
||||
exp, coll := factoryFunc(
|
||||
nil,
|
||||
WithMaxRequestSize(1),
|
||||
WithRetry(RetryConfig{Enabled: false}),
|
||||
)
|
||||
t.Cleanup(coll.Shutdown)
|
||||
|
||||
ctx := context.Background() //nolint:usetesting // required to avoid getting a canceled context at cleanup.
|
||||
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
|
||||
|
||||
err := exp.Export(ctx, &metricdata.ResourceMetrics{})
|
||||
assert.ErrorContains(t, err, "request message too large")
|
||||
assert.Empty(t, coll.Collect().Dump(), "oversized request must fail before sending")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -231,6 +231,16 @@ func WithTimeout(duration time.Duration) Option {
|
||||
return wrappedOption{oconf.WithTimeout(duration)}
|
||||
}
|
||||
|
||||
// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
|
||||
// request, before compression, that the exporter will send.
|
||||
//
|
||||
// If size is less than or equal to zero, no request-size limit is applied.
|
||||
// Disabling the limit is not recommended because it can lead to excessive
|
||||
// resource consumption or abuse.
|
||||
func WithMaxRequestSize(size int) Option {
|
||||
return wrappedOption{oconf.WithMaxRequestSize(size)}
|
||||
}
|
||||
|
||||
// WithRetry sets the retry policy for transient retryable errors that are
|
||||
// returned by the target endpoint.
|
||||
//
|
||||
|
||||
@@ -35,6 +35,9 @@ const (
|
||||
// DefaultMetricsPath is a default URL path for endpoint that
|
||||
// receives metrics.
|
||||
DefaultMetricsPath string = "/v1/metrics"
|
||||
// DefaultMaxRequestSize is the default maximum size of a serialized export
|
||||
// request, before compression.
|
||||
DefaultMaxRequestSize int = 32 * 1024 * 1024
|
||||
// DefaultBackoff is a default base backoff time used in the
|
||||
// exponential backoff strategy.
|
||||
DefaultBackoff time.Duration = 300 * time.Millisecond
|
||||
@@ -49,13 +52,14 @@ type (
|
||||
HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)
|
||||
|
||||
SignalConfig struct {
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
MaxRequestSize int
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
|
||||
TemporalitySelector metric.TemporalitySelector
|
||||
AggregationSelector metric.AggregationSelector
|
||||
@@ -87,10 +91,11 @@ type (
|
||||
func NewHTTPConfig(opts ...HTTPOption) Config {
|
||||
cfg := Config{
|
||||
Metrics: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
|
||||
TemporalitySelector: metric.DefaultTemporalitySelector,
|
||||
AggregationSelector: metric.DefaultAggregationSelector,
|
||||
@@ -123,10 +128,11 @@ func cleanPath(urlPath string, defaultPath string) string {
|
||||
func NewGRPCConfig(opts ...GRPCOption) Config {
|
||||
cfg := Config{
|
||||
Metrics: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
|
||||
TemporalitySelector: metric.DefaultTemporalitySelector,
|
||||
AggregationSelector: metric.DefaultAggregationSelector,
|
||||
@@ -354,6 +360,13 @@ func WithTimeout(duration time.Duration) GenericOption {
|
||||
})
|
||||
}
|
||||
|
||||
func WithMaxRequestSize(size int) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Metrics.MaxRequestSize = size
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Metrics.TemporalitySelector = selector
|
||||
|
||||
@@ -79,9 +79,19 @@ func TestConfigs(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, NoCompression, c.Metrics.Compression)
|
||||
assert.Equal(t, map[string]string(nil), c.Metrics.Headers)
|
||||
assert.Equal(t, DefaultMaxRequestSize, c.Metrics.MaxRequestSize)
|
||||
assert.Equal(t, 10*time.Second, c.Metrics.Timeout)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test With Max Request Size",
|
||||
opts: []GenericOption{
|
||||
WithMaxRequestSize(1),
|
||||
},
|
||||
asserts: func(t *testing.T, c *Config, grpcOption bool) {
|
||||
assert.Equal(t, 1, c.Metrics.MaxRequestSize)
|
||||
},
|
||||
},
|
||||
|
||||
// Endpoint Tests
|
||||
{
|
||||
|
||||
@@ -31,10 +31,11 @@ import (
|
||||
|
||||
type client struct {
|
||||
// req is cloned for every upload the client makes.
|
||||
req *http.Request
|
||||
compression Compression
|
||||
requestFunc retry.RequestFunc
|
||||
httpClient *http.Client
|
||||
req *http.Request
|
||||
compression Compression
|
||||
maxRequestSize int
|
||||
requestFunc retry.RequestFunc
|
||||
httpClient *http.Client
|
||||
|
||||
inst *observ.Instrumentation
|
||||
}
|
||||
@@ -119,11 +120,12 @@ func newClient(cfg oconf.Config) (*client, error) {
|
||||
inst, err := observ.NewInstrumentation(counter.NextExporterID(), cfg.Metrics.Endpoint)
|
||||
|
||||
return &client{
|
||||
compression: Compression(cfg.Metrics.Compression),
|
||||
req: req,
|
||||
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
|
||||
httpClient: httpClient,
|
||||
inst: inst,
|
||||
compression: Compression(cfg.Metrics.Compression),
|
||||
maxRequestSize: cfg.Metrics.MaxRequestSize,
|
||||
req: req,
|
||||
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
|
||||
httpClient: httpClient,
|
||||
inst: inst,
|
||||
}, err
|
||||
}
|
||||
|
||||
@@ -154,6 +156,9 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maxSize := c.maxRequestSize; maxSize > 0 && len(body) > maxSize {
|
||||
return fmt.Errorf("request body too large: exceeded %d bytes", maxSize)
|
||||
}
|
||||
request, err := c.newRequest(ctx, body)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -515,6 +515,30 @@ func TestResponseBodySizeLimit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestBodySizeLimit(t *testing.T) {
|
||||
var calls int
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
calls++
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
opts := []Option{
|
||||
WithEndpoint(srv.Listener.Addr().String()),
|
||||
WithInsecure(),
|
||||
WithMaxRequestSize(1),
|
||||
WithRetry(RetryConfig{Enabled: false}),
|
||||
}
|
||||
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
|
||||
c, err := newClient(cfg)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = c.Shutdown(t.Context()) })
|
||||
|
||||
err = c.UploadMetrics(t.Context(), &mpb.ResourceMetrics{})
|
||||
assert.ErrorContains(t, err, "request body too large")
|
||||
assert.Equal(t, 0, calls, "oversized request must fail before sending")
|
||||
}
|
||||
|
||||
func TestClientInstrumentation(t *testing.T) {
|
||||
// Enable instrumentation for this test.
|
||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
@@ -185,6 +185,16 @@ func WithTimeout(duration time.Duration) Option {
|
||||
return wrappedOption{oconf.WithTimeout(duration)}
|
||||
}
|
||||
|
||||
// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
|
||||
// request, before compression, that the exporter will send.
|
||||
//
|
||||
// If size is less than or equal to zero, no request-size limit is applied.
|
||||
// Disabling the limit is not recommended because it can lead to excessive
|
||||
// resource consumption or abuse.
|
||||
func WithMaxRequestSize(size int) Option {
|
||||
return wrappedOption{oconf.WithMaxRequestSize(size)}
|
||||
}
|
||||
|
||||
// WithRetry sets the retry policy for transient retryable errors that are
|
||||
// returned by the target endpoint.
|
||||
//
|
||||
|
||||
@@ -35,6 +35,9 @@ const (
|
||||
// DefaultMetricsPath is a default URL path for endpoint that
|
||||
// receives metrics.
|
||||
DefaultMetricsPath string = "/v1/metrics"
|
||||
// DefaultMaxRequestSize is the default maximum size of a serialized export
|
||||
// request, before compression.
|
||||
DefaultMaxRequestSize int = 32 * 1024 * 1024
|
||||
// DefaultBackoff is a default base backoff time used in the
|
||||
// exponential backoff strategy.
|
||||
DefaultBackoff time.Duration = 300 * time.Millisecond
|
||||
@@ -49,13 +52,14 @@ type (
|
||||
HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)
|
||||
|
||||
SignalConfig struct {
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
MaxRequestSize int
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
|
||||
TemporalitySelector metric.TemporalitySelector
|
||||
AggregationSelector metric.AggregationSelector
|
||||
@@ -87,10 +91,11 @@ type (
|
||||
func NewHTTPConfig(opts ...HTTPOption) Config {
|
||||
cfg := Config{
|
||||
Metrics: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
|
||||
TemporalitySelector: metric.DefaultTemporalitySelector,
|
||||
AggregationSelector: metric.DefaultAggregationSelector,
|
||||
@@ -123,10 +128,11 @@ func cleanPath(urlPath string, defaultPath string) string {
|
||||
func NewGRPCConfig(opts ...GRPCOption) Config {
|
||||
cfg := Config{
|
||||
Metrics: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
|
||||
TemporalitySelector: metric.DefaultTemporalitySelector,
|
||||
AggregationSelector: metric.DefaultAggregationSelector,
|
||||
@@ -354,6 +360,13 @@ func WithTimeout(duration time.Duration) GenericOption {
|
||||
})
|
||||
}
|
||||
|
||||
func WithMaxRequestSize(size int) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Metrics.MaxRequestSize = size
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Metrics.TemporalitySelector = selector
|
||||
|
||||
@@ -79,9 +79,19 @@ func TestConfigs(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, NoCompression, c.Metrics.Compression)
|
||||
assert.Equal(t, map[string]string(nil), c.Metrics.Headers)
|
||||
assert.Equal(t, DefaultMaxRequestSize, c.Metrics.MaxRequestSize)
|
||||
assert.Equal(t, 10*time.Second, c.Metrics.Timeout)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test With Max Request Size",
|
||||
opts: []GenericOption{
|
||||
WithMaxRequestSize(1),
|
||||
},
|
||||
asserts: func(t *testing.T, c *Config, grpcOption bool) {
|
||||
assert.Equal(t, 1, c.Metrics.MaxRequestSize)
|
||||
},
|
||||
},
|
||||
|
||||
// Endpoint Tests
|
||||
{
|
||||
|
||||
@@ -6,6 +6,7 @@ package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptra
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -16,6 +17,7 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
|
||||
@@ -26,11 +28,12 @@ import (
|
||||
)
|
||||
|
||||
type client struct {
|
||||
endpoint string
|
||||
dialOpts []grpc.DialOption
|
||||
metadata metadata.MD
|
||||
exportTimeout time.Duration
|
||||
requestFunc retry.RequestFunc
|
||||
endpoint string
|
||||
dialOpts []grpc.DialOption
|
||||
metadata metadata.MD
|
||||
exportTimeout time.Duration
|
||||
maxRequestSize int
|
||||
requestFunc retry.RequestFunc
|
||||
|
||||
// stopCtx is used as a parent context for all exports. Therefore, when it
|
||||
// is canceled with the stopFunc all exports are canceled.
|
||||
@@ -65,14 +68,15 @@ func newClient(opts ...Option) *client {
|
||||
ctx, cancel := context.WithCancel(context.Background()) //nolint:gosec // cancel called in client shutdown.
|
||||
|
||||
c := &client{
|
||||
endpoint: cfg.Traces.Endpoint,
|
||||
exportTimeout: cfg.Traces.Timeout,
|
||||
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
|
||||
dialOpts: cfg.DialOptions,
|
||||
stopCtx: ctx,
|
||||
stopFunc: cancel,
|
||||
conn: cfg.GRPCConn,
|
||||
instID: counter.NextExporterID(),
|
||||
endpoint: cfg.Traces.Endpoint,
|
||||
exportTimeout: cfg.Traces.Timeout,
|
||||
maxRequestSize: cfg.Traces.MaxRequestSize,
|
||||
requestFunc: cfg.RetryConfig.RequestFunc(retryable),
|
||||
dialOpts: cfg.DialOptions,
|
||||
stopCtx: ctx,
|
||||
stopFunc: cancel,
|
||||
conn: cfg.GRPCConn,
|
||||
instID: counter.NextExporterID(),
|
||||
}
|
||||
|
||||
if len(cfg.Traces.Headers) > 0 {
|
||||
@@ -205,16 +209,22 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
||||
ctx, cancel := c.exportContext(ctx)
|
||||
defer cancel()
|
||||
|
||||
var code codes.Code
|
||||
pbRequest := &coltracepb.ExportTraceServiceRequest{
|
||||
ResourceSpans: protoSpans,
|
||||
}
|
||||
|
||||
code := codes.Unknown
|
||||
if c.inst != nil {
|
||||
op := c.inst.ExportSpans(ctx, len(protoSpans))
|
||||
defer func() { op.End(uploadErr, code) }()
|
||||
}
|
||||
|
||||
if maxSize := c.maxRequestSize; maxSize > 0 && proto.Size(pbRequest) > maxSize {
|
||||
return fmt.Errorf("request message too large: exceeded %d bytes", maxSize)
|
||||
}
|
||||
|
||||
return c.requestFunc(ctx, func(iCtx context.Context) error {
|
||||
resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{
|
||||
ResourceSpans: protoSpans,
|
||||
})
|
||||
resp, err := c.tsc.Export(iCtx, pbRequest)
|
||||
if resp != nil && resp.PartialSuccess != nil {
|
||||
msg := resp.PartialSuccess.GetErrorMessage()
|
||||
n := resp.PartialSuccess.GetRejectedSpans()
|
||||
|
||||
@@ -277,6 +277,29 @@ func TestExportSpansTimeoutHonored(t *testing.T) {
|
||||
require.True(t, strings.HasPrefix(err.Error(), "traces export: "), "%+v", err)
|
||||
}
|
||||
|
||||
func TestExportSpansRequestSizeLimit(t *testing.T) {
|
||||
mc := runMockCollector(t)
|
||||
t.Cleanup(func() { require.NoError(t, mc.stop()) })
|
||||
|
||||
ctx := t.Context()
|
||||
exp := newGRPCExporter(
|
||||
t,
|
||||
ctx,
|
||||
mc.endpoint,
|
||||
otlptracegrpc.WithMaxRequestSize(1),
|
||||
otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{Enabled: false}),
|
||||
)
|
||||
t.Cleanup(func() {
|
||||
ctx, cancel := contextWithTimeout(context.WithoutCancel(t.Context()), t, 10*time.Second)
|
||||
defer cancel()
|
||||
require.NoError(t, exp.Shutdown(ctx))
|
||||
})
|
||||
|
||||
err := exp.ExportSpans(ctx, roSpans)
|
||||
assert.ErrorContains(t, err, "request message too large")
|
||||
assert.Empty(t, mc.getResourceSpans(), "oversized request must fail before sending")
|
||||
}
|
||||
|
||||
func TestNewWithMultipleAttributeTypes(t *testing.T) {
|
||||
mc := runMockCollector(t)
|
||||
|
||||
|
||||
@@ -31,6 +31,9 @@ const (
|
||||
// DefaultTracesPath is a default URL path for endpoint that
|
||||
// receives spans.
|
||||
DefaultTracesPath string = "/v1/traces"
|
||||
// DefaultMaxRequestSize is the default maximum size of a serialized export
|
||||
// request, before compression.
|
||||
DefaultMaxRequestSize int = 32 * 1024 * 1024
|
||||
// DefaultTimeout is a default max waiting time for the backend to process
|
||||
// each span batch.
|
||||
DefaultTimeout time.Duration = 10 * time.Second
|
||||
@@ -42,13 +45,14 @@ type (
|
||||
HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)
|
||||
|
||||
SignalConfig struct {
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
MaxRequestSize int
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
|
||||
// gRPC configurations
|
||||
GRPCCredentials credentials.TransportCredentials
|
||||
@@ -77,10 +81,11 @@ type (
|
||||
func NewHTTPConfig(opts ...HTTPOption) Config {
|
||||
cfg := Config{
|
||||
Traces: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
},
|
||||
RetryConfig: retry.DefaultConfig,
|
||||
}
|
||||
@@ -111,10 +116,11 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
|
||||
userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
|
||||
cfg := Config{
|
||||
Traces: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
},
|
||||
RetryConfig: retry.DefaultConfig,
|
||||
DialOptions: []grpc.DialOption{grpc.WithUserAgent(userAgent)},
|
||||
@@ -345,6 +351,13 @@ func WithTimeout(duration time.Duration) GenericOption {
|
||||
})
|
||||
}
|
||||
|
||||
func WithMaxRequestSize(size int) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Traces.MaxRequestSize = size
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
func WithProxy(pf HTTPTransportProxyFunc) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Traces.Proxy = pf
|
||||
|
||||
@@ -77,9 +77,19 @@ func TestConfigs(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, NoCompression, c.Traces.Compression)
|
||||
assert.Equal(t, map[string]string(nil), c.Traces.Headers)
|
||||
assert.Equal(t, DefaultMaxRequestSize, c.Traces.MaxRequestSize)
|
||||
assert.Equal(t, 10*time.Second, c.Traces.Timeout)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test With Max Request Size",
|
||||
opts: []GenericOption{
|
||||
WithMaxRequestSize(1),
|
||||
},
|
||||
asserts: func(t *testing.T, c *Config, grpcOption bool) {
|
||||
assert.Equal(t, 1, c.Traces.MaxRequestSize)
|
||||
},
|
||||
},
|
||||
|
||||
// Endpoint Tests
|
||||
{
|
||||
|
||||
@@ -192,6 +192,16 @@ func WithTimeout(duration time.Duration) Option {
|
||||
return wrappedOption{otlpconfig.WithTimeout(duration)}
|
||||
}
|
||||
|
||||
// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
|
||||
// request, before compression, that the exporter will send.
|
||||
//
|
||||
// If size is less than or equal to zero, no request-size limit is applied.
|
||||
// Disabling the limit is not recommended because it can lead to excessive
|
||||
// resource consumption or abuse.
|
||||
func WithMaxRequestSize(size int) Option {
|
||||
return wrappedOption{otlpconfig.WithMaxRequestSize(size)}
|
||||
}
|
||||
|
||||
// WithRetry sets the retry policy for transient retryable errors that may be
|
||||
// returned by the target endpoint when exporting a batch of spans.
|
||||
//
|
||||
|
||||
@@ -168,6 +168,10 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
||||
ctx, cancel := d.contextWithStop(ctx)
|
||||
defer cancel()
|
||||
|
||||
if maxSize := d.cfg.MaxRequestSize; maxSize > 0 && len(rawRequest) > maxSize {
|
||||
return fmt.Errorf("request body too large: exceeded %d bytes", maxSize)
|
||||
}
|
||||
|
||||
request, err := d.newRequest(rawRequest)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -652,6 +652,29 @@ func TestResponseBodySizeLimit(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestBodySizeLimit(t *testing.T) {
|
||||
var calls int
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
calls++
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
client := otlptracehttp.NewClient(
|
||||
otlptracehttp.WithEndpointURL(srv.URL),
|
||||
otlptracehttp.WithInsecure(),
|
||||
otlptracehttp.WithMaxRequestSize(1),
|
||||
otlptracehttp.WithRetry(otlptracehttp.RetryConfig{Enabled: false}),
|
||||
)
|
||||
exporter, err := otlptrace.New(t.Context(), client)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { _ = exporter.Shutdown(t.Context()) })
|
||||
|
||||
err = exporter.ExportSpans(t.Context(), otlptracetest.SingleReadOnlySpan())
|
||||
assert.ErrorContains(t, err, "request body too large")
|
||||
assert.Equal(t, 0, calls, "oversized request must fail before sending")
|
||||
}
|
||||
|
||||
func TestGetBodyCalledOnRedirect(t *testing.T) {
|
||||
// Test that req.GetBody is set correctly, allowing the HTTP transport
|
||||
// to re-send the body on 307 redirects.
|
||||
|
||||
@@ -31,6 +31,9 @@ const (
|
||||
// DefaultTracesPath is a default URL path for endpoint that
|
||||
// receives spans.
|
||||
DefaultTracesPath string = "/v1/traces"
|
||||
// DefaultMaxRequestSize is the default maximum size of a serialized export
|
||||
// request, before compression.
|
||||
DefaultMaxRequestSize int = 32 * 1024 * 1024
|
||||
// DefaultTimeout is a default max waiting time for the backend to process
|
||||
// each span batch.
|
||||
DefaultTimeout time.Duration = 10 * time.Second
|
||||
@@ -42,13 +45,14 @@ type (
|
||||
HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)
|
||||
|
||||
SignalConfig struct {
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
MaxRequestSize int
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
|
||||
// gRPC configurations
|
||||
GRPCCredentials credentials.TransportCredentials
|
||||
@@ -77,10 +81,11 @@ type (
|
||||
func NewHTTPConfig(opts ...HTTPOption) Config {
|
||||
cfg := Config{
|
||||
Traces: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
},
|
||||
RetryConfig: retry.DefaultConfig,
|
||||
}
|
||||
@@ -111,10 +116,11 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
|
||||
userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
|
||||
cfg := Config{
|
||||
Traces: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
},
|
||||
RetryConfig: retry.DefaultConfig,
|
||||
DialOptions: []grpc.DialOption{grpc.WithUserAgent(userAgent)},
|
||||
@@ -345,6 +351,13 @@ func WithTimeout(duration time.Duration) GenericOption {
|
||||
})
|
||||
}
|
||||
|
||||
func WithMaxRequestSize(size int) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Traces.MaxRequestSize = size
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
func WithProxy(pf HTTPTransportProxyFunc) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Traces.Proxy = pf
|
||||
|
||||
@@ -77,9 +77,19 @@ func TestConfigs(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, NoCompression, c.Traces.Compression)
|
||||
assert.Equal(t, map[string]string(nil), c.Traces.Headers)
|
||||
assert.Equal(t, DefaultMaxRequestSize, c.Traces.MaxRequestSize)
|
||||
assert.Equal(t, 10*time.Second, c.Traces.Timeout)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test With Max Request Size",
|
||||
opts: []GenericOption{
|
||||
WithMaxRequestSize(1),
|
||||
},
|
||||
asserts: func(t *testing.T, c *Config, grpcOption bool) {
|
||||
assert.Equal(t, 1, c.Traces.MaxRequestSize)
|
||||
},
|
||||
},
|
||||
|
||||
// Endpoint Tests
|
||||
{
|
||||
|
||||
@@ -138,6 +138,16 @@ func WithTimeout(duration time.Duration) Option {
|
||||
return wrappedOption{otlpconfig.WithTimeout(duration)}
|
||||
}
|
||||
|
||||
// WithMaxRequestSize sets the maximum size, in bytes, of a serialized export
|
||||
// request, before compression, that the exporter will send.
|
||||
//
|
||||
// If size is less than or equal to zero, no request-size limit is applied.
|
||||
// Disabling the limit is not recommended because it can lead to excessive
|
||||
// resource consumption or abuse.
|
||||
func WithMaxRequestSize(size int) Option {
|
||||
return wrappedOption{otlpconfig.WithMaxRequestSize(size)}
|
||||
}
|
||||
|
||||
// WithRetry configures the retry policy for transient errors that may occurs
|
||||
// when exporting traces. An exponential back-off algorithm is used to ensure
|
||||
// endpoints are not overwhelmed with retries. If unset, the default retry
|
||||
|
||||
@@ -35,6 +35,9 @@ const (
|
||||
// DefaultMetricsPath is a default URL path for endpoint that
|
||||
// receives metrics.
|
||||
DefaultMetricsPath string = "/v1/metrics"
|
||||
// DefaultMaxRequestSize is the default maximum size of a serialized export
|
||||
// request, before compression.
|
||||
DefaultMaxRequestSize int = 32 * 1024 * 1024
|
||||
// DefaultBackoff is a default base backoff time used in the
|
||||
// exponential backoff strategy.
|
||||
DefaultBackoff time.Duration = 300 * time.Millisecond
|
||||
@@ -49,13 +52,14 @@ type (
|
||||
HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)
|
||||
|
||||
SignalConfig struct {
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
MaxRequestSize int
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
|
||||
TemporalitySelector metric.TemporalitySelector
|
||||
AggregationSelector metric.AggregationSelector
|
||||
@@ -87,10 +91,11 @@ type (
|
||||
func NewHTTPConfig(opts ...HTTPOption) Config {
|
||||
cfg := Config{
|
||||
Metrics: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
|
||||
TemporalitySelector: metric.DefaultTemporalitySelector,
|
||||
AggregationSelector: metric.DefaultAggregationSelector,
|
||||
@@ -123,10 +128,11 @@ func cleanPath(urlPath string, defaultPath string) string {
|
||||
func NewGRPCConfig(opts ...GRPCOption) Config {
|
||||
cfg := Config{
|
||||
Metrics: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultMetricsPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
|
||||
TemporalitySelector: metric.DefaultTemporalitySelector,
|
||||
AggregationSelector: metric.DefaultAggregationSelector,
|
||||
@@ -354,6 +360,13 @@ func WithTimeout(duration time.Duration) GenericOption {
|
||||
})
|
||||
}
|
||||
|
||||
func WithMaxRequestSize(size int) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Metrics.MaxRequestSize = size
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
func WithTemporalitySelector(selector metric.TemporalitySelector) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Metrics.TemporalitySelector = selector
|
||||
|
||||
@@ -79,9 +79,19 @@ func TestConfigs(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, NoCompression, c.Metrics.Compression)
|
||||
assert.Equal(t, map[string]string(nil), c.Metrics.Headers)
|
||||
assert.Equal(t, DefaultMaxRequestSize, c.Metrics.MaxRequestSize)
|
||||
assert.Equal(t, 10*time.Second, c.Metrics.Timeout)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test With Max Request Size",
|
||||
opts: []GenericOption{
|
||||
WithMaxRequestSize(1),
|
||||
},
|
||||
asserts: func(t *testing.T, c *Config, grpcOption bool) {
|
||||
assert.Equal(t, 1, c.Metrics.MaxRequestSize)
|
||||
},
|
||||
},
|
||||
|
||||
// Endpoint Tests
|
||||
{
|
||||
|
||||
@@ -31,6 +31,9 @@ const (
|
||||
// DefaultTracesPath is a default URL path for endpoint that
|
||||
// receives spans.
|
||||
DefaultTracesPath string = "/v1/traces"
|
||||
// DefaultMaxRequestSize is the default maximum size of a serialized export
|
||||
// request, before compression.
|
||||
DefaultMaxRequestSize int = 32 * 1024 * 1024
|
||||
// DefaultTimeout is a default max waiting time for the backend to process
|
||||
// each span batch.
|
||||
DefaultTimeout time.Duration = 10 * time.Second
|
||||
@@ -42,13 +45,14 @@ type (
|
||||
HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)
|
||||
|
||||
SignalConfig struct {
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
Endpoint string
|
||||
Insecure bool
|
||||
TLSCfg *tls.Config
|
||||
Headers map[string]string
|
||||
Compression Compression
|
||||
MaxRequestSize int
|
||||
Timeout time.Duration
|
||||
URLPath string
|
||||
|
||||
// gRPC configurations
|
||||
GRPCCredentials credentials.TransportCredentials
|
||||
@@ -77,10 +81,11 @@ type (
|
||||
func NewHTTPConfig(opts ...HTTPOption) Config {
|
||||
cfg := Config{
|
||||
Traces: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
},
|
||||
RetryConfig: retry.DefaultConfig,
|
||||
}
|
||||
@@ -111,10 +116,11 @@ func NewGRPCConfig(opts ...GRPCOption) Config {
|
||||
userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
|
||||
cfg := Config{
|
||||
Traces: SignalConfig{
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
Timeout: DefaultTimeout,
|
||||
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
|
||||
URLPath: DefaultTracesPath,
|
||||
Compression: NoCompression,
|
||||
MaxRequestSize: DefaultMaxRequestSize,
|
||||
Timeout: DefaultTimeout,
|
||||
},
|
||||
RetryConfig: retry.DefaultConfig,
|
||||
DialOptions: []grpc.DialOption{grpc.WithUserAgent(userAgent)},
|
||||
@@ -345,6 +351,13 @@ func WithTimeout(duration time.Duration) GenericOption {
|
||||
})
|
||||
}
|
||||
|
||||
func WithMaxRequestSize(size int) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Traces.MaxRequestSize = size
|
||||
return cfg
|
||||
})
|
||||
}
|
||||
|
||||
func WithProxy(pf HTTPTransportProxyFunc) GenericOption {
|
||||
return newGenericOption(func(cfg Config) Config {
|
||||
cfg.Traces.Proxy = pf
|
||||
|
||||
@@ -77,9 +77,19 @@ func TestConfigs(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, NoCompression, c.Traces.Compression)
|
||||
assert.Equal(t, map[string]string(nil), c.Traces.Headers)
|
||||
assert.Equal(t, DefaultMaxRequestSize, c.Traces.MaxRequestSize)
|
||||
assert.Equal(t, 10*time.Second, c.Traces.Timeout)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Test With Max Request Size",
|
||||
opts: []GenericOption{
|
||||
WithMaxRequestSize(1),
|
||||
},
|
||||
asserts: func(t *testing.T, c *Config, grpcOption bool) {
|
||||
assert.Equal(t, 1, c.Traces.MaxRequestSize)
|
||||
},
|
||||
},
|
||||
|
||||
// Endpoint Tests
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user