1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-16 10:19:23 +02:00
opentelemetry-go/exporters/otlp/otlptrace/otlptracehttp/client.go

325 lines
7.8 KiB
Go
Raw Normal View History

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"sync"
"time"
"google.golang.org/protobuf/proto"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
)
const contentTypeProto = "application/x-protobuf"
var gzPool = sync.Pool{
New: func() interface{} {
w := gzip.NewWriter(ioutil.Discard)
return w
},
}
// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
// of http.RoundTripper or it's modified by other package.
var ourTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
type client struct {
name string
cfg otlpconfig.SignalConfig
generalCfg otlpconfig.Config
requestFunc retry.RequestFunc
client *http.Client
stopCh chan struct{}
Fix goroutine leaks in otlptracegrpc testing (#2409) * Fail in RunEndToEndTest if collector stop fails * Use testing T.Cleanup to check shut downs * Add goroutine leak detection * Fix TestExporterShutdown go leak The shutdown tests checking if a context error is honored did not completely clean up the resources used by the client after the error was evaluated. Update the connection client to handle multiple calls to shutdown and make a second call to these clients that must succeed so the test does not have abandoned goroutines. * Fix leak in TestNew_WithTimeout The mockTraceService did not delay with its lock being held. This resulted in the mockCollector stopping and being able to acquire the lock. It was assumed that no export was taking place because of this and the mockTraceService was abandoned without cleaning up resources it held and goroutines it had spawned. This reworks the export blocking logic to block on a channel read. This will make the block more deterministic and not depend on the scheduler timing. Additionally, this blocking is moved inside the lock acquire. Meaning code will deadlock if the block is not released before a shutdown (something the developer will immediately be aware of when they submit a bad patch), and will ensure all resources are released before shutdown. Replace TestNew_WithTimeout with TestExportSpansTimeoutHonored which directly tests if a span export errors when the timeout is reached. This is the only unique thing that TestNew_WithTimeout, but it also tests the non-error path. That non-error path is tested in many other tests. * Guard otlptracehttp client stopCh when stopping In normal operations the exporter is guaranteed to only ever call the client Stop method once. However in testing we need to call this multiple times when checking it returns an error in particular context. Add a lightweight sync.Once to the closing of the stopCh to ensure tests do not panic when cleaning up. * Release export block after export Prevent deadlock in TestExportSpansTimeoutHonored.
2021-11-22 17:54:32 +02:00
stopOnce sync.Once
}
var _ otlptrace.Client = (*client)(nil)
// NewClient creates a new HTTP trace client.
func NewClient(opts ...Option) otlptrace.Client {
cfg := otlpconfig.NewDefaultConfig()
cfg = otlpconfig.ApplyHTTPEnvConfigs(cfg)
for _, opt := range opts {
cfg = opt.applyHTTPOption(cfg)
}
for pathPtr, defaultPath := range map[*string]string{
&cfg.Traces.URLPath: otlpconfig.DefaultTracesPath,
} {
tmp := strings.TrimSpace(*pathPtr)
if tmp == "" {
tmp = defaultPath
} else {
tmp = path.Clean(tmp)
if !path.IsAbs(tmp) {
tmp = fmt.Sprintf("/%s", tmp)
}
}
*pathPtr = tmp
}
httpClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.Traces.Timeout,
}
if cfg.Traces.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.Traces.TLSCfg
httpClient.Transport = transport
}
stopCh := make(chan struct{})
return &client{
name: "traces",
cfg: cfg.Traces,
generalCfg: cfg,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
stopCh: stopCh,
client: httpClient,
}
}
// Start does nothing in a HTTP client
func (d *client) Start(ctx context.Context) error {
// nothing to do
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
// Stop shuts down the client and interrupt any in-flight request.
func (d *client) Stop(ctx context.Context) error {
Fix goroutine leaks in otlptracegrpc testing (#2409) * Fail in RunEndToEndTest if collector stop fails * Use testing T.Cleanup to check shut downs * Add goroutine leak detection * Fix TestExporterShutdown go leak The shutdown tests checking if a context error is honored did not completely clean up the resources used by the client after the error was evaluated. Update the connection client to handle multiple calls to shutdown and make a second call to these clients that must succeed so the test does not have abandoned goroutines. * Fix leak in TestNew_WithTimeout The mockTraceService did not delay with its lock being held. This resulted in the mockCollector stopping and being able to acquire the lock. It was assumed that no export was taking place because of this and the mockTraceService was abandoned without cleaning up resources it held and goroutines it had spawned. This reworks the export blocking logic to block on a channel read. This will make the block more deterministic and not depend on the scheduler timing. Additionally, this blocking is moved inside the lock acquire. Meaning code will deadlock if the block is not released before a shutdown (something the developer will immediately be aware of when they submit a bad patch), and will ensure all resources are released before shutdown. Replace TestNew_WithTimeout with TestExportSpansTimeoutHonored which directly tests if a span export errors when the timeout is reached. This is the only unique thing that TestNew_WithTimeout, but it also tests the non-error path. That non-error path is tested in many other tests. * Guard otlptracehttp client stopCh when stopping In normal operations the exporter is guaranteed to only ever call the client Stop method once. However in testing we need to call this multiple times when checking it returns an error in particular context. Add a lightweight sync.Once to the closing of the stopCh to ensure tests do not panic when cleaning up. * Release export block after export Prevent deadlock in TestExportSpansTimeoutHonored.
2021-11-22 17:54:32 +02:00
d.stopOnce.Do(func() {
close(d.stopCh)
})
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
// UploadTraces sends a batch of spans to the collector.
func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
pbRequest := &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
}
rawRequest, err := proto.Marshal(pbRequest)
if err != nil {
return err
}
ctx, cancel := d.contextWithStop(ctx)
defer cancel()
request, err := d.newRequest(rawRequest)
if err != nil {
return err
}
return d.requestFunc(ctx, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
request.reset(ctx)
resp, err := d.client.Do(request.Request)
if err != nil {
return err
}
var rErr error
switch resp.StatusCode {
case http.StatusOK:
// Success, do not retry.
case http.StatusTooManyRequests,
http.StatusServiceUnavailable:
// Retry-able failure.
rErr = newResponseError(resp.Header)
// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}
default:
rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status)
}
if err := resp.Body.Close(); err != nil {
return err
}
return rErr
})
}
func (d *client) newRequest(body []byte) (request, error) {
u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
r, err := http.NewRequest(http.MethodPost, u.String(), nil)
if err != nil {
return request{Request: r}, err
}
for k, v := range d.cfg.Headers {
r.Header.Set(k, v)
}
r.Header.Set("Content-Type", contentTypeProto)
req := request{Request: r}
switch Compression(d.cfg.Compression) {
case NoCompression:
r.ContentLength = (int64)(len(body))
req.bodyReader = bodyReader(body)
case GzipCompression:
// Ensure the content length is not used.
r.ContentLength = -1
r.Header.Set("Content-Encoding", "gzip")
gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)
var b bytes.Buffer
gz.Reset(&b)
if _, err := gz.Write(body); err != nil {
return req, err
}
// Close needs to be called to ensure body if fully written.
if err := gz.Close(); err != nil {
return req, err
}
req.bodyReader = bodyReader(b.Bytes())
}
return req, nil
}
// bodyReader returns a closure returning a new reader for buf.
func bodyReader(buf []byte) func() io.ReadCloser {
return func() io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader(buf))
}
}
// request wraps an http.Request with a resettable body reader.
type request struct {
*http.Request
// bodyReader allows the same body to be used for multiple requests.
bodyReader func() io.ReadCloser
}
// reset reinitializes the request Body and uses ctx for the request.
func (r *request) reset(ctx context.Context) {
r.Body = r.bodyReader()
r.Request = r.Request.WithContext(ctx)
}
// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
}
// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
var rErr retryableError
if s, ok := header["Retry-After"]; ok {
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
rErr.throttle = t
}
}
return rErr
}
func (e retryableError) Error() string {
return "retry-able request failure"
}
// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
if err == nil {
return false, 0
}
rErr, ok := err.(retryableError)
if !ok {
return false, 0
}
return true, time.Duration(rErr.throttle)
}
func (d *client) getScheme() string {
if d.cfg.Insecure {
return "http"
}
return "https"
}
func (d *client) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the client's stop
// channel.
ctx, cancel := context.WithCancel(ctx)
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
// Nothing to do, either cancelled or deadline
// happened.
case <-d.stopCh:
cancel()
}
}(ctx, cancel)
return ctx, cancel
}