1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-30 04:40:41 +02:00

add support for env var configuration to otlp/gRPC (#1811)

* move options to `otlpconfig` internal package

* add support for env configs on otel/gRPC

* remove duplicate code

* refactor options

* format imports

* move marshal option to oltphttp

* clone tls configs and tests grpc certificates

* add more context to http errors

* add todo

* add entry to changelog

* update changelog with pr number

* apply suggestions

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Gustavo Silva Paiva 2021-04-16 18:52:24 -03:00 committed by GitHub
parent d616df61f5
commit a2cecb6e80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1169 additions and 752 deletions

View File

@ -16,7 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Adds test to check BatchSpanProcessor ignores `OnEnd` and `ForceFlush` post `Shutdown`. (#1772)
- Option `ExportTimeout` was added to batch span processor. (#1755)
- Adds semantic conventions for exceptions. (#1492)
- Added support for configuring OTLP/HTTP Endpoints, Headers, Compression and Timeout via the Environment Variables. (#1758)
- Added support for configuring OTLP/HTTP and OTLP/gRPC Endpoints, TLS Certificates, Headers, Compression and Timeout via Environment Variables. (#1758, #1769 and #1811)
- `OTEL_EXPORTER_OTLP_ENDPOINT`
- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`
- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`
@ -29,7 +29,6 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `OTEL_EXPORTER_OTLP_TIMEOUT`
- `OTEL_EXPORTER_OTLP_TRACES_TIMEOUT`
- `OTEL_EXPORTER_OTLP_METRICS_TIMEOUT`
- Added support for configuring OTLP/HTTP TLS Certificates via the Environment Variables. (#1769)
- `OTEL_EXPORTER_OTLP_CERTIFICATE`
- `OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE`
- `OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE`

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp
package otlpconfig
import (
"crypto/tls"
@ -24,37 +24,50 @@ import (
"strings"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
"go.opentelemetry.io/otel"
)
func applyEnvConfigs(cfg *config) {
e := envOptionsReader{
getEnv: os.Getenv,
readFile: ioutil.ReadFile,
func ApplyGRPCEnvConfigs(cfg *Config) {
e := EnvOptionsReader{
GetEnv: os.Getenv,
ReadFile: ioutil.ReadFile,
}
opts := e.getOptionsFromEnv()
e.ApplyGRPCEnvConfigs(cfg)
}
func ApplyHTTPEnvConfigs(cfg *Config) {
e := EnvOptionsReader{
GetEnv: os.Getenv,
ReadFile: ioutil.ReadFile,
}
e.ApplyHTTPEnvConfigs(cfg)
}
type EnvOptionsReader struct {
GetEnv func(string) string
ReadFile func(filename string) ([]byte, error)
}
func (e *EnvOptionsReader) ApplyHTTPEnvConfigs(cfg *Config) {
opts := e.GetOptionsFromEnv()
for _, opt := range opts {
opt.Apply(cfg)
opt.ApplyHTTPOption(cfg)
}
}
type envOptionsReader struct {
getEnv func(string) string
readFile func(filename string) ([]byte, error)
}
func (e *envOptionsReader) applyEnvConfigs(cfg *config) {
opts := e.getOptionsFromEnv()
func (e *EnvOptionsReader) ApplyGRPCEnvConfigs(cfg *Config) {
opts := e.GetOptionsFromEnv()
for _, opt := range opts {
opt.Apply(cfg)
opt.ApplyGRPCOption(cfg)
}
}
func (e *envOptionsReader) getOptionsFromEnv() []Option {
var opts []Option
func (e *EnvOptionsReader) GetOptionsFromEnv() []GenericOption {
var opts []GenericOption
// Endpoint
if v, ok := e.getEnvValue("ENDPOINT"); ok {
@ -132,28 +145,28 @@ func (e *envOptionsReader) getOptionsFromEnv() []Option {
return opts
}
// getEnvValue gets an OTLP environment variable value of the specified key using the getEnv function.
// getEnvValue gets an OTLP environment variable value of the specified key using the GetEnv function.
// This function already prepends the OTLP prefix to all key lookup.
func (e *envOptionsReader) getEnvValue(key string) (string, bool) {
v := strings.TrimSpace(e.getEnv(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key)))
func (e *EnvOptionsReader) getEnvValue(key string) (string, bool) {
v := strings.TrimSpace(e.GetEnv(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key)))
return v, v != ""
}
func (e *envOptionsReader) readTLSConfig(path string) (*tls.Config, error) {
b, err := e.readFile(path)
func (e *EnvOptionsReader) readTLSConfig(path string) (*tls.Config, error) {
b, err := e.ReadFile(path)
if err != nil {
return nil, err
}
return otlpconfig.CreateTLSConfig(b)
return CreateTLSConfig(b)
}
func stringToCompression(value string) Compression {
func stringToCompression(value string) otlp.Compression {
switch value {
case "gzip":
return GzipCompression
return otlp.GzipCompression
}
return NoCompression
return otlp.NoCompression
}
func stringToHeader(value string) map[string]string {

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp
package otlpconfig
import (
"reflect"

View File

@ -0,0 +1,376 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
import (
"crypto/tls"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/otel/exporters/otlp"
)
const (
// DefaultMaxAttempts describes how many times the driver
// should retry the sending of the payload in case of a
// retryable error.
DefaultMaxAttempts int = 5
// DefaultTracesPath is a default URL path for endpoint that
// receives spans.
DefaultTracesPath string = "/v1/traces"
// DefaultMetricsPath is a default URL path for endpoint that
// receives metrics.
DefaultMetricsPath string = "/v1/metrics"
// DefaultBackoff is a default base backoff time used in the
// exponential backoff strategy.
DefaultBackoff time.Duration = 300 * time.Millisecond
// DefaultTimeout is a default max waiting time for the backend to process
// each span or metrics batch.
DefaultTimeout time.Duration = 10 * time.Second
// DefaultServiceConfig is the gRPC service config used if none is
// provided by the user.
DefaultServiceConfig = `{
"methodConfig":[{
"name":[
{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
{ "service":"opentelemetry.proto.collector.trace.v1.TraceService" }
],
"retryPolicy":{
"MaxAttempts":5,
"InitialBackoff":"0.3s",
"MaxBackoff":"5s",
"BackoffMultiplier":2,
"RetryableStatusCodes":[
"CANCELLED",
"DEADLINE_EXCEEDED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"UNAVAILABLE",
"DATA_LOSS"
]
}
}]
}`
)
type (
SignalConfig struct {
Endpoint string
Insecure bool
TLSCfg *tls.Config
Headers map[string]string
Compression otlp.Compression
Timeout time.Duration
URLPath string
// gRPC configurations
GRPCCredentials credentials.TransportCredentials
}
Config struct {
// Signal specific configurations
Metrics SignalConfig
Traces SignalConfig
// General configurations
MaxAttempts int
Backoff time.Duration
// HTTP configuration
Marshaler otlp.Marshaler
// gRPC configurations
ReconnectionPeriod time.Duration
ServiceConfig string
DialOptions []grpc.DialOption
}
)
func NewDefaultConfig() Config {
c := Config{
Traces: SignalConfig{
Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
URLPath: DefaultTracesPath,
Compression: otlp.NoCompression,
Timeout: DefaultTimeout,
},
Metrics: SignalConfig{
Endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
URLPath: DefaultMetricsPath,
Compression: otlp.NoCompression,
Timeout: DefaultTimeout,
},
MaxAttempts: DefaultMaxAttempts,
Backoff: DefaultBackoff,
ServiceConfig: DefaultServiceConfig,
}
return c
}
type (
// GenericOption applies an option to the HTTP or gRPC driver.
GenericOption interface {
ApplyHTTPOption(*Config)
ApplyGRPCOption(*Config)
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
// HTTPOption applies an option to the HTTP driver.
HTTPOption interface {
ApplyHTTPOption(*Config)
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
// GRPCOption applies an option to the gRPC driver.
GRPCOption interface {
ApplyGRPCOption(*Config)
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
)
// genericOption is an option that applies the same logic
// for both gRPC and HTTP.
type genericOption struct {
fn func(*Config)
}
func (g *genericOption) ApplyGRPCOption(cfg *Config) {
g.fn(cfg)
}
func (g *genericOption) ApplyHTTPOption(cfg *Config) {
g.fn(cfg)
}
func (genericOption) private() {}
func newGenericOption(fn func(cfg *Config)) GenericOption {
return &genericOption{fn: fn}
}
// splitOption is an option that applies different logics
// for gRPC and HTTP.
type splitOption struct {
httpFn func(*Config)
grpcFn func(*Config)
}
func (g *splitOption) ApplyGRPCOption(cfg *Config) {
g.grpcFn(cfg)
}
func (g *splitOption) ApplyHTTPOption(cfg *Config) {
g.httpFn(cfg)
}
func (splitOption) private() {}
func newSplitOption(httpFn func(cfg *Config), grpcFn func(cfg *Config)) GenericOption {
return &splitOption{httpFn: httpFn, grpcFn: grpcFn}
}
// httpOption is an option that is only applied to the HTTP driver.
type httpOption struct {
fn func(*Config)
}
func (h *httpOption) ApplyHTTPOption(cfg *Config) {
h.fn(cfg)
}
func (httpOption) private() {}
func NewHTTPOption(fn func(cfg *Config)) HTTPOption {
return &httpOption{fn: fn}
}
// grpcOption is an option that is only applied to the gRPC driver.
type grpcOption struct {
fn func(*Config)
}
func (h *grpcOption) ApplyGRPCOption(cfg *Config) {
h.fn(cfg)
}
func (grpcOption) private() {}
func NewGRPCOption(fn func(cfg *Config)) GRPCOption {
return &grpcOption{fn: fn}
}
// Generic Options
func WithEndpoint(endpoint string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Endpoint = endpoint
cfg.Metrics.Endpoint = endpoint
})
}
func WithTracesEndpoint(endpoint string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Endpoint = endpoint
})
}
func WithMetricsEndpoint(endpoint string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Metrics.Endpoint = endpoint
})
}
func WithCompression(compression otlp.Compression) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Compression = compression
cfg.Metrics.Compression = compression
})
}
func WithTracesCompression(compression otlp.Compression) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Compression = compression
})
}
func WithMetricsCompression(compression otlp.Compression) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Metrics.Compression = compression
})
}
func WithTracesURLPath(urlPath string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.URLPath = urlPath
})
}
func WithMetricsURLPath(urlPath string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Metrics.URLPath = urlPath
})
}
func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}
func WithBackoff(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
})
}
func WithTLSClientConfig(tlsCfg *tls.Config) GenericOption {
return newSplitOption(func(cfg *Config) {
cfg.Traces.TLSCfg = tlsCfg.Clone()
cfg.Metrics.TLSCfg = tlsCfg.Clone()
}, func(cfg *Config) {
cfg.Traces.GRPCCredentials = credentials.NewTLS(tlsCfg)
cfg.Metrics.GRPCCredentials = credentials.NewTLS(tlsCfg)
})
}
func WithTracesTLSClientConfig(tlsCfg *tls.Config) GenericOption {
return newSplitOption(func(cfg *Config) {
cfg.Traces.TLSCfg = tlsCfg.Clone()
}, func(cfg *Config) {
cfg.Traces.GRPCCredentials = credentials.NewTLS(tlsCfg)
})
}
func WithMetricsTLSClientConfig(tlsCfg *tls.Config) GenericOption {
return newSplitOption(func(cfg *Config) {
cfg.Metrics.TLSCfg = tlsCfg.Clone()
}, func(cfg *Config) {
cfg.Metrics.GRPCCredentials = credentials.NewTLS(tlsCfg)
})
}
func WithInsecure() GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Insecure = true
cfg.Metrics.Insecure = true
})
}
func WithInsecureTraces() GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Insecure = true
})
}
func WithInsecureMetrics() GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Metrics.Insecure = true
})
}
func WithHeaders(headers map[string]string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Headers = headers
cfg.Metrics.Headers = headers
})
}
func WithTracesHeaders(headers map[string]string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Headers = headers
})
}
func WithMetricsHeaders(headers map[string]string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Metrics.Headers = headers
})
}
func WithTimeout(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Timeout = duration
cfg.Metrics.Timeout = duration
})
}
func WithTracesTimeout(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Timeout = duration
})
}
func WithMetricsTimeout(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Metrics.Timeout = duration
})
}

