You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
otlptracehttp: reset pooled gzip writer before reuse (#8196)
Based on https://github.com/open-telemetry/opentelemetry-go/pull/8152 and https://github.com/open-telemetry/opentelemetry-go/pull/8185
This commit is contained in:
@@ -299,7 +299,10 @@ func (d *client) newRequest(body []byte) (request, error) {
|
||||
r.Header.Set("Content-Encoding", "gzip")
|
||||
|
||||
gz := gzPool.Get().(*gzip.Writer)
|
||||
defer gzPool.Put(gz)
|
||||
defer func() {
|
||||
gz.Reset(io.Discard)
|
||||
gzPool.Put(gz)
|
||||
}()
|
||||
|
||||
var b bytes.Buffer
|
||||
gz.Reset(&b)
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
package otlptracehttp_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
@@ -702,6 +704,72 @@ func TestGetBodyCalledOnRedirect(t *testing.T) {
|
||||
assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original")
|
||||
}
|
||||
|
||||
func TestGetBodyCalledOnRedirectWithGzip(t *testing.T) {
|
||||
// Test that req.GetBody replays the gzipped request body on redirects.
|
||||
var mu sync.Mutex
|
||||
var requestBodies [][]byte
|
||||
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
assert.Equal(t, "gzip", r.Header.Get("Content-Encoding"))
|
||||
|
||||
mu.Lock()
|
||||
requestBodies = append(requestBodies, body)
|
||||
isFirstRequest := len(requestBodies) == 1
|
||||
mu.Unlock()
|
||||
|
||||
if isFirstRequest {
|
||||
w.Header().Set("Location", "/v1/traces/final")
|
||||
w.WriteHeader(http.StatusTemporaryRedirect)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
client := otlptracehttp.NewClient(
|
||||
otlptracehttp.WithEndpoint(server.Listener.Addr().String()),
|
||||
otlptracehttp.WithInsecure(),
|
||||
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
|
||||
)
|
||||
|
||||
ctx := t.Context()
|
||||
exporter, err := otlptrace.New(ctx, client)
|
||||
require.NoError(t, err)
|
||||
defer func() { _ = exporter.Shutdown(ctx) }()
|
||||
|
||||
err = exporter.ExportSpans(ctx, otlptracetest.SingleReadOnlySpan())
|
||||
require.NoError(t, err)
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
require.Len(t, requestBodies, 2, "expected 2 requests (original + redirect)")
|
||||
assert.NotEmpty(t, requestBodies[0], "original request body should not be empty")
|
||||
assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original")
|
||||
|
||||
for _, body := range requestBodies {
|
||||
reader, err := gzip.NewReader(bytes.NewReader(body))
|
||||
require.NoError(t, err)
|
||||
func() {
|
||||
defer func() { assert.NoError(t, reader.Close()) }()
|
||||
|
||||
decoded, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.NotEmpty(t, decoded)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkExporterExportSpans(b *testing.B) {
|
||||
const n = 10
|
||||
|
||||
|
||||
Reference in New Issue
Block a user