mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-01-26 03:52:03 +02:00
bf37c5a3a4
* Revert "otlpmetrichttp: Use go.opentelemetry.io/proto/slim/otlp (#5222)" This reverts commit 6e92163d6adf91ec77fa41c155c557da6e078563. * Revert "otlploghttp: Use go.opentelemetry.io/proto/slim/otlp (#5216)" This reverts commit fe3de7059e19a0e88c7e8b342ed345e50df94aa3. * Remove slim dep * Fix CI
301 lines
7.3 KiB
Go
301 lines
7.3 KiB
Go
// Copyright The OpenTelemetry Authors
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package otlploghttp // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
|
|
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
|
|
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry"
|
|
)
|
|
|
|
type client struct {
|
|
uploadLogs func(context.Context, []*logpb.ResourceLogs) error
|
|
}
|
|
|
|
func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error {
|
|
if c.uploadLogs != nil {
|
|
return c.uploadLogs(ctx, rl)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func newNoopClient() *client {
|
|
return &client{}
|
|
}
|
|
|
|
// newHTTPClient creates a new HTTP log client.
|
|
func newHTTPClient(cfg config) (*client, error) {
|
|
hc := &http.Client{
|
|
Transport: ourTransport,
|
|
Timeout: cfg.timeout.Value,
|
|
}
|
|
|
|
if cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil {
|
|
clonedTransport := ourTransport.Clone()
|
|
hc.Transport = clonedTransport
|
|
|
|
if cfg.tlsCfg.Value != nil {
|
|
clonedTransport.TLSClientConfig = cfg.tlsCfg.Value
|
|
}
|
|
if cfg.proxy.Value != nil {
|
|
clonedTransport.Proxy = cfg.proxy.Value
|
|
}
|
|
}
|
|
|
|
u := &url.URL{
|
|
Scheme: "https",
|
|
Host: cfg.endpoint.Value,
|
|
Path: cfg.path.Value,
|
|
}
|
|
if cfg.insecure.Value {
|
|
u.Scheme = "http"
|
|
}
|
|
// Body is set when this is cloned during upload.
|
|
req, err := http.NewRequest(http.MethodPost, u.String(), http.NoBody)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
userAgent := "OTel Go OTLP over HTTP/protobuf logs exporter/" + Version()
|
|
req.Header.Set("User-Agent", userAgent)
|
|
|
|
if n := len(cfg.headers.Value); n > 0 {
|
|
for k, v := range cfg.headers.Value {
|
|
req.Header.Set(k, v)
|
|
}
|
|
}
|
|
req.Header.Set("Content-Type", "application/x-protobuf")
|
|
|
|
c := &httpClient{
|
|
compression: cfg.compression.Value,
|
|
req: req,
|
|
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
|
|
client: hc,
|
|
}
|
|
return &client{uploadLogs: c.uploadLogs}, nil
|
|
}
|
|
|
|
type httpClient struct {
|
|
// req is cloned for every upload the client makes.
|
|
req *http.Request
|
|
compression Compression
|
|
requestFunc retry.RequestFunc
|
|
client *http.Client
|
|
}
|
|
|
|
// Keep it in sync with golang's DefaultTransport from net/http! We
|
|
// have our own copy to avoid handling a situation where the
|
|
// DefaultTransport is overwritten with some different implementation
|
|
// of http.RoundTripper or it's modified by another package.
|
|
var ourTransport = &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
DialContext: (&net.Dialer{
|
|
Timeout: 30 * time.Second,
|
|
KeepAlive: 30 * time.Second,
|
|
}).DialContext,
|
|
ForceAttemptHTTP2: true,
|
|
MaxIdleConns: 100,
|
|
IdleConnTimeout: 90 * time.Second,
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
ExpectContinueTimeout: 1 * time.Second,
|
|
}
|
|
|
|
func (c *httpClient) uploadLogs(ctx context.Context, data []*logpb.ResourceLogs) error {
|
|
// The Exporter synchronizes access to client methods. This is not called
|
|
// after the Exporter is shutdown. Only thing to do here is send data.
|
|
|
|
pbRequest := &collogpb.ExportLogsServiceRequest{ResourceLogs: data}
|
|
body, err := proto.Marshal(pbRequest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
request, err := c.newRequest(ctx, body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.requestFunc(ctx, func(iCtx context.Context) error {
|
|
select {
|
|
case <-iCtx.Done():
|
|
return iCtx.Err()
|
|
default:
|
|
}
|
|
|
|
request.reset(iCtx)
|
|
resp, err := c.client.Do(request.Request)
|
|
var urlErr *url.Error
|
|
if errors.As(err, &urlErr) && urlErr.Temporary() {
|
|
return newResponseError(http.Header{})
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var rErr error
|
|
switch sc := resp.StatusCode; {
|
|
case sc >= 200 && sc <= 299:
|
|
// Success, do not retry.
|
|
|
|
// Read the partial success message, if any.
|
|
var respData bytes.Buffer
|
|
if _, err := io.Copy(&respData, resp.Body); err != nil {
|
|
return err
|
|
}
|
|
if respData.Len() == 0 {
|
|
return nil
|
|
}
|
|
|
|
if resp.Header.Get("Content-Type") == "application/x-protobuf" {
|
|
var respProto collogpb.ExportLogsServiceResponse
|
|
if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil {
|
|
return err
|
|
}
|
|
|
|
if respProto.PartialSuccess != nil {
|
|
msg := respProto.PartialSuccess.GetErrorMessage()
|
|
n := respProto.PartialSuccess.GetRejectedLogRecords()
|
|
if n != 0 || msg != "" {
|
|
err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n)
|
|
otel.Handle(err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
case sc == http.StatusTooManyRequests,
|
|
sc == http.StatusBadGateway,
|
|
sc == http.StatusServiceUnavailable,
|
|
sc == http.StatusGatewayTimeout:
|
|
// Retry-able failure.
|
|
rErr = newResponseError(resp.Header)
|
|
|
|
// Going to retry, drain the body to reuse the connection.
|
|
if _, err := io.Copy(io.Discard, resp.Body); err != nil {
|
|
_ = resp.Body.Close()
|
|
return err
|
|
}
|
|
default:
|
|
rErr = fmt.Errorf("failed to send logs to %s: %s", request.URL, resp.Status)
|
|
}
|
|
|
|
if err := resp.Body.Close(); err != nil {
|
|
return err
|
|
}
|
|
return rErr
|
|
})
|
|
}
|
|
|
|
var gzPool = sync.Pool{
|
|
New: func() interface{} {
|
|
w := gzip.NewWriter(io.Discard)
|
|
return w
|
|
},
|
|
}
|
|
|
|
func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, error) {
|
|
r := c.req.Clone(ctx)
|
|
req := request{Request: r}
|
|
|
|
switch c.compression {
|
|
case NoCompression:
|
|
r.ContentLength = (int64)(len(body))
|
|
req.bodyReader = bodyReader(body)
|
|
case GzipCompression:
|
|
// Ensure the content length is not used.
|
|
r.ContentLength = -1
|
|
r.Header.Set("Content-Encoding", "gzip")
|
|
|
|
gz := gzPool.Get().(*gzip.Writer)
|
|
defer gzPool.Put(gz)
|
|
|
|
var b bytes.Buffer
|
|
gz.Reset(&b)
|
|
|
|
if _, err := gz.Write(body); err != nil {
|
|
return req, err
|
|
}
|
|
// Close needs to be called to ensure body is fully written.
|
|
if err := gz.Close(); err != nil {
|
|
return req, err
|
|
}
|
|
|
|
req.bodyReader = bodyReader(b.Bytes())
|
|
}
|
|
|
|
return req, nil
|
|
}
|
|
|
|
// bodyReader returns a closure returning a new reader for buf.
|
|
func bodyReader(buf []byte) func() io.ReadCloser {
|
|
return func() io.ReadCloser {
|
|
return io.NopCloser(bytes.NewReader(buf))
|
|
}
|
|
}
|
|
|
|
// request wraps an http.Request with a resettable body reader.
|
|
type request struct {
|
|
*http.Request
|
|
|
|
// bodyReader allows the same body to be used for multiple requests.
|
|
bodyReader func() io.ReadCloser
|
|
}
|
|
|
|
// reset reinitializes the request Body and uses ctx for the request.
|
|
func (r *request) reset(ctx context.Context) {
|
|
r.Body = r.bodyReader()
|
|
r.Request = r.WithContext(ctx)
|
|
}
|
|
|
|
// retryableError represents a request failure that can be retried.
|
|
type retryableError struct {
|
|
throttle int64
|
|
}
|
|
|
|
// newResponseError returns a retryableError and will extract any explicit
|
|
// throttle delay contained in headers.
|
|
func newResponseError(header http.Header) error {
|
|
var rErr retryableError
|
|
if v := header.Get("Retry-After"); v != "" {
|
|
if t, err := strconv.ParseInt(v, 10, 64); err == nil {
|
|
rErr.throttle = t
|
|
}
|
|
}
|
|
return rErr
|
|
}
|
|
|
|
func (e retryableError) Error() string {
|
|
return "retry-able request failure"
|
|
}
|
|
|
|
// evaluate returns if err is retry-able. If it is and it includes an explicit
|
|
// throttling delay, that delay is also returned.
|
|
func evaluate(err error) (bool, time.Duration) {
|
|
if err == nil {
|
|
return false, 0
|
|
}
|
|
|
|
rErr, ok := err.(retryableError)
|
|
if !ok {
|
|
return false, 0
|
|
}
|
|
|
|
return true, time.Duration(rErr.throttle)
|
|
}
|