View File

@ -0,0 +1,425 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig
import (
"crypto/tls"
"crypto/x509"
"errors"
"testing"
"time"
"go.opentelemetry.io/otel/exporters/otlp"
"github.com/stretchr/testify/assert"
)
type env map[string]string
func (e *env) getEnv(env string) string {
return (*e)[env]
}
type fileReader map[string][]byte
func (f *fileReader) readFile(filename string) ([]byte, error) {
if b, ok := (*f)[filename]; ok {
return b, nil
}
return nil, errors.New("File not found")
}
func TestConfigs(t *testing.T) {
tlsCert, err := CreateTLSConfig([]byte(WeakCertificate))
assert.NoError(t, err)
tests := []struct {
name string
opts []GenericOption
env env
fileReader fileReader
asserts func(t *testing.T, c *Config, grpcOption bool)
}{
{
name: "Test default configs",
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "localhost:4317", c.Traces.Endpoint)
assert.Equal(t, "localhost:4317", c.Metrics.Endpoint)
assert.Equal(t, otlp.NoCompression, c.Traces.Compression)
assert.Equal(t, otlp.NoCompression, c.Metrics.Compression)
assert.Equal(t, map[string]string(nil), c.Traces.Headers)
assert.Equal(t, map[string]string(nil), c.Metrics.Headers)
assert.Equal(t, 10*time.Second, c.Traces.Timeout)
assert.Equal(t, 10*time.Second, c.Metrics.Timeout)
},
},
// Endpoint Tests
{
name: "Test With Endpoint",
opts: []GenericOption{
WithEndpoint("someendpoint"),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "someendpoint", c.Traces.Endpoint)
assert.Equal(t, "someendpoint", c.Metrics.Endpoint)
},
},
{
name: "Test With Signal Specific Endpoint",
opts: []GenericOption{
WithEndpoint("overrode_by_signal_specific"),
WithTracesEndpoint("traces_endpoint"),
WithMetricsEndpoint("metrics_endpoint"),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "traces_endpoint", c.Traces.Endpoint)
assert.Equal(t, "metrics_endpoint", c.Metrics.Endpoint)
},
},
{
name: "Test Environment Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "env_endpoint", c.Traces.Endpoint)
assert.Equal(t, "env_endpoint", c.Metrics.Endpoint)
},
},
{
name: "Test Environment Signal Specific Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "env_traces_endpoint",
"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT": "env_metrics_endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "env_traces_endpoint", c.Traces.Endpoint)
assert.Equal(t, "env_metrics_endpoint", c.Metrics.Endpoint)
},
},
{
name: "Test Mixed Environment and With Endpoint",
opts: []GenericOption{
WithTracesEndpoint("traces_endpoint"),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "traces_endpoint", c.Traces.Endpoint)
assert.Equal(t, "env_endpoint", c.Metrics.Endpoint)
},
},
// Certificate tests
{
name: "Test With Certificate",
opts: []GenericOption{
WithTLSClientConfig(tlsCert),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
//TODO: make sure gRPC's credentials actually works
assert.NotNil(t, c.Traces.GRPCCredentials)
assert.NotNil(t, c.Metrics.GRPCCredentials)
} else {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Metrics.TLSCfg.RootCAs.Subjects())
}
},
},
{
name: "Test With Signal Specific Certificate",
opts: []GenericOption{
WithTLSClientConfig(&tls.Config{}),
WithTracesTLSClientConfig(tlsCert),
WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
assert.NotNil(t, c.Metrics.GRPCCredentials)
} else {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
assert.Equal(t, 0, len(c.Metrics.TLSCfg.RootCAs.Subjects()))
}
},
},
{
name: "Test Environment Certificate",
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path",
},
fileReader: fileReader{
"cert_path": []byte(WeakCertificate),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
assert.NotNil(t, c.Metrics.GRPCCredentials)
} else {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Metrics.TLSCfg.RootCAs.Subjects())
}
},
},
{
name: "Test Environment Signal Specific Certificate",
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE": "cert_path",
"OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE": "invalid_cert",
},
fileReader: fileReader{
"cert_path": []byte(WeakCertificate),
"invalid_cert": []byte("invalid certificate file."),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
assert.Nil(t, c.Metrics.GRPCCredentials)
} else {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
assert.Equal(t, (*tls.Config)(nil), c.Metrics.TLSCfg)
}
},
},
{
name: "Test Mixed Environment and With Certificate",
opts: []GenericOption{
WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path",
},
fileReader: fileReader{
"cert_path": []byte(WeakCertificate),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
assert.NotNil(t, c.Metrics.GRPCCredentials)
} else {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
assert.Equal(t, 0, len(c.Metrics.TLSCfg.RootCAs.Subjects()))
}
},
},
// Headers tests
{
name: "Test With Headers",
opts: []GenericOption{
WithHeaders(map[string]string{"h1": "v1"}),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"h1": "v1"}, c.Metrics.Headers)
assert.Equal(t, map[string]string{"h1": "v1"}, c.Traces.Headers)
},
},
{
name: "Test With Signal Specific Headers",
opts: []GenericOption{
WithHeaders(map[string]string{"overrode": "by_signal_specific"}),
WithMetricsHeaders(map[string]string{"m1": "mv1"}),
WithTracesHeaders(map[string]string{"t1": "tv1"}),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"m1": "mv1"}, c.Metrics.Headers)
assert.Equal(t, map[string]string{"t1": "tv1"}, c.Traces.Headers)
},
},
{
name: "Test Environment Headers",
env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Metrics.Headers)
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers)
},
},
{
name: "Test Environment Signal Specific Headers",
env: map[string]string{
"OTEL_EXPORTER_OTLP_HEADERS": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_HEADERS": "h1=v1,h2=v2",
"OTEL_EXPORTER_OTLP_METRICS_HEADERS": "h1=v1,h2=v2",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Metrics.Headers)
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers)
},
},
{
name: "Test Mixed Environment and With Headers",
env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"},
opts: []GenericOption{
WithMetricsHeaders(map[string]string{"m1": "mv1"}),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"m1": "mv1"}, c.Metrics.Headers)
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers)
},
},
// Compression Tests
{
name: "Test With Compression",
opts: []GenericOption{
WithCompression(otlp.GzipCompression),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, otlp.GzipCompression, c.Traces.Compression)
assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression)
},
},
{
name: "Test With Signal Specific Compression",
opts: []GenericOption{
WithCompression(otlp.NoCompression), // overrode by signal specific configs
WithTracesCompression(otlp.GzipCompression),
WithMetricsCompression(otlp.GzipCompression),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, otlp.GzipCompression, c.Traces.Compression)
assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression)
},
},
{
name: "Test Environment Compression",
env: map[string]string{
"OTEL_EXPORTER_OTLP_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, otlp.GzipCompression, c.Traces.Compression)
assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression)
},
},
{
name: "Test Environment Signal Specific Compression",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip",
"OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, otlp.GzipCompression, c.Traces.Compression)
assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression)
},
},
{
name: "Test Mixed Environment and With Compression",
opts: []GenericOption{
WithTracesCompression(otlp.NoCompression),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip",
"OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, otlp.NoCompression, c.Traces.Compression)
assert.Equal(t, otlp.GzipCompression, c.Metrics.Compression)
},
},
// Timeout Tests
{
name: "Test With Timeout",
opts: []GenericOption{
WithTimeout(time.Duration(5 * time.Second)),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, 5*time.Second, c.Traces.Timeout)
assert.Equal(t, 5*time.Second, c.Metrics.Timeout)
},
},
{
name: "Test With Signal Specific Timeout",
opts: []GenericOption{
WithTimeout(time.Duration(5 * time.Second)),
WithTracesTimeout(time.Duration(13 * time.Second)),
WithMetricsTimeout(time.Duration(14 * time.Second)),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, 13*time.Second, c.Traces.Timeout)
assert.Equal(t, 14*time.Second, c.Metrics.Timeout)
},
},
{
name: "Test Environment Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, c.Metrics.Timeout, 15*time.Second)
assert.Equal(t, c.Traces.Timeout, 15*time.Second)
},
},
{
name: "Test Environment Signal Specific Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
"OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000",
"OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, c.Traces.Timeout, 27*time.Second)
assert.Equal(t, c.Metrics.Timeout, 28*time.Second)
},
},
{
name: "Test Mixed Environment and With Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
"OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000",
"OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000",
},
opts: []GenericOption{
WithTracesTimeout(5 * time.Second),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, c.Traces.Timeout, 5*time.Second)
assert.Equal(t, c.Metrics.Timeout, 28*time.Second)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := EnvOptionsReader{
GetEnv: tt.env.getEnv,
ReadFile: tt.fileReader.readFile,
}
// Tests Generic options as HTTP Options
cfg := NewDefaultConfig()
e.ApplyHTTPEnvConfigs(&cfg)
for _, opt := range tt.opts {
opt.ApplyHTTPOption(&cfg)
}
tt.asserts(t, &cfg, false)
// Tests Generic options as gRPC Options
cfg = NewDefaultConfig()
e.ApplyGRPCEnvConfigs(&cfg)
for _, opt := range tt.opts {
opt.ApplyGRPCOption(&cfg)
}
tt.asserts(t, &cfg, true)
})
}
}

