1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-28 03:57:09 +02:00
Tyler Yahn fdfc821bac
Add godot linter to golangci (#2845)
Comment should be complete sentences outside of lists with sentence
fragments. This adds the godot linter to check these complete sentences
end with punctuation. If they do not, running fix will append a period.
2022-04-25 13:22:49 -07:00

302 lines
7.4 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpmetrichttp // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"strconv"
"sync"
"time"
"google.golang.org/protobuf/proto"
"go.opentelemetry.io/otel/exporters/otlp/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpconfig"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
const contentTypeProto = "application/x-protobuf"
var gzPool = sync.Pool{
New: func() interface{} {
w := gzip.NewWriter(ioutil.Discard)
return w
},
}
// Keep it in sync with golang's DefaultTransport from net/http! We
// have our own copy to avoid handling a situation where the
// DefaultTransport is overwritten with some different implementation
// of http.RoundTripper or it's modified by other package.
var ourTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
type client struct {
name string
cfg otlpconfig.SignalConfig
generalCfg otlpconfig.Config
requestFunc retry.RequestFunc
client *http.Client
stopCh chan struct{}
stopOnce sync.Once
}
// NewClient creates a new HTTP metric client.
func NewClient(opts ...Option) otlpmetric.Client {
cfg := otlpconfig.NewHTTPConfig(asHTTPOptions(opts)...)
httpClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.Metrics.Timeout,
}
if cfg.Metrics.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.Metrics.TLSCfg
httpClient.Transport = transport
}
stopCh := make(chan struct{})
return &client{
name: "metrics",
cfg: cfg.Metrics,
generalCfg: cfg,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
stopCh: stopCh,
client: httpClient,
}
}
// Start does nothing in a HTTP client.
func (d *client) Start(ctx context.Context) error {
// nothing to do
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
// Stop shuts down the client and interrupt any in-flight request.
func (d *client) Stop(ctx context.Context) error {
d.stopOnce.Do(func() {
close(d.stopCh)
})
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}
// UploadMetrics sends a batch of metrics to the collector.
func (d *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics) error {
pbRequest := &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricpb.ResourceMetrics{protoMetrics},
}
rawRequest, err := proto.Marshal(pbRequest)
if err != nil {
return err
}
ctx, cancel := d.contextWithStop(ctx)
defer cancel()
request, err := d.newRequest(rawRequest)
if err != nil {
return err
}
return d.requestFunc(ctx, func(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
request.reset(ctx)
resp, err := d.client.Do(request.Request)
if err != nil {
return err
}
var rErr error
switch resp.StatusCode {
case http.StatusOK:
// Success, do not retry.
case http.StatusTooManyRequests,
http.StatusServiceUnavailable:
// Retry-able failure.
rErr = newResponseError(resp.Header)
// Going to retry, drain the body to reuse the connection.
if _, err := io.Copy(ioutil.Discard, resp.Body); err != nil {
_ = resp.Body.Close()
return err
}
default:
rErr = fmt.Errorf("failed to send %s to %s: %s", d.name, request.URL, resp.Status)
}
if err := resp.Body.Close(); err != nil {
return err
}
return rErr
})
}
func (d *client) newRequest(body []byte) (request, error) {
u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
r, err := http.NewRequest(http.MethodPost, u.String(), nil)
if err != nil {
return request{Request: r}, err
}
for k, v := range d.cfg.Headers {
r.Header.Set(k, v)
}
r.Header.Set("Content-Type", contentTypeProto)
req := request{Request: r}
switch Compression(d.cfg.Compression) {
case NoCompression:
r.ContentLength = (int64)(len(body))
req.bodyReader = bodyReader(body)
case GzipCompression:
// Ensure the content length is not used.
r.ContentLength = -1
r.Header.Set("Content-Encoding", "gzip")
gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)
var b bytes.Buffer
gz.Reset(&b)
if _, err := gz.Write(body); err != nil {
return req, err
}
// Close needs to be called to ensure body if fully written.
if err := gz.Close(); err != nil {
return req, err
}
req.bodyReader = bodyReader(b.Bytes())
}
return req, nil
}
// bodyReader returns a closure returning a new reader for buf.
func bodyReader(buf []byte) func() io.ReadCloser {
return func() io.ReadCloser {
return ioutil.NopCloser(bytes.NewReader(buf))
}
}
// request wraps an http.Request with a resettable body reader.
type request struct {
*http.Request
// bodyReader allows the same body to be used for multiple requests.
bodyReader func() io.ReadCloser
}
// reset reinitializes the request Body and uses ctx for the request.
func (r *request) reset(ctx context.Context) {
r.Body = r.bodyReader()
r.Request = r.Request.WithContext(ctx)
}
// retryableError represents a request failure that can be retried.
type retryableError struct {
throttle int64
}
// newResponseError returns a retryableError and will extract any explicit
// throttle delay contained in headers.
func newResponseError(header http.Header) error {
var rErr retryableError
if s, ok := header["Retry-After"]; ok {
if t, err := strconv.ParseInt(s[0], 10, 64); err == nil {
rErr.throttle = t
}
}
return rErr
}
func (e retryableError) Error() string {
return "retry-able request failure"
}
// evaluate returns if err is retry-able. If it is and it includes an explicit
// throttling delay, that delay is also returned.
func evaluate(err error) (bool, time.Duration) {
if err == nil {
return false, 0
}
rErr, ok := err.(retryableError)
if !ok {
return false, 0
}
return true, time.Duration(rErr.throttle)
}
func (d *client) getScheme() string {
if d.cfg.Insecure {
return "http"
}
return "https"
}
func (d *client) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the client's stop
// channel.
ctx, cancel := context.WithCancel(ctx)
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
// Nothing to do, either cancelled or deadline
// happened.
case <-d.stopCh:
cancel()
}
}(ctx, cancel)
return ctx, cancel
}