You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
support stdlib request.GetBody on metrics (#7931)
Based on https://github.com/open-telemetry/opentelemetry-go/pull/7794 This solves the exact same problem but on metrics. > traces export: Post "https://***/v1/metrics": http2: Transport: cannot retry err [http2: Transport received Server's graceful shutdown GOAWAY] after Request.Body was written; define Request.GetBody to avoid this error --------- Co-authored-by: Damien Mathieu <42@dmathieu.com>
This commit is contained in:
@@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
The package contains semantic conventions from the `v1.40.0` version of the OpenTelemetry Semantic Conventions.
|
||||
See the [migration documentation](./semconv/v1.40.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.39.0`. (#7985)
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fix missing `request.GetBody` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` to correctly handle HTTP2 GOAWAY frame. (#7931)
|
||||
|
||||
### Removed
|
||||
|
||||
- Drop support for [Go 1.24]. (#7984)
|
||||
|
||||
@@ -243,6 +243,7 @@ func (c *client) newRequest(ctx context.Context, body []byte) (request, error) {
|
||||
case NoCompression:
|
||||
r.ContentLength = int64(len(body))
|
||||
req.bodyReader = bodyReader(body)
|
||||
req.GetBody = bodyReaderErr(body)
|
||||
case GzipCompression:
|
||||
// Ensure the content length is not used.
|
||||
r.ContentLength = -1
|
||||
@@ -263,6 +264,7 @@ func (c *client) newRequest(ctx context.Context, body []byte) (request, error) {
|
||||
}
|
||||
|
||||
req.bodyReader = bodyReader(b.Bytes())
|
||||
req.GetBody = bodyReaderErr(body)
|
||||
}
|
||||
|
||||
return req, nil
|
||||
@@ -275,6 +277,13 @@ func bodyReader(buf []byte) func() io.ReadCloser {
|
||||
}
|
||||
}
|
||||
|
||||
// bodyReaderErr returns a closure returning a new reader for buf.
|
||||
func bodyReaderErr(buf []byte) func() (io.ReadCloser, error) {
|
||||
return func() (io.ReadCloser, error) {
|
||||
return io.NopCloser(bytes.NewReader(buf)), nil
|
||||
}
|
||||
}
|
||||
|
||||
// request wraps an http.Request with a resettable body reader.
|
||||
type request struct {
|
||||
*http.Request
|
||||
|
||||
@@ -8,9 +8,12 @@ import (
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -324,3 +327,55 @@ func TestConfig(t *testing.T) {
|
||||
assert.NoError(t, exCtx.Err())
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetBodyCalledOnRedirect(t *testing.T) {
|
||||
// Test that req.GetBody is set correctly, allowing the HTTP transport
|
||||
// to re-send the body on 307 redirects.
|
||||
var mu sync.Mutex
|
||||
var requestBodies [][]byte
|
||||
|
||||
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
requestBodies = append(requestBodies, body)
|
||||
isFirstRequest := len(requestBodies) == 1
|
||||
mu.Unlock()
|
||||
|
||||
if isFirstRequest {
|
||||
w.Header().Set("Location", "/v1/metrics/final")
|
||||
w.WriteHeader(http.StatusTemporaryRedirect)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/x-protobuf")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
||||
opts := []Option{WithEndpoint(server.Listener.Addr().String()), WithInsecure()}
|
||||
cfg := oconf.NewHTTPConfig(asHTTPOptions(opts)...)
|
||||
client, err := newClient(cfg)
|
||||
require.NoError(t, err)
|
||||
|
||||
exporter, err := newExporter(client, cfg)
|
||||
require.NoError(t, err)
|
||||
ctx := t.Context()
|
||||
defer func() { _ = exporter.Shutdown(ctx) }()
|
||||
|
||||
err = exporter.Export(ctx, &metricdata.ResourceMetrics{})
|
||||
require.NoError(t, err)
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
require.Len(t, requestBodies, 2, "expected 2 requests (original + redirect)")
|
||||
assert.NotEmpty(t, requestBodies[0], "original request body should not be empty")
|
||||
assert.Equal(t, requestBodies[0], requestBodies[1], "redirect body should match original")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user