View File

@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlp // import "go.opentelemetry.io/otel/exporters/otlp"
// Compression describes the compression used for payloads sent to the
// collector.
type Compression int
const (
// NoCompression tells the driver to send payloads without
// compression.
NoCompression Compression = iota
// GzipCompression tells the driver to send payloads after
// compressing them with gzip.
GzipCompression
)
// Marshaler describes the kind of message format sent to the collector
type Marshaler int
const (
// MarshalProto tells the driver to send using the protobuf binary format.
MarshalProto Marshaler = iota
// MarshalJSON tells the driver to send using json format.
MarshalJSON
)

View File

@ -22,6 +22,11 @@ import (
"time"
"unsafe"
"google.golang.org/grpc/encoding/gzip"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
@ -36,7 +41,8 @@ type connection struct {
cc *grpc.ClientConn
// these fields are read-only after constructor is finished
cfg config
cfg otlpconfig.Config
sCfg otlpconfig.SignalConfig
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn)
@ -51,12 +57,13 @@ type connection struct {
closeBackgroundConnectionDoneCh func(ch chan struct{})
}
func newConnection(cfg config, handler func(cc *grpc.ClientConn)) *connection {
func newConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler func(cc *grpc.ClientConn)) *connection {
c := new(connection)
c.newConnectionHandler = handler
c.cfg = cfg
if len(c.cfg.headers) > 0 {
c.metadata = metadata.New(c.cfg.headers)
c.sCfg = sCfg
if len(c.sCfg.Headers) > 0 {
c.metadata = metadata.New(c.sCfg.Headers)
}
c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
close(ch)
@ -117,7 +124,7 @@ func (c *connection) indefiniteBackgroundConnection() {
c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh)
}()
connReattemptPeriod := c.cfg.reconnectionPeriod
connReattemptPeriod := c.cfg.ReconnectionPeriod
if connReattemptPeriod <= 0 {
connReattemptPeriod = defaultConnReattemptPeriod
}
@ -204,28 +211,26 @@ func (c *connection) setConnection(cc *grpc.ClientConn) bool {
}
func (c *connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) {
endpoint := c.cfg.collectorEndpoint
dialOpts := []grpc.DialOption{}
if c.cfg.serviceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.serviceConfig))
if c.cfg.ServiceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.ServiceConfig))
}
if c.cfg.clientCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.cfg.clientCredentials))
} else if c.cfg.canDialInsecure {
if c.sCfg.GRPCCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.sCfg.GRPCCredentials))
} else if c.sCfg.Insecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if c.cfg.compressor != "" {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(c.cfg.compressor)))
if c.sCfg.Compression == otlp.GzipCompression {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
}
if len(c.cfg.dialOptions) != 0 {
dialOpts = append(dialOpts, c.cfg.dialOptions...)
if len(c.cfg.DialOptions) != 0 {
dialOpts = append(dialOpts, c.cfg.DialOptions...)
}
ctx, cancel := c.contextWithStop(ctx)
defer cancel()
ctx = c.contextWithMetadata(ctx)
return grpc.DialContext(ctx, endpoint, dialOpts...)
return grpc.DialContext(ctx, c.sCfg.Endpoint, dialOpts...)
}
func (c *connection) contextWithMetadata(ctx context.Context) context.Context {

View File

@ -20,6 +20,8 @@ import (
"fmt"
"sync"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
"google.golang.org/grpc"
"go.opentelemetry.io/otel/exporters/otlp"
@ -33,10 +35,21 @@ import (
)
type driver struct {
metricsDriver metricsDriver
tracesDriver tracesDriver
}
type metricsDriver struct {
connection *connection
lock sync.Mutex
metricsClient colmetricpb.MetricsServiceClient
}
type tracesDriver struct {
connection *connection
lock sync.Mutex
tracesClient coltracepb.TraceServiceClient
}
@ -46,50 +59,69 @@ var (
// NewDriver creates a new gRPC protocol driver.
func NewDriver(opts ...Option) otlp.ProtocolDriver {
cfg := config{
collectorEndpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
serviceConfig: DefaultServiceConfig,
}
cfg := otlpconfig.NewDefaultConfig()
otlpconfig.ApplyGRPCEnvConfigs(&cfg)
for _, opt := range opts {
opt(&cfg)
opt.ApplyGRPCOption(&cfg)
}
d := &driver{}
d.connection = newConnection(cfg, d.handleNewConnection)
d.tracesDriver = tracesDriver{
connection: newConnection(cfg, cfg.Traces, d.tracesDriver.handleNewConnection),
}
d.metricsDriver = metricsDriver{
connection: newConnection(cfg, cfg.Metrics, d.metricsDriver.handleNewConnection),
}
return d
}
func (d *driver) handleNewConnection(cc *grpc.ClientConn) {
d.lock.Lock()
defer d.lock.Unlock()
func (md *metricsDriver) handleNewConnection(cc *grpc.ClientConn) {
md.lock.Lock()
defer md.lock.Unlock()
if cc != nil {
d.metricsClient = colmetricpb.NewMetricsServiceClient(cc)
d.tracesClient = coltracepb.NewTraceServiceClient(cc)
md.metricsClient = colmetricpb.NewMetricsServiceClient(cc)
} else {
d.metricsClient = nil
d.tracesClient = nil
md.metricsClient = nil
}
}
func (td *tracesDriver) handleNewConnection(cc *grpc.ClientConn) {
td.lock.Lock()
defer td.lock.Unlock()
if cc != nil {
td.tracesClient = coltracepb.NewTraceServiceClient(cc)
} else {
td.tracesClient = nil
}
}
// Start implements otlp.ProtocolDriver. It establishes a connection
// to the collector.
func (d *driver) Start(ctx context.Context) error {
d.connection.startConnection(ctx)
d.tracesDriver.connection.startConnection(ctx)
d.metricsDriver.connection.startConnection(ctx)
return nil
}
// Stop implements otlp.ProtocolDriver. It shuts down the connection
// to the collector.
func (d *driver) Stop(ctx context.Context) error {
return d.connection.shutdown(ctx)
if err := d.tracesDriver.connection.shutdown(ctx); err != nil {
return err
}
return d.metricsDriver.connection.shutdown(ctx)
}
// ExportMetrics implements otlp.ProtocolDriver. It transforms metrics
// to protobuf binary format and sends the result to the collector.
func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error {
if !d.connection.connected() {
return fmt.Errorf("exporter disconnected: %w", d.connection.lastConnectError())
if !d.metricsDriver.connection.connected() {
return fmt.Errorf("metrics exporter is disconnected from the server %s: %w", d.metricsDriver.connection.sCfg.Endpoint, d.metricsDriver.connection.lastConnectError())
}
ctx, cancel := d.connection.contextWithStop(ctx)
ctx, cancel := d.metricsDriver.connection.contextWithStop(ctx)
defer cancel()
rms, err := transform.CheckpointSet(ctx, selector, cps, 1)
@ -100,24 +132,24 @@ func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet,
return nil
}
return d.uploadMetrics(ctx, rms)
return d.metricsDriver.uploadMetrics(ctx, rms)
}
func (d *driver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error {
ctx = d.connection.contextWithMetadata(ctx)
func (md *metricsDriver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.ResourceMetrics) error {
ctx = md.connection.contextWithMetadata(ctx)
err := func() error {
d.lock.Lock()
defer d.lock.Unlock()
if d.metricsClient == nil {
md.lock.Lock()
defer md.lock.Unlock()
if md.metricsClient == nil {
return errNoClient
}
_, err := d.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
_, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: protoMetrics,
})
return err
}()
if err != nil {
d.connection.setStateDisconnected(err)
md.connection.setStateDisconnected(err)
}
return err
}
@ -125,10 +157,10 @@ func (d *driver) uploadMetrics(ctx context.Context, protoMetrics []*metricpb.Res
// ExportTraces implements otlp.ProtocolDriver. It transforms spans to
// protobuf binary format and sends the result to the collector.
func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error {
if !d.connection.connected() {
return fmt.Errorf("exporter disconnected: %w", d.connection.lastConnectError())
if !d.tracesDriver.connection.connected() {
return fmt.Errorf("traces exporter is disconnected from the server %s: %w", d.tracesDriver.connection.sCfg.Endpoint, d.tracesDriver.connection.lastConnectError())
}
ctx, cancel := d.connection.contextWithStop(ctx)
ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx)
defer cancel()
protoSpans := transform.SpanData(ss)
@ -136,24 +168,24 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
return nil
}
return d.uploadTraces(ctx, protoSpans)
return d.tracesDriver.uploadTraces(ctx, protoSpans)
}
func (d *driver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
ctx = d.connection.contextWithMetadata(ctx)
func (td *tracesDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
ctx = td.connection.contextWithMetadata(ctx)
err := func() error {
d.lock.Lock()
defer d.lock.Unlock()
if d.tracesClient == nil {
td.lock.Lock()
defer td.lock.Unlock()
if td.tracesClient == nil {
return errNoClient
}
_, err := d.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
})
return err
}()
if err != nil {
d.connection.setStateDisconnected(err)
td.connection.setStateDisconnected(err)
}
return err
}

View File

@ -15,131 +15,170 @@
package otlpgrpc
import (
"fmt"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
// DefaultServiceConfig is the gRPC service config used if none is
// provided by the user.
//
// For more info on gRPC service configs:
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md
//
// For more info on the RetryableStatusCodes we allow here:
// https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response
//
// Note: MaxAttempts > 5 are treated as 5. See
// https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy
// for more details.
DefaultServiceConfig = `{
"methodConfig":[{
"name":[
{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
{ "service":"opentelemetry.proto.collector.trace.v1.TraceService" }
],
"retryPolicy":{
"MaxAttempts":5,
"InitialBackoff":"0.3s",
"MaxBackoff":"5s",
"BackoffMultiplier":2,
"RetryableStatusCodes":[
"CANCELLED",
"DEADLINE_EXCEEDED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"UNAVAILABLE",
"DATA_LOSS"
]
}
}]
}`
)
type config struct {
canDialInsecure bool
collectorEndpoint string
compressor string
reconnectionPeriod time.Duration
serviceConfig string
dialOptions []grpc.DialOption
headers map[string]string
clientCredentials credentials.TransportCredentials
}
// Option applies an option to the gRPC driver.
type Option func(cfg *config)
type Option interface {
otlpconfig.GRPCOption
}
// WithInsecure disables client transport security for the exporter's gRPC connection
// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
// does. Note, by default, client security is required unless WithInsecure is used.
func WithInsecure() Option {
return func(cfg *config) {
cfg.canDialInsecure = true
return otlpconfig.WithInsecure()
}
// WithTracesInsecure disables client transport security for the traces exporter's gRPC connection
// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
// does. Note, by default, client security is required unless WithInsecure is used.
func WithTracesInsecure() Option {
return otlpconfig.WithInsecureTraces()
}
// WithInsecureMetrics disables client transport security for the metrics exporter's gRPC connection
// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
// does. Note, by default, client security is required unless WithInsecure is used.
func WithInsecureMetrics() Option {
return otlpconfig.WithInsecureMetrics()
}
// WithEndpoint allows one to set the endpoint that the exporter will
// connect to the collector on. If unset, it will instead try to use
// connect to DefaultCollectorHost:DefaultCollectorPort.
func WithEndpoint(endpoint string) Option {
return func(cfg *config) {
cfg.collectorEndpoint = endpoint
return otlpconfig.WithEndpoint(endpoint)
}
// WithTracesEndpoint allows one to set the traces endpoint that the exporter will
// connect to the collector on. If unset, it will instead try to use
// connect to DefaultCollectorHost:DefaultCollectorPort.
func WithTracesEndpoint(endpoint string) Option {
return otlpconfig.WithTracesEndpoint(endpoint)
}
// WithMetricsEndpoint allows one to set the metrics endpoint that the exporter will
// connect to the collector on. If unset, it will instead try to use
// connect to DefaultCollectorHost:DefaultCollectorPort.
func WithMetricsEndpoint(endpoint string) Option {
return otlpconfig.WithMetricsEndpoint(endpoint)
}
// WithReconnectionPeriod allows one to set the delay between next connection attempt
// after failing to connect with the collector.
func WithReconnectionPeriod(rp time.Duration) Option {
return func(cfg *config) {
cfg.reconnectionPeriod = rp
func WithReconnectionPeriod(rp time.Duration) otlpconfig.GRPCOption {
return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
cfg.ReconnectionPeriod = rp
})
}
func compressorToCompression(compressor string) otlp.Compression {
switch compressor {
case "gzip":
return otlp.GzipCompression
}
otel.Handle(fmt.Errorf("invalid compression type: '%s', using no compression as default", compressor))
return otlp.NoCompression
}
// WithCompressor will set the compressor for the gRPC client to use when sending requests.
// It is the responsibility of the caller to ensure that the compressor set has been registered
// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some
// compressors auto-register on import, such as gzip, which can be registered by calling
// `import _ "google.golang.org/grpc/encoding/gzip"`
// `import _ "google.golang.org/grpc/encoding/gzip"`.
func WithCompressor(compressor string) Option {
return func(cfg *config) {
cfg.compressor = compressor
}
return otlpconfig.WithCompression(compressorToCompression(compressor))
}
// WithHeaders will send the provided headers with gRPC requests
func WithHeaders(headers map[string]string) Option {
return func(cfg *config) {
cfg.headers = headers
// WithTracesCompression will set the compressor for the gRPC client to use when sending traces requests.
// It is the responsibility of the caller to ensure that the compressor set has been registered
// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some
// compressors auto-register on import, such as gzip, which can be registered by calling
// `import _ "google.golang.org/grpc/encoding/gzip"`.
func WithTracesCompression(compressor string) Option {
return otlpconfig.WithTracesCompression(compressorToCompression(compressor))
}
// WithMetricsCompression will set the compressor for the gRPC client to use when sending metrics requests.
// It is the responsibility of the caller to ensure that the compressor set has been registered
// with google.golang.org/grpc/encoding. This can be done by encoding.RegisterCompressor. Some
// compressors auto-register on import, such as gzip, which can be registered by calling
// `import _ "google.golang.org/grpc/encoding/gzip"`.
func WithMetricsCompression(compressor string) Option {
return otlpconfig.WithMetricsCompression(compressorToCompression(compressor))
}
// WithHeaders will send the provided headers with gRPC requests.
func WithHeaders(headers map[string]string) Option {
return otlpconfig.WithHeaders(headers)
}
// WithTracesHeaders will send the provided headers with gRPC traces requests.
func WithTracesHeaders(headers map[string]string) Option {
return otlpconfig.WithTracesHeaders(headers)
}
// WithMetricsHeaders will send the provided headers with gRPC metrics requests.
func WithMetricsHeaders(headers map[string]string) Option {
return otlpconfig.WithMetricsHeaders(headers)
}
// WithTLSCredentials allows the connection to use TLS credentials
// when talking to the server. It takes in grpc.TransportCredentials instead
// of say a Certificate file or a tls.Certificate, because the retrieving
// of say a Certificate file or a tls.Certificate, because the retrieving of
// these credentials can be done in many ways e.g. plain file, in code tls.Config
// or by certificate rotation, so it is up to the caller to decide what to use.
func WithTLSCredentials(creds credentials.TransportCredentials) Option {
return func(cfg *config) {
cfg.clientCredentials = creds
return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
cfg.Traces.GRPCCredentials = creds
cfg.Metrics.GRPCCredentials = creds
})
}
// WithTracesTLSCredentials allows the connection to use TLS credentials
// when talking to the traces server. It takes in grpc.TransportCredentials instead
// of say a Certificate file or a tls.Certificate, because the retrieving of
// these credentials can be done in many ways e.g. plain file, in code tls.Config
// or by certificate rotation, so it is up to the caller to decide what to use.
func WithTracesTLSCredentials(creds credentials.TransportCredentials) Option {
return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
cfg.Traces.GRPCCredentials = creds
})
}
// WithMetricsTLSCredentials allows the connection to use TLS credentials
// when talking to the metrics server. It takes in grpc.TransportCredentials instead
// of say a Certificate file or a tls.Certificate, because the retrieving of
// these credentials can be done in many ways e.g. plain file, in code tls.Config
// or by certificate rotation, so it is up to the caller to decide what to use.
func WithMetricsTLSCredentials(creds credentials.TransportCredentials) Option {
return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
cfg.Metrics.GRPCCredentials = creds
})
}
// WithServiceConfig defines the default gRPC service config used.
func WithServiceConfig(serviceConfig string) Option {
return func(cfg *config) {
cfg.serviceConfig = serviceConfig
}
return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
cfg.ServiceConfig = serviceConfig
})
}
// WithDialOption opens support to any grpc.DialOption to be used. If it conflicts
// with some other configuration the GRPC specified via the collector the ones here will
// take preference since they are set last.
func WithDialOption(opts ...grpc.DialOption) Option {
return func(cfg *config) {
cfg.dialOptions = opts
}
return otlpconfig.NewGRPCOption(func(cfg *otlpconfig.Config) {
cfg.DialOptions = opts
})
}

View File

@ -360,7 +360,10 @@ func TestNewExporter_withInvalidSecurityConfiguration(t *testing.T) {
}
err = exp.ExportSpans(ctx, []*sdktrace.SpanSnapshot{{Name: "misconfiguration"}})
require.Equal(t, err.Error(), "exporter disconnected: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
expectedErr := fmt.Sprintf("traces exporter is disconnected from the server %s: grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)", mc.endpoint)
require.Equal(t, expectedErr, err.Error())
defer func() {
_ = exp.Shutdown(ctx)

View File

@ -28,6 +28,8 @@ import (
"strings"
"time"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
jsonpb "google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
@ -64,14 +66,15 @@ var ourTransport *http.Transport = &http.Transport{
type driver struct {
metricsDriver signalDriver
tracesDriver signalDriver
cfg config
cfg otlpconfig.Config
stopCh chan struct{}
}
type signalDriver struct {
cfg signalConfig
generalCfg config
name string
cfg otlpconfig.SignalConfig
generalCfg otlpconfig.Config
client *http.Client
stopCh chan struct{}
}
@ -80,15 +83,15 @@ var _ otlp.ProtocolDriver = (*driver)(nil)
// NewDriver creates a new HTTP driver.
func NewDriver(opts ...Option) otlp.ProtocolDriver {
cfg := newDefaultConfig()
applyEnvConfigs(&cfg)
cfg := otlpconfig.NewDefaultConfig()
otlpconfig.ApplyHTTPEnvConfigs(&cfg)
for _, opt := range opts {
opt.Apply(&cfg)
opt.ApplyHTTPOption(&cfg)
}
for pathPtr, defaultPath := range map[*string]string{
&cfg.traces.urlPath: DefaultTracesPath,
&cfg.metrics.urlPath: DefaultMetricsPath,
&cfg.Traces.URLPath: DefaultTracesPath,
&cfg.Metrics.URLPath: DefaultMetricsPath,
} {
tmp := strings.TrimSpace(*pathPtr)
if tmp == "" {
@ -101,46 +104,48 @@ func NewDriver(opts ...Option) otlp.ProtocolDriver {
}
*pathPtr = tmp
}
if cfg.maxAttempts <= 0 {
cfg.maxAttempts = DefaultMaxAttempts
if cfg.MaxAttempts <= 0 {
cfg.MaxAttempts = DefaultMaxAttempts
}
if cfg.maxAttempts > DefaultMaxAttempts {
cfg.maxAttempts = DefaultMaxAttempts
if cfg.MaxAttempts > DefaultMaxAttempts {
cfg.MaxAttempts = DefaultMaxAttempts
}
if cfg.backoff <= 0 {
cfg.backoff = DefaultBackoff
if cfg.Backoff <= 0 {
cfg.Backoff = DefaultBackoff
}
metricsClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.metrics.timeout,
Timeout: cfg.Metrics.Timeout,
}
if cfg.metrics.tlsCfg != nil {
if cfg.Metrics.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.metrics.tlsCfg
transport.TLSClientConfig = cfg.Metrics.TLSCfg
metricsClient.Transport = transport
}
tracesClient := &http.Client{
Transport: ourTransport,
Timeout: cfg.traces.timeout,
Timeout: cfg.Traces.Timeout,
}
if cfg.traces.tlsCfg != nil {
if cfg.Traces.TLSCfg != nil {
transport := ourTransport.Clone()
transport.TLSClientConfig = cfg.traces.tlsCfg
transport.TLSClientConfig = cfg.Traces.TLSCfg
tracesClient.Transport = transport
}
stopCh := make(chan struct{})
return &driver{
tracesDriver: signalDriver{
cfg: cfg.traces,
name: "traces",
cfg: cfg.Traces,
generalCfg: cfg,
stopCh: stopCh,
client: tracesClient,
},
metricsDriver: signalDriver{
cfg: cfg.metrics,
name: "metrics",
cfg: cfg.Metrics,
generalCfg: cfg,
stopCh: stopCh,
client: metricsClient,
@ -198,18 +203,18 @@ func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot)
}
func (d *driver) marshal(msg proto.Message) ([]byte, error) {
if d.cfg.marshaler == MarshalJSON {
if d.cfg.Marshaler == otlp.MarshalJSON {
return jsonpb.Marshal(msg)
}
return proto.Marshal(msg)
}
func (d *signalDriver) send(ctx context.Context, rawRequest []byte) error {
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.endpoint, d.cfg.urlPath)
address := fmt.Sprintf("%s://%s%s", d.getScheme(), d.cfg.Endpoint, d.cfg.URLPath)
var cancel context.CancelFunc
ctx, cancel = d.contextWithStop(ctx)
defer cancel()
for i := 0; i < d.generalCfg.maxAttempts; i++ {
for i := 0; i < d.generalCfg.MaxAttempts; i++ {
response, err := d.singleSend(ctx, rawRequest, address)
if err != nil {
return err
@ -226,20 +231,20 @@ func (d *signalDriver) send(ctx context.Context, rawRequest []byte) error {
fallthrough
case http.StatusServiceUnavailable:
select {
case <-time.After(getWaitDuration(d.generalCfg.backoff, i)):
case <-time.After(getWaitDuration(d.generalCfg.Backoff, i)):
continue
case <-ctx.Done():
return ctx.Err()
}
default:
return fmt.Errorf("failed with HTTP status %s", response.Status)
return fmt.Errorf("failed to send %s to %s with HTTP status %s", d.name, address, response.Status)
}
}
return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.maxAttempts)
return fmt.Errorf("failed to send data to %s after %d tries", address, d.generalCfg.MaxAttempts)
}
func (d *signalDriver) getScheme() string {
if d.cfg.insecure {
if d.cfg.Insecure {
return "http"
}
return "https"
@ -302,20 +307,20 @@ func (d *signalDriver) singleSend(ctx context.Context, rawRequest []byte, addres
func (d *signalDriver) prepareBody(rawRequest []byte) (io.ReadCloser, int64, http.Header) {
var bodyReader io.ReadCloser
headers := http.Header{}
for k, v := range d.cfg.headers {
for k, v := range d.cfg.Headers {
headers.Set(k, v)
}
contentLength := (int64)(len(rawRequest))
if d.generalCfg.marshaler == MarshalJSON {
if d.generalCfg.Marshaler == otlp.MarshalJSON {
headers.Set("Content-Type", contentTypeJSON)
} else {
headers.Set("Content-Type", contentTypeProto)
}
requestReader := bytes.NewBuffer(rawRequest)
switch d.cfg.compression {
case NoCompression:
switch d.cfg.Compression {
case otlp.NoCompression:
bodyReader = ioutil.NopCloser(requestReader)
case GzipCompression:
case otlp.GzipCompression:
preader, pwriter := io.Pipe()
go func() {
defer pwriter.Close()

View File

@ -16,6 +16,7 @@ package otlphttp_test
import (
"context"
"fmt"
"net/http"
"os"
"testing"
@ -57,7 +58,7 @@ func TestEndToEnd(t *testing.T) {
{
name: "with gzip compression",
opts: []otlphttp.Option{
otlphttp.WithCompression(otlphttp.GzipCompression),
otlphttp.WithCompression(otlp.GzipCompression),
},
},
{
@ -109,7 +110,7 @@ func TestEndToEnd(t *testing.T) {
{
name: "with json encoding",
opts: []otlphttp.Option{
otlphttp.WithMarshal(otlphttp.MarshalJSON),
otlphttp.WithMarshal(otlp.MarshalJSON),
},
},
}
@ -154,6 +155,7 @@ func TestRetry(t *testing.T) {
defer mc.MustStop(t)
driver := otlphttp.NewDriver(
otlphttp.WithEndpoint(mc.Endpoint()),
otlphttp.WithTracesEndpoint(mc.Endpoint()),
otlphttp.WithInsecure(),
otlphttp.WithMaxAttempts(len(statuses)+1),
)
@ -237,6 +239,7 @@ func TestNoRetry(t *testing.T) {
}()
err = exporter.ExportSpans(ctx, otlptest.SingleSpanSnapshot())
assert.Error(t, err)
assert.Equal(t, fmt.Sprintf("failed to send traces to http://%s/v1/traces with HTTP status 400 Bad Request", mc.endpoint), err.Error())
assert.Empty(t, mc.GetSpans())
}

View File

@ -16,23 +16,10 @@ package otlphttp
import (
"crypto/tls"
"fmt"
"time"
"go.opentelemetry.io/otel/exporters/otlp"
)
// Compression describes the compression used for payloads sent to the
// collector.
type Compression int
const (
// NoCompression tells the driver to send payloads without
// compression.
NoCompression Compression = iota
// GzipCompression tells the driver to send payloads after
// compressing them with gzip.
GzipCompression
"go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
)
const (
@ -54,78 +41,9 @@ const (
DefaultTimeout time.Duration = 10 * time.Second
)
// Marshaler describes the kind of message format sent to the collector
type Marshaler int
const (
// MarshalProto tells the driver to send using the protobuf binary format.
MarshalProto Marshaler = iota
// MarshalJSON tells the driver to send using json format.
MarshalJSON
)
type signalConfig struct {
endpoint string
insecure bool
tlsCfg *tls.Config
headers map[string]string
compression Compression
timeout time.Duration
urlPath string
}
type config struct {
metrics signalConfig
traces signalConfig
maxAttempts int
backoff time.Duration
marshaler Marshaler
}
func newDefaultConfig() config {
c := config{
traces: signalConfig{
endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
urlPath: DefaultTracesPath,
compression: NoCompression,
timeout: DefaultTimeout,
},
metrics: signalConfig{
endpoint: fmt.Sprintf("%s:%d", otlp.DefaultCollectorHost, otlp.DefaultCollectorPort),
urlPath: DefaultMetricsPath,
compression: NoCompression,
timeout: DefaultTimeout,
},
maxAttempts: DefaultMaxAttempts,
backoff: DefaultBackoff,
}
return c
}
// Option applies an option to the HTTP driver.
type Option interface {
Apply(*config)
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
type genericOption struct {
fn func(*config)
}
func (g *genericOption) Apply(cfg *config) {
g.fn(cfg)
}
func (genericOption) private() {}
func newGenericOption(fn func(cfg *config)) Option {
return &genericOption{fn: fn}
otlpconfig.HTTPOption
}
// WithEndpoint allows one to set the address of the collector
@ -134,10 +52,7 @@ func newGenericOption(fn func(cfg *config)) Option {
// DefaultCollectorHost:DefaultCollectorPort. Note that the endpoint
// must not contain any URL path.
func WithEndpoint(endpoint string) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.endpoint = endpoint
cfg.metrics.endpoint = endpoint
})
return otlpconfig.WithEndpoint(endpoint)
}
// WithTracesEndpoint allows one to set the address of the collector
@ -145,9 +60,7 @@ func WithEndpoint(endpoint string) Option {
// unset, it will instead try to use the Endpoint configuration.
// Note that the endpoint must not contain any URL path.
func WithTracesEndpoint(endpoint string) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.endpoint = endpoint
})
return otlpconfig.WithTracesEndpoint(endpoint)
}
// WithMetricsEndpoint allows one to set the address of the collector
@ -155,177 +68,132 @@ func WithTracesEndpoint(endpoint string) Option {
// unset, it will instead try to use the Endpoint configuration.
// Note that the endpoint must not contain any URL path.
func WithMetricsEndpoint(endpoint string) Option {
return newGenericOption(func(cfg *config) {
cfg.metrics.endpoint = endpoint
})
return otlpconfig.WithMetricsEndpoint(endpoint)
}
// WithCompression tells the driver to compress the sent data.
func WithCompression(compression Compression) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.compression = compression
cfg.metrics.compression = compression
})
func WithCompression(compression otlp.Compression) Option {
return otlpconfig.WithCompression(compression)
}
// WithTracesCompression tells the driver to compress the sent traces data.
func WithTracesCompression(compression Compression) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.compression = compression
})
func WithTracesCompression(compression otlp.Compression) Option {
return otlpconfig.WithTracesCompression(compression)
}
// WithMetricsCompression tells the driver to compress the sent metrics data.
func WithMetricsCompression(compression Compression) Option {
return newGenericOption(func(cfg *config) {
cfg.metrics.compression = compression
})
func WithMetricsCompression(compression otlp.Compression) Option {
return otlpconfig.WithMetricsCompression(compression)
}
// WithTracesURLPath allows one to override the default URL path used
// for sending traces. If unset, DefaultTracesPath will be used.
func WithTracesURLPath(urlPath string) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.urlPath = urlPath
})
return otlpconfig.WithTracesURLPath(urlPath)
}
// WithMetricsURLPath allows one to override the default URL path used
// for sending metrics. If unset, DefaultMetricsPath will be used.
func WithMetricsURLPath(urlPath string) Option {
return newGenericOption(func(cfg *config) {
cfg.metrics.urlPath = urlPath
})
return otlpconfig.WithMetricsURLPath(urlPath)
}
// WithMaxAttempts allows one to override how many times the driver
// will try to send the payload in case of retryable errors. If unset,
// DefaultMaxAttempts will be used.
func WithMaxAttempts(maxAttempts int) Option {
return newGenericOption(func(cfg *config) {
cfg.maxAttempts = maxAttempts
})
return otlpconfig.WithMaxAttempts(maxAttempts)
}
// WithBackoff tells the driver to use the duration as a base of the
// exponential backoff strategy. If unset, DefaultBackoff will be
// used.
func WithBackoff(duration time.Duration) Option {
return newGenericOption(func(cfg *config) {
cfg.backoff = duration
})
return otlpconfig.WithBackoff(duration)
}
// WithTLSClientConfig can be used to set up a custom TLS
// configuration for the client used to send payloads to the
// collector. Use it if you want to use a custom certificate.
func WithTLSClientConfig(tlsCfg *tls.Config) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.tlsCfg = tlsCfg
cfg.metrics.tlsCfg = tlsCfg
})
return otlpconfig.WithTLSClientConfig(tlsCfg)
}
// WithTracesTLSClientConfig can be used to set up a custom TLS
// configuration for the client used to send traces.
// Use it if you want to use a custom certificate.
func WithTracesTLSClientConfig(tlsCfg *tls.Config) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.tlsCfg = tlsCfg
})
return otlpconfig.WithTracesTLSClientConfig(tlsCfg)
}
// WithMetricsTLSClientConfig can be used to set up a custom TLS
// configuration for the client used to send metrics.
// Use it if you want to use a custom certificate.
func WithMetricsTLSClientConfig(tlsCfg *tls.Config) Option {
return newGenericOption(func(cfg *config) {
cfg.metrics.tlsCfg = tlsCfg
})
return otlpconfig.WithMetricsTLSClientConfig(tlsCfg)
}
// WithInsecure tells the driver to connect to the collector using the
// HTTP scheme, instead of HTTPS.
func WithInsecure() Option {
return newGenericOption(func(cfg *config) {
cfg.traces.insecure = true
cfg.metrics.insecure = true
})
return otlpconfig.WithInsecure()
}
// WithInsecureTraces tells the driver to connect to the traces collector using the
// HTTP scheme, instead of HTTPS.
func WithInsecureTraces() Option {
return newGenericOption(func(cfg *config) {
cfg.traces.insecure = true
})
return otlpconfig.WithInsecureTraces()
}
// WithInsecure tells the driver to connect to the metrics collector using the
// HTTP scheme, instead of HTTPS.
func WithInsecureMetrics() Option {
return newGenericOption(func(cfg *config) {
cfg.metrics.insecure = true
})
return otlpconfig.WithInsecureMetrics()
}
// WithHeaders allows one to tell the driver to send additional HTTP
// headers with the payloads. Specifying headers like Content-Length,
// Content-Encoding and Content-Type may result in a broken driver.
func WithHeaders(headers map[string]string) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.headers = headers
cfg.metrics.headers = headers
})
return otlpconfig.WithHeaders(headers)
}
// WithTracesHeaders allows one to tell the driver to send additional HTTP
// headers with the trace payloads. Specifying headers like Content-Length,
// Content-Encoding and Content-Type may result in a broken driver.
func WithTracesHeaders(headers map[string]string) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.headers = headers
})
return otlpconfig.WithTracesHeaders(headers)
}
// WithMetricsHeaders allows one to tell the driver to send additional HTTP
// headers with the metrics payloads. Specifying headers like Content-Length,
// Content-Encoding and Content-Type may result in a broken driver.
func WithMetricsHeaders(headers map[string]string) Option {
return newGenericOption(func(cfg *config) {
cfg.metrics.headers = headers
})
return otlpconfig.WithMetricsHeaders(headers)
}
// WithMarshal tells the driver which wire format to use when sending to the
// collector. If unset, MarshalProto will be used
func WithMarshal(m Marshaler) Option {
return newGenericOption(func(cfg *config) {
cfg.marshaler = m
func WithMarshal(m otlp.Marshaler) Option {
return otlpconfig.NewHTTPOption(func(cfg *otlpconfig.Config) {
cfg.Marshaler = m
})
}
// WithTimeout tells the driver the max waiting time for the backend to process
// each spans or metrics batch. If unset, the default will be 10 seconds.
func WithTimeout(duration time.Duration) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.timeout = duration
cfg.metrics.timeout = duration
})
return otlpconfig.WithTimeout(duration)
}
// WithTracesTimeout tells the driver the max waiting time for the backend to process
// each spans batch. If unset, the default will be 10 seconds.
func WithTracesTimeout(duration time.Duration) Option {
return newGenericOption(func(cfg *config) {
cfg.traces.timeout = duration
})
return otlpconfig.WithTracesTimeout(duration)
}
// WithMetricsTimeout tells the driver the max waiting time for the backend to process
// each metrics batch. If unset, the default will be 10 seconds.
func WithMetricsTimeout(duration time.Duration) Option {
return newGenericOption(func(cfg *config) {
cfg.metrics.timeout = duration
})
return otlpconfig.WithMetricsTimeout(duration)
}

View File

@ -1,389 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlphttp
import (
"crypto/tls"
"crypto/x509"
"errors"
"testing"
"time"
"go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
"github.com/stretchr/testify/assert"
)
type env map[string]string
func (e *env) getEnv(env string) string {
return (*e)[env]
}
type fileReader map[string][]byte
func (f *fileReader) readFile(filename string) ([]byte, error) {
if b, ok := (*f)[filename]; ok {
return b, nil
}
return nil, errors.New("File not found")
}
func TestConfigs(t *testing.T) {
tlsCert, err := otlpconfig.CreateTLSConfig([]byte(otlpconfig.WeakCertificate))
assert.NoError(t, err)
tests := []struct {
name string
opts []Option
env env
fileReader fileReader
asserts func(t *testing.T, c *config)
}{
{
name: "Test default configs",
asserts: func(t *testing.T, c *config) {
assert.Equal(t, "localhost:4317", c.traces.endpoint)
assert.Equal(t, "localhost:4317", c.metrics.endpoint)
assert.Equal(t, NoCompression, c.traces.compression)
assert.Equal(t, NoCompression, c.metrics.compression)
assert.Equal(t, map[string]string(nil), c.traces.headers)
assert.Equal(t, map[string]string(nil), c.metrics.headers)
assert.Equal(t, 10*time.Second, c.traces.timeout)
assert.Equal(t, 10*time.Second, c.metrics.timeout)
},
},
// Endpoint Tests
{
name: "Test With Endpoint",
opts: []Option{
WithEndpoint("someendpoint"),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, "someendpoint", c.traces.endpoint)
assert.Equal(t, "someendpoint", c.metrics.endpoint)
},
},
{
name: "Test With Signal Specific Endpoint",
opts: []Option{
WithEndpoint("overrode_by_signal_specific"),
WithTracesEndpoint("traces_endpoint"),
WithMetricsEndpoint("metrics_endpoint"),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, "traces_endpoint", c.traces.endpoint)
assert.Equal(t, "metrics_endpoint", c.metrics.endpoint)
},
},
{
name: "Test Environment Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, "env_endpoint", c.traces.endpoint)
assert.Equal(t, "env_endpoint", c.metrics.endpoint)
},
},
{
name: "Test Environment Signal Specific Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "env_traces_endpoint",
"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT": "env_metrics_endpoint",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, "env_traces_endpoint", c.traces.endpoint)
assert.Equal(t, "env_metrics_endpoint", c.metrics.endpoint)
},
},
{
name: "Test Mixed Environment and With Endpoint",
opts: []Option{
WithTracesEndpoint("traces_endpoint"),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, "traces_endpoint", c.traces.endpoint)
assert.Equal(t, "env_endpoint", c.metrics.endpoint)
},
},
// Certificate tests
{
name: "Test With Certificate",
opts: []Option{
WithTLSClientConfig(tlsCert),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects())
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.metrics.tlsCfg.RootCAs.Subjects())
},
},
{
name: "Test With Signal Specific Endpoint",
opts: []Option{
WithTLSClientConfig(&tls.Config{}),
WithTracesTLSClientConfig(tlsCert),
WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects())
assert.Equal(t, 0, len(c.metrics.tlsCfg.RootCAs.Subjects()))
},
},
{
name: "Test Environment Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path",
},
fileReader: fileReader{
"cert_path": []byte(otlpconfig.WeakCertificate),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects())
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.metrics.tlsCfg.RootCAs.Subjects())
},
},
{
name: "Test Environment Signal Specific Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE": "cert_path",
"OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE": "invalid_cert",
},
fileReader: fileReader{
"cert_path": []byte(otlpconfig.WeakCertificate),
"invalid_cert": []byte("invalid certificate file."),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects())
assert.Equal(t, (*tls.Config)(nil), c.metrics.tlsCfg)
},
},
{
name: "Test Mixed Environment and With Endpoint",
opts: []Option{
WithMetricsTLSClientConfig(&tls.Config{RootCAs: x509.NewCertPool()}),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path",
},
fileReader: fileReader{
"cert_path": []byte(otlpconfig.WeakCertificate),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.traces.tlsCfg.RootCAs.Subjects())
assert.Equal(t, 0, len(c.metrics.tlsCfg.RootCAs.Subjects()))
},
},
// Headers tests
{
name: "Test With Headers",
opts: []Option{
WithHeaders(map[string]string{"h1": "v1"}),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, map[string]string{"h1": "v1"}, c.metrics.headers)
assert.Equal(t, map[string]string{"h1": "v1"}, c.traces.headers)
},
},
{
name: "Test With Signal Specific Headers",
opts: []Option{
WithHeaders(map[string]string{"overrode": "by_signal_specific"}),
WithMetricsHeaders(map[string]string{"m1": "mv1"}),
WithTracesHeaders(map[string]string{"t1": "tv1"}),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, map[string]string{"m1": "mv1"}, c.metrics.headers)
assert.Equal(t, map[string]string{"t1": "tv1"}, c.traces.headers)
},
},
{
name: "Test Environment Headers",
env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.metrics.headers)
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.traces.headers)
},
},
{
name: "Test Environment Signal Specific Headers",
env: map[string]string{
"OTEL_EXPORTER_OTLP_HEADERS": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_HEADERS": "h1=v1,h2=v2",
"OTEL_EXPORTER_OTLP_METRICS_HEADERS": "h1=v1,h2=v2",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.metrics.headers)
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.traces.headers)
},
},
{
name: "Test Mixed Environment and With Headers",
env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"},
opts: []Option{
WithMetricsHeaders(map[string]string{"m1": "mv1"}),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, map[string]string{"m1": "mv1"}, c.metrics.headers)
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.traces.headers)
},
},
// Compression Tests
{
name: "Test With Compression",
opts: []Option{
WithCompression(GzipCompression),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, GzipCompression, c.traces.compression)
assert.Equal(t, GzipCompression, c.metrics.compression)
},
},
{
name: "Test With Signal Specific Compression",
opts: []Option{
WithCompression(NoCompression), // overrode by signal specific configs
WithTracesCompression(GzipCompression),
WithMetricsCompression(GzipCompression),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, GzipCompression, c.traces.compression)
assert.Equal(t, GzipCompression, c.metrics.compression)
},
},
{
name: "Test Environment Compression",
env: map[string]string{
"OTEL_EXPORTER_OTLP_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, GzipCompression, c.traces.compression)
assert.Equal(t, GzipCompression, c.metrics.compression)
},
},
{
name: "Test Environment Signal Specific Compression",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip",
"OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, GzipCompression, c.traces.compression)
assert.Equal(t, GzipCompression, c.metrics.compression)
},
},
{
name: "Test Mixed Environment and With Compression",
opts: []Option{
WithTracesCompression(NoCompression),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip",
"OTEL_EXPORTER_OTLP_METRICS_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, NoCompression, c.traces.compression)
assert.Equal(t, GzipCompression, c.metrics.compression)
},
},
// Timeout Tests
{
name: "Test With Timeout",
opts: []Option{
WithTimeout(time.Duration(5 * time.Second)),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, 5*time.Second, c.traces.timeout)
assert.Equal(t, 5*time.Second, c.metrics.timeout)
},
},
{
name: "Test With Signal Specific Timeout",
opts: []Option{
WithTimeout(time.Duration(5 * time.Second)),
WithTracesTimeout(time.Duration(13 * time.Second)),
WithMetricsTimeout(time.Duration(14 * time.Second)),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, 13*time.Second, c.traces.timeout)
assert.Equal(t, 14*time.Second, c.metrics.timeout)
},
},
{
name: "Test Environment Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, c.metrics.timeout, 15*time.Second)
assert.Equal(t, c.traces.timeout, 15*time.Second)
},
},
{
name: "Test Environment Signal Specific Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
"OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000",
"OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000",
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, c.traces.timeout, 27*time.Second)
assert.Equal(t, c.metrics.timeout, 28*time.Second)
},
},
{
name: "Test Mixed Environment and With Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
"OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000",
"OTEL_EXPORTER_OTLP_METRICS_TIMEOUT": "28000",
},
opts: []Option{
WithTracesTimeout(5 * time.Second),
},
asserts: func(t *testing.T, c *config) {
assert.Equal(t, c.traces.timeout, 5*time.Second)
assert.Equal(t, c.metrics.timeout, 28*time.Second)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := newDefaultConfig()
e := envOptionsReader{
getEnv: tt.env.getEnv,
readFile: tt.fileReader.readFile,
}
e.applyEnvConfigs(&cfg)
for _, opt := range tt.opts {
opt.Apply(&cfg)
}
tt.asserts(t, &cfg)
})
}
}