1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2024-12-12 10:04:29 +02:00
opentelemetry-go/exporters/otlp/otlphttp/mock_collector_test.go
Krzesimir Nowak 8d80981465
Move gRPC driver to a subpackage and add an HTTP driver (#1420)
* Move grpc stuff to separate package

* Drop duplicated retryable status code

* Set default port to 4317

This is what the specification says for both gRPC and HTTP.

* Document gRPC option type

* Add an HTTP protocol driver for OTLP exporter

Currently it supports only binary protobuf payloads.

* Move end to end test to a separate package

It also adds some common code mock collectors can use. This will be
useful for testing the HTTP driver.

* Move export data creators to otlptest

It also extends the one record checkpointer a bit. This will be useful
for testing the HTTP driver.

* Add an HTTP mock collector and tests for HTTP driver

* Update changelog

* Do not depend on DefaultTransport

We create our own instance of the transport, which is based on
golang's DefaultTransport. That way we sidestep the issue of the
DefaultTransport being modified/overwritten. We won't have any panics
at init. The cost of it is to keep the transport fields in sync with
DefaultTransport.

* Read the whole response body before closing it

This may help with connection reuse.

* Change options to conform to our style guide

* Add jitter to backoff time

* Test TLS option

* Test extra headers

* Fix a comment

* Increase coverage

* Add a source of the backoff strategy
2021-01-11 22:55:24 -05:00

273 lines
7.1 KiB
Go

// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp_test
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
collectormetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1"
collectortracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
tracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/trace/v1"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlptest"
"go.opentelemetry.io/otel/exporters/otlp/otlphttp"
)
type mockCollector struct {
endpoint string
server *http.Server
spanLock sync.Mutex
spansStorage otlptest.SpansStorage
metricLock sync.Mutex
metricsStorage otlptest.MetricsStorage
injectHTTPStatus []int
injectContentType string
clientTLSConfig *tls.Config
expectedHeaders map[string]string
}
func (c *mockCollector) Stop() error {
return c.server.Shutdown(context.Background())
}
func (c *mockCollector) MustStop(t *testing.T) {
assert.NoError(t, c.server.Shutdown(context.Background()))
}
func (c *mockCollector) GetSpans() []*tracepb.Span {
c.spanLock.Lock()
defer c.spanLock.Unlock()
return c.spansStorage.GetSpans()
}
func (c *mockCollector) GetResourceSpans() []*tracepb.ResourceSpans {
c.spanLock.Lock()
defer c.spanLock.Unlock()
return c.spansStorage.GetResourceSpans()
}
func (c *mockCollector) GetMetrics() []*metricpb.Metric {
c.metricLock.Lock()
defer c.metricLock.Unlock()
return c.metricsStorage.GetMetrics()
}
func (c *mockCollector) Endpoint() string {
return c.endpoint
}
func (c *mockCollector) ClientTLSConfig() *tls.Config {
return c.clientTLSConfig
}
func (c *mockCollector) serveMetrics(w http.ResponseWriter, r *http.Request) {
if !c.checkHeaders(r) {
w.WriteHeader(http.StatusBadRequest)
return
}
response := collectormetricpb.ExportMetricsServiceResponse{}
rawResponse, err := response.Marshal()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 {
writeReply(w, rawResponse, injectedStatus, c.injectContentType)
return
}
rawRequest, err := readRequest(r)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectormetricpb.ExportMetricsServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.metricLock.Lock()
defer c.metricLock.Unlock()
c.metricsStorage.AddMetrics(&request)
}
func (c *mockCollector) serveTraces(w http.ResponseWriter, r *http.Request) {
if !c.checkHeaders(r) {
w.WriteHeader(http.StatusBadRequest)
return
}
response := collectortracepb.ExportTraceServiceResponse{}
rawResponse, err := response.Marshal()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if injectedStatus := c.getInjectHTTPStatus(); injectedStatus != 0 {
writeReply(w, rawResponse, injectedStatus, c.injectContentType)
return
}
rawRequest, err := readRequest(r)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
request := collectortracepb.ExportTraceServiceRequest{}
if err := request.Unmarshal(rawRequest); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
writeReply(w, rawResponse, 0, c.injectContentType)
c.spanLock.Lock()
defer c.spanLock.Unlock()
c.spansStorage.AddSpans(&request)
}
func (c *mockCollector) checkHeaders(r *http.Request) bool {
for k, v := range c.expectedHeaders {
got := r.Header.Get(k)
if got != v {
return false
}
}
return true
}
func (c *mockCollector) getInjectHTTPStatus() int {
if len(c.injectHTTPStatus) == 0 {
return 0
}
status := c.injectHTTPStatus[0]
c.injectHTTPStatus = c.injectHTTPStatus[1:]
if len(c.injectHTTPStatus) == 0 {
c.injectHTTPStatus = nil
}
return status
}
func readRequest(r *http.Request) ([]byte, error) {
if r.Header.Get("Content-Encoding") == "gzip" {
return readGzipBody(r.Body)
}
return ioutil.ReadAll(r.Body)
}
func readGzipBody(body io.Reader) ([]byte, error) {
rawRequest := bytes.Buffer{}
gunzipper, err := gzip.NewReader(body)
if err != nil {
return nil, err
}
defer gunzipper.Close()
_, err = io.Copy(&rawRequest, gunzipper)
if err != nil {
return nil, err
}
return rawRequest.Bytes(), nil
}
func writeReply(w http.ResponseWriter, rawResponse []byte, injectHTTPStatus int, injectContentType string) {
status := http.StatusOK
if injectHTTPStatus != 0 {
status = injectHTTPStatus
}
contentType := "application/x-protobuf"
if injectContentType != "" {
contentType = injectContentType
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(status)
_, _ = w.Write(rawResponse)
}
type mockCollectorConfig struct {
MetricsURLPath string
TracesURLPath string
Port int
InjectHTTPStatus []int
InjectContentType string
WithTLS bool
ExpectedHeaders map[string]string
}
func (c *mockCollectorConfig) fillInDefaults() {
if c.MetricsURLPath == "" {
c.MetricsURLPath = otlphttp.DefaultMetricsPath
}
if c.TracesURLPath == "" {
c.TracesURLPath = otlphttp.DefaultTracesPath
}
}
func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector {
cfg.fillInDefaults()
ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port))
require.NoError(t, err)
_, portStr, err := net.SplitHostPort(ln.Addr().String())
require.NoError(t, err)
m := &mockCollector{
endpoint: fmt.Sprintf("localhost:%s", portStr),
spansStorage: otlptest.NewSpansStorage(),
metricsStorage: otlptest.NewMetricsStorage(),
injectHTTPStatus: cfg.InjectHTTPStatus,
injectContentType: cfg.InjectContentType,
expectedHeaders: cfg.ExpectedHeaders,
}
mux := http.NewServeMux()
mux.Handle(cfg.MetricsURLPath, http.HandlerFunc(m.serveMetrics))
mux.Handle(cfg.TracesURLPath, http.HandlerFunc(m.serveTraces))
server := &http.Server{
Handler: mux,
}
if cfg.WithTLS {
pem, err := generateWeakCertificate()
require.NoError(t, err)
tlsCertificate, err := tls.X509KeyPair(pem.Certificate, pem.PrivateKey)
require.NoError(t, err)
server.TLSConfig = &tls.Config{
Certificates: []tls.Certificate{tlsCertificate},
}
m.clientTLSConfig = &tls.Config{
InsecureSkipVerify: true,
}
}
go func() {
if cfg.WithTLS {
_ = server.ServeTLS(ln, "", "")
} else {
_ = server.Serve(ln)
}
}()
m.server = server
return m
}