mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-03-17 20:57:51 +02:00
Add implementation of otlploggrpc configuration (#5383)
part of #5056 Most of the codes are copied from `otlploghttp`. I will try to make `internal/conf` as a shared go template file so `otlploghttp` can use a shared setting struct with `otlploggrpc` in the following PRs.
This commit is contained in:
parent
5331939a74
commit
dace7b6eaf
exporters/otlp/otlplog/otlploggrpc
@ -4,28 +4,130 @@
|
||||
package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
)
|
||||
|
||||
// Default values.
|
||||
var (
|
||||
defaultEndpoint = "localhost:4317"
|
||||
defaultTimeout = 10 * time.Second
|
||||
defaultRetryCfg = retry.DefaultConfig
|
||||
)
|
||||
|
||||
// Environment variable keys.
|
||||
var (
|
||||
envEndpoint = []string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT",
|
||||
"OTEL_EXPORTER_OTLP_ENDPOINT",
|
||||
}
|
||||
envInsecure = envEndpoint
|
||||
|
||||
envHeaders = []string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_HEADERS",
|
||||
"OTEL_EXPORTER_OTLP_HEADERS",
|
||||
}
|
||||
|
||||
envCompression = []string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_COMPRESSION",
|
||||
"OTEL_EXPORTER_OTLP_COMPRESSION",
|
||||
}
|
||||
|
||||
envTimeout = []string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_TIMEOUT",
|
||||
"OTEL_EXPORTER_OTLP_TIMEOUT",
|
||||
}
|
||||
|
||||
envTLSCert = []string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE",
|
||||
"OTEL_EXPORTER_OTLP_CERTIFICATE",
|
||||
}
|
||||
envTLSClient = []struct {
|
||||
Certificate string
|
||||
Key string
|
||||
}{
|
||||
{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY",
|
||||
},
|
||||
{
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE",
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_KEY",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
type fnOpt func(config) config
|
||||
|
||||
func (f fnOpt) applyOption(c config) config { return f(c) }
|
||||
|
||||
// Option applies an option to the Exporter.
|
||||
type Option interface {
|
||||
applyHTTPOption(config) config
|
||||
applyOption(config) config
|
||||
}
|
||||
|
||||
type config struct {
|
||||
// TODO: implement.
|
||||
endpoint setting[string]
|
||||
insecure setting[bool]
|
||||
tlsCfg setting[*tls.Config]
|
||||
headers setting[map[string]string]
|
||||
compression setting[Compression]
|
||||
timeout setting[time.Duration]
|
||||
retryCfg setting[retry.Config]
|
||||
|
||||
// gRPC configurations
|
||||
gRPCCredentials setting[credentials.TransportCredentials]
|
||||
serviceConfig setting[string]
|
||||
reconnectionPeriod setting[time.Duration]
|
||||
dialOptions setting[[]grpc.DialOption]
|
||||
gRPCConn setting[*grpc.ClientConn]
|
||||
}
|
||||
|
||||
func newConfig(options []Option) config {
|
||||
var c config
|
||||
for _, opt := range options {
|
||||
c = opt.applyHTTPOption(c)
|
||||
c = opt.applyOption(c)
|
||||
}
|
||||
|
||||
// Apply environment value and default value
|
||||
c.endpoint = c.endpoint.Resolve(
|
||||
getEnv[string](envEndpoint, convEndpoint),
|
||||
fallback[string](defaultEndpoint),
|
||||
)
|
||||
c.insecure = c.insecure.Resolve(
|
||||
getEnv[bool](envInsecure, convInsecure),
|
||||
)
|
||||
c.tlsCfg = c.tlsCfg.Resolve(
|
||||
loadEnvTLS[*tls.Config](),
|
||||
)
|
||||
c.headers = c.headers.Resolve(
|
||||
getEnv[map[string]string](envHeaders, convHeaders),
|
||||
)
|
||||
c.compression = c.compression.Resolve(
|
||||
getEnv[Compression](envCompression, convCompression),
|
||||
)
|
||||
c.timeout = c.timeout.Resolve(
|
||||
getEnv[time.Duration](envTimeout, convDuration),
|
||||
fallback[time.Duration](defaultTimeout),
|
||||
)
|
||||
c.retryCfg = c.retryCfg.Resolve(
|
||||
fallback[retry.Config](defaultRetryCfg),
|
||||
)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
@ -51,8 +153,10 @@ type RetryConfig retry.Config
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithInsecure() Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
return fnOpt(func(c config) config {
|
||||
c.insecure = newSetting(true)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithEndpoint sets the target endpoint the Exporter will connect to.
|
||||
@ -70,8 +174,10 @@ func WithInsecure() Option {
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithEndpoint(endpoint string) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
return fnOpt(func(c config) config {
|
||||
c.endpoint = newSetting(endpoint)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithEndpointURL sets the target endpoint URL the Exporter will connect to.
|
||||
@ -90,9 +196,21 @@ func WithEndpoint(endpoint string) Option {
|
||||
// passed, "localhost:4317" will be used.
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithEndpointURL(u string) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
func WithEndpointURL(rawURL string) Option {
|
||||
u, err := url.Parse(rawURL)
|
||||
if err != nil {
|
||||
global.Error(err, "otlplog: parse endpoint url", "url", rawURL)
|
||||
return fnOpt(func(c config) config { return c })
|
||||
}
|
||||
return fnOpt(func(c config) config {
|
||||
c.endpoint = newSetting(u.Host)
|
||||
if u.Scheme != "https" {
|
||||
c.insecure = newSetting(true)
|
||||
} else {
|
||||
c.insecure = newSetting(false)
|
||||
}
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithReconnectionPeriod set the minimum amount of time between connection
|
||||
@ -100,10 +218,22 @@ func WithEndpointURL(u string) Option {
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithReconnectionPeriod(rp time.Duration) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
return fnOpt(func(c config) config {
|
||||
c.reconnectionPeriod = newSetting(rp)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// Compression describes the compression used for exported payloads.
|
||||
type Compression int
|
||||
|
||||
const (
|
||||
// NoCompression represents that no compression should be used.
|
||||
NoCompression Compression = iota
|
||||
// GzipCompression represents that gzip compression should be used.
|
||||
GzipCompression
|
||||
)
|
||||
|
||||
// WithCompressor sets the compressor the gRPC client uses.
|
||||
// Supported compressor values: "gzip".
|
||||
//
|
||||
@ -114,12 +244,14 @@ func WithReconnectionPeriod(rp time.Duration) Option {
|
||||
// OTEL_EXPORTER_OTLP_LOGS_COMPRESSION will take precedence.
|
||||
//
|
||||
// By default, if an environment variable is not set, and this option is not
|
||||
// passed, no compressor will be used.
|
||||
// passed, no compression strategy will be used.
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithCompressor(compressor string) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
return fnOpt(func(c config) config {
|
||||
c.compression = newSetting(compressorToCompression(compressor))
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithHeaders will send the provided headers with each gRPC requests.
|
||||
@ -134,8 +266,10 @@ func WithCompressor(compressor string) Option {
|
||||
// By default, if an environment variable is not set, and this option is not
|
||||
// passed, no user headers will be set.
|
||||
func WithHeaders(headers map[string]string) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
return fnOpt(func(c config) config {
|
||||
c.headers = newSetting(headers)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithTLSCredentials sets the gRPC connection to use creds.
|
||||
@ -150,17 +284,21 @@ func WithHeaders(headers map[string]string) Option {
|
||||
// passed, no TLS credentials will be used.
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithTLSCredentials(_ credentials.TransportCredentials) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
func WithTLSCredentials(credential credentials.TransportCredentials) Option {
|
||||
return fnOpt(func(c config) config {
|
||||
c.gRPCCredentials = newSetting(credential)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithServiceConfig defines the default gRPC service config used.
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithServiceConfig(serviceConfig string) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
return fnOpt(func(c config) config {
|
||||
c.serviceConfig = newSetting(serviceConfig)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithDialOption sets explicit grpc.DialOptions to use when establishing a
|
||||
@ -171,9 +309,11 @@ func WithServiceConfig(serviceConfig string) Option {
|
||||
// grpc.DialOptions are ignored.
|
||||
//
|
||||
// This option has no effect if WithGRPCConn is used.
|
||||
func WithDialOption(_ ...grpc.DialOption) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
func WithDialOption(opts ...grpc.DialOption) Option {
|
||||
return fnOpt(func(c config) config {
|
||||
c.dialOptions = newSetting(opts)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithGRPCConn sets conn as the gRPC ClientConn used for all communication.
|
||||
@ -184,9 +324,11 @@ func WithDialOption(_ ...grpc.DialOption) Option {
|
||||
//
|
||||
// It is the callers responsibility to close the passed conn. The Exporter
|
||||
// Shutdown method will not close this connection.
|
||||
func WithGRPCConn(_ *grpc.ClientConn) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
func WithGRPCConn(conn *grpc.ClientConn) Option {
|
||||
return fnOpt(func(c config) config {
|
||||
c.gRPCConn = newSetting(conn)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithTimeout sets the max amount of time an Exporter will attempt an export.
|
||||
@ -204,8 +346,10 @@ func WithGRPCConn(_ *grpc.ClientConn) Option {
|
||||
// By default, if an environment variable is not set, and this option is not
|
||||
// passed, a timeout of 10 seconds will be used.
|
||||
func WithTimeout(duration time.Duration) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
return fnOpt(func(c config) config {
|
||||
c.timeout = newSetting(duration)
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// WithRetry sets the retry policy for transient retryable errors that are
|
||||
@ -221,7 +365,252 @@ func WithTimeout(duration time.Duration) Option {
|
||||
// If unset, the default retry policy will be used. It will retry the export
|
||||
// 5 seconds after receiving a retryable error and increase exponentially
|
||||
// after each error for no more than a total time of 1 minute.
|
||||
func WithRetry(settings RetryConfig) Option {
|
||||
// TODO: implement.
|
||||
return nil
|
||||
func WithRetry(rc RetryConfig) Option {
|
||||
return fnOpt(func(c config) config {
|
||||
c.retryCfg = newSetting(retry.Config(rc))
|
||||
return c
|
||||
})
|
||||
}
|
||||
|
||||
// convCompression returns the parsed compression encoded in s. NoCompression
|
||||
// and an errors are returned if s is unknown.
|
||||
func convCompression(s string) (Compression, error) {
|
||||
switch s {
|
||||
case "gzip":
|
||||
return GzipCompression, nil
|
||||
case "none", "":
|
||||
return NoCompression, nil
|
||||
}
|
||||
return NoCompression, fmt.Errorf("unknown compression: %s", s)
|
||||
}
|
||||
|
||||
// convEndpoint converts s from a URL string to an endpoint if s is a valid
|
||||
// URL. Otherwise, "" and an error are returned.
|
||||
func convEndpoint(s string) (string, error) {
|
||||
u, err := url.Parse(s)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return u.Host, nil
|
||||
}
|
||||
|
||||
// convInsecure parses s as a URL string and returns if the connection should
|
||||
// use client transport security or not. If s is an invalid URL, false and an
|
||||
// error are returned.
|
||||
func convInsecure(s string) (bool, error) {
|
||||
u, err := url.Parse(s)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return u.Scheme != "https", nil
|
||||
}
|
||||
|
||||
// convHeaders converts the OTel environment variable header value s into a
|
||||
// mapping of header key to value. If s is invalid a partial result and error
|
||||
// are returned.
|
||||
func convHeaders(s string) (map[string]string, error) {
|
||||
out := make(map[string]string)
|
||||
var err error
|
||||
for _, header := range strings.Split(s, ",") {
|
||||
rawKey, rawVal, found := strings.Cut(header, "=")
|
||||
if !found {
|
||||
err = errors.Join(err, fmt.Errorf("invalid header: %s", header))
|
||||
continue
|
||||
}
|
||||
|
||||
escKey, e := url.PathUnescape(rawKey)
|
||||
if e != nil {
|
||||
err = errors.Join(err, fmt.Errorf("invalid header key: %s", rawKey))
|
||||
continue
|
||||
}
|
||||
key := strings.TrimSpace(escKey)
|
||||
|
||||
escVal, e := url.PathUnescape(rawVal)
|
||||
if e != nil {
|
||||
err = errors.Join(err, fmt.Errorf("invalid header value: %s", rawVal))
|
||||
continue
|
||||
}
|
||||
val := strings.TrimSpace(escVal)
|
||||
|
||||
out[key] = val
|
||||
}
|
||||
return out, err
|
||||
}
|
||||
|
||||
// convDuration converts s into a duration of milliseconds. If s does not
|
||||
// contain an integer, 0 and an error are returned.
|
||||
func convDuration(s string) (time.Duration, error) {
|
||||
d, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// OTel durations are defined in milliseconds.
|
||||
return time.Duration(d) * time.Millisecond, nil
|
||||
}
|
||||
|
||||
// loadEnvTLS returns a resolver that loads a *tls.Config from files defeind by
|
||||
// the OTLP TLS environment variables. This will load both the rootCAs and
|
||||
// certificates used for mTLS.
|
||||
//
|
||||
// If the filepath defined is invalid or does not contain valid TLS files, an
|
||||
// error is passed to the OTel ErrorHandler and no TLS configuration is
|
||||
// provided.
|
||||
func loadEnvTLS[T *tls.Config]() resolver[T] {
|
||||
return func(s setting[T]) setting[T] {
|
||||
if s.Set {
|
||||
// Passed, valid, options have precedence.
|
||||
return s
|
||||
}
|
||||
|
||||
var rootCAs *x509.CertPool
|
||||
var err error
|
||||
for _, key := range envTLSCert {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
rootCAs, err = loadCertPool(v)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
var certs []tls.Certificate
|
||||
for _, pair := range envTLSClient {
|
||||
cert := os.Getenv(pair.Certificate)
|
||||
key := os.Getenv(pair.Key)
|
||||
if cert != "" && key != "" {
|
||||
var e error
|
||||
certs, e = loadCertificates(cert, key)
|
||||
err = errors.Join(err, e)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to load TLS: %w", err)
|
||||
otel.Handle(err)
|
||||
} else if rootCAs != nil || certs != nil {
|
||||
s.Set = true
|
||||
s.Value = &tls.Config{RootCAs: rootCAs, Certificates: certs}
|
||||
}
|
||||
return s
|
||||
}
|
||||
}
|
||||
|
||||
// readFile is used for testing.
|
||||
var readFile = os.ReadFile
|
||||
|
||||
// loadCertPool loads and returns the *x509.CertPool found at path if it exists
|
||||
// and is valid. Otherwise, nil and an error is returned.
|
||||
func loadCertPool(path string) (*x509.CertPool, error) {
|
||||
b, err := readFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cp := x509.NewCertPool()
|
||||
if ok := cp.AppendCertsFromPEM(b); !ok {
|
||||
return nil, errors.New("certificate not added")
|
||||
}
|
||||
return cp, nil
|
||||
}
|
||||
|
||||
// loadCertificates loads and returns the tls.Certificate found at path if it
|
||||
// exists and is valid. Otherwise, nil and an error is returned.
|
||||
func loadCertificates(certPath, keyPath string) ([]tls.Certificate, error) {
|
||||
cert, err := readFile(certPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
key, err := readFile(keyPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
crt, err := tls.X509KeyPair(cert, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return []tls.Certificate{crt}, nil
|
||||
}
|
||||
|
||||
func compressorToCompression(compressor string) Compression {
|
||||
c, err := convCompression(compressor)
|
||||
if err != nil {
|
||||
otel.Handle(fmt.Errorf("%s, using no compression as default", err))
|
||||
return NoCompression
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// setting is a configuration setting value.
|
||||
type setting[T any] struct {
|
||||
Value T
|
||||
Set bool
|
||||
}
|
||||
|
||||
// newSetting returns a new setting with the value set.
|
||||
func newSetting[T any](value T) setting[T] {
|
||||
return setting[T]{Value: value, Set: true}
|
||||
}
|
||||
|
||||
// resolver returns an updated setting after applying an resolution operation.
|
||||
type resolver[T any] func(setting[T]) setting[T]
|
||||
|
||||
// Resolve returns a resolved version of s.
|
||||
//
|
||||
// It will apply all the passed fn in the order provided, chaining together the
|
||||
// return setting to the next input. The setting s is used as the initial
|
||||
// argument to the first fn.
|
||||
//
|
||||
// Each fn needs to validate if it should apply given the Set state of the
|
||||
// setting. This will not perform any checks on the set state when chaining
|
||||
// function.
|
||||
func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] {
|
||||
for _, f := range fn {
|
||||
s = f(s)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// getEnv returns a resolver that will apply an environment variable value
|
||||
// associated with the first set key to a setting value. The conv function is
|
||||
// used to convert between the environment variable value and the setting type.
|
||||
//
|
||||
// If the input setting to the resolver is set, the environment variable will
|
||||
// not be applied.
|
||||
//
|
||||
// Any error returned from conv is sent to the OTel ErrorHandler and the
|
||||
// setting will not be updated.
|
||||
func getEnv[T any](keys []string, conv func(string) (T, error)) resolver[T] {
|
||||
return func(s setting[T]) setting[T] {
|
||||
if s.Set {
|
||||
// Passed, valid, options have precedence.
|
||||
return s
|
||||
}
|
||||
|
||||
for _, key := range keys {
|
||||
if vStr := os.Getenv(key); vStr != "" {
|
||||
v, err := conv(vStr)
|
||||
if err == nil {
|
||||
s.Value = v
|
||||
s.Set = true
|
||||
break
|
||||
}
|
||||
otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err))
|
||||
}
|
||||
}
|
||||
return s
|
||||
}
|
||||
}
|
||||
|
||||
// fallback returns a resolve that will set a setting value to val if it is not
|
||||
// already set.
|
||||
//
|
||||
// This is usually passed at the end of a resolver chain to ensure a default is
|
||||
// applied if the setting has not already been set.
|
||||
func fallback[T any](val T) resolver[T] {
|
||||
return func(s setting[T]) setting[T] {
|
||||
if !s.Set {
|
||||
s.Value = val
|
||||
s.Set = true
|
||||
}
|
||||
return s
|
||||
}
|
||||
}
|
||||
|
389
exporters/otlp/otlplog/otlploggrpc/config_test.go
Normal file
389
exporters/otlp/otlplog/otlploggrpc/config_test.go
Normal file
@ -0,0 +1,389 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package otlploggrpc
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
|
||||
)
|
||||
|
||||
const (
|
||||
weakCertificate = `
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIBhzCCASygAwIBAgIRANHpHgAWeTnLZpTSxCKs0ggwCgYIKoZIzj0EAwIwEjEQ
|
||||
MA4GA1UEChMHb3RlbC1nbzAeFw0yMTA0MDExMzU5MDNaFw0yMTA0MDExNDU5MDNa
|
||||
MBIxEDAOBgNVBAoTB290ZWwtZ28wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS9
|
||||
nWSkmPCxShxnp43F+PrOtbGV7sNfkbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0Z
|
||||
sJCLHGogQsYnWJBXUZOVo2MwYTAOBgNVHQ8BAf8EBAMCB4AwEwYDVR0lBAwwCgYI
|
||||
KwYBBQUHAwEwDAYDVR0TAQH/BAIwADAsBgNVHREEJTAjgglsb2NhbGhvc3SHEAAA
|
||||
AAAAAAAAAAAAAAAAAAGHBH8AAAEwCgYIKoZIzj0EAwIDSQAwRgIhANwZVVKvfvQ/
|
||||
1HXsTvgH+xTQswOwSSKYJ1cVHQhqK7ZbAiEAus8NxpTRnp5DiTMuyVmhVNPB+bVH
|
||||
Lhnm4N/QDk5rek0=
|
||||
-----END CERTIFICATE-----
|
||||
`
|
||||
weakPrivateKey = `
|
||||
-----BEGIN PRIVATE KEY-----
|
||||
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgN8HEXiXhvByrJ1zK
|
||||
SFT6Y2l2KqDWwWzKf+t4CyWrNKehRANCAAS9nWSkmPCxShxnp43F+PrOtbGV7sNf
|
||||
kbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0ZsJCLHGogQsYnWJBXUZOV
|
||||
-----END PRIVATE KEY-----
|
||||
`
|
||||
)
|
||||
|
||||
func newTLSConf(cert, key []byte) (*tls.Config, error) {
|
||||
cp := x509.NewCertPool()
|
||||
if ok := cp.AppendCertsFromPEM(cert); !ok {
|
||||
return nil, errors.New("failed to append certificate to the cert pool")
|
||||
}
|
||||
crt, err := tls.X509KeyPair(cert, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
crts := []tls.Certificate{crt}
|
||||
return &tls.Config{RootCAs: cp, Certificates: crts}, nil
|
||||
}
|
||||
|
||||
func TestNewConfig(t *testing.T) {
|
||||
orig := readFile
|
||||
readFile = func() func(name string) ([]byte, error) {
|
||||
index := map[string][]byte{
|
||||
"cert_path": []byte(weakCertificate),
|
||||
"key_path": []byte(weakPrivateKey),
|
||||
"invalid_cert": []byte("invalid certificate file."),
|
||||
"invalid_key": []byte("invalid key file."),
|
||||
}
|
||||
return func(name string) ([]byte, error) {
|
||||
b, ok := index[name]
|
||||
if !ok {
|
||||
err := fmt.Errorf("file does not exist: %s", name)
|
||||
return nil, err
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
}()
|
||||
t.Cleanup(func() { readFile = orig })
|
||||
|
||||
tlsCfg, err := newTLSConf([]byte(weakCertificate), []byte(weakPrivateKey))
|
||||
require.NoError(t, err, "testing TLS config")
|
||||
|
||||
headers := map[string]string{"a": "A"}
|
||||
rc := retry.Config{}
|
||||
|
||||
dialOptions := []grpc.DialOption{grpc.WithUserAgent("test-agent")}
|
||||
|
||||
testcases := []struct {
|
||||
name string
|
||||
options []Option
|
||||
envars map[string]string
|
||||
want config
|
||||
errs []string
|
||||
}{
|
||||
{
|
||||
name: "Defaults",
|
||||
want: config{
|
||||
endpoint: newSetting(defaultEndpoint),
|
||||
timeout: newSetting(defaultTimeout),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Options",
|
||||
options: []Option{
|
||||
WithInsecure(),
|
||||
WithEndpoint("test"),
|
||||
WithEndpointURL("http://test:8080/path"),
|
||||
WithReconnectionPeriod(time.Second),
|
||||
WithCompressor("gzip"),
|
||||
WithHeaders(headers),
|
||||
WithTLSCredentials(credentials.NewTLS(tlsCfg)),
|
||||
WithServiceConfig("{}"),
|
||||
WithDialOption(dialOptions...),
|
||||
WithGRPCConn(&grpc.ClientConn{}),
|
||||
WithTimeout(2 * time.Second),
|
||||
WithRetry(RetryConfig(rc)),
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("test:8080"),
|
||||
insecure: newSetting(true),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(GzipCompression),
|
||||
timeout: newSetting(2 * time.Second),
|
||||
retryCfg: newSetting(rc),
|
||||
gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)),
|
||||
serviceConfig: newSetting("{}"),
|
||||
reconnectionPeriod: newSetting(time.Second),
|
||||
gRPCConn: newSetting(&grpc.ClientConn{}),
|
||||
dialOptions: newSetting(dialOptions),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "WithEndpointURL",
|
||||
options: []Option{
|
||||
WithEndpointURL("http://test:8080/path"),
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("test:8080"),
|
||||
insecure: newSetting(true),
|
||||
timeout: newSetting(defaultTimeout),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "EndpointPrecedence",
|
||||
options: []Option{
|
||||
WithEndpointURL("https://test:8080/path"),
|
||||
WithEndpoint("not-test:9090"),
|
||||
WithInsecure(),
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("not-test:9090"),
|
||||
insecure: newSetting(true),
|
||||
timeout: newSetting(defaultTimeout),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "EndpointURLPrecedence",
|
||||
options: []Option{
|
||||
WithEndpoint("not-test:9090"),
|
||||
WithInsecure(),
|
||||
WithEndpointURL("https://test:8080/path"),
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("test:8080"),
|
||||
insecure: newSetting(false),
|
||||
timeout: newSetting(defaultTimeout),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "LogEnvironmentVariables",
|
||||
envars: map[string]string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path",
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("env.endpoint:8080"),
|
||||
insecure: newSetting(false),
|
||||
tlsCfg: newSetting(tlsCfg),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(GzipCompression),
|
||||
timeout: newSetting(15 * time.Second),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "LogEnpointEnvironmentVariablesDefaultPath",
|
||||
envars: map[string]string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint",
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("env.endpoint"),
|
||||
insecure: newSetting(true),
|
||||
timeout: newSetting(defaultTimeout),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "OTLPEnvironmentVariables",
|
||||
envars: map[string]string{
|
||||
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint:8080/prefix",
|
||||
"OTEL_EXPORTER_OTLP_HEADERS": "a=A",
|
||||
"OTEL_EXPORTER_OTLP_COMPRESSION": "none",
|
||||
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
|
||||
"OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_KEY": "key_path",
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("env.endpoint:8080"),
|
||||
insecure: newSetting(true),
|
||||
tlsCfg: newSetting(tlsCfg),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(NoCompression),
|
||||
timeout: newSetting(15 * time.Second),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "OTLPEnpointEnvironmentVariablesDefaultPath",
|
||||
envars: map[string]string{
|
||||
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint",
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("env.endpoint"),
|
||||
insecure: newSetting(true),
|
||||
timeout: newSetting(defaultTimeout),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "EnvironmentVariablesPrecedence",
|
||||
envars: map[string]string{
|
||||
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt",
|
||||
"OTEL_EXPORTER_OTLP_HEADERS": "b=B",
|
||||
"OTEL_EXPORTER_OTLP_COMPRESSION": "none",
|
||||
"OTEL_EXPORTER_OTLP_TIMEOUT": "30000",
|
||||
"OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert",
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert",
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key",
|
||||
|
||||
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/path",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path",
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("env.endpoint:8080"),
|
||||
insecure: newSetting(false),
|
||||
tlsCfg: newSetting(tlsCfg),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(GzipCompression),
|
||||
timeout: newSetting(15 * time.Second),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "OptionsPrecedence",
|
||||
envars: map[string]string{
|
||||
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt",
|
||||
"OTEL_EXPORTER_OTLP_HEADERS": "b=B",
|
||||
"OTEL_EXPORTER_OTLP_COMPRESSION": "none",
|
||||
"OTEL_EXPORTER_OTLP_TIMEOUT": "30000",
|
||||
"OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert",
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert",
|
||||
"OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key",
|
||||
|
||||
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path",
|
||||
},
|
||||
options: []Option{
|
||||
WithEndpoint("foo"),
|
||||
WithEndpointURL("https://test/path"),
|
||||
WithInsecure(),
|
||||
WithTLSCredentials(credentials.NewTLS(tlsCfg)),
|
||||
WithCompressor("gzip"),
|
||||
WithHeaders(headers),
|
||||
WithTimeout(time.Second),
|
||||
WithRetry(RetryConfig(rc)),
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting("test"),
|
||||
insecure: newSetting(true),
|
||||
tlsCfg: newSetting(tlsCfg),
|
||||
headers: newSetting(headers),
|
||||
compression: newSetting(GzipCompression),
|
||||
timeout: newSetting(time.Second),
|
||||
retryCfg: newSetting(rc),
|
||||
gRPCCredentials: newSetting(credentials.NewTLS(tlsCfg)),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "InvalidEnvironmentVariables",
|
||||
envars: map[string]string{
|
||||
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "%invalid",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a,%ZZ=valid,key=%ZZ",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "xz",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "100 seconds",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "invalid_cert",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "invalid_cert",
|
||||
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key",
|
||||
},
|
||||
want: config{
|
||||
endpoint: newSetting(defaultEndpoint),
|
||||
timeout: newSetting(defaultTimeout),
|
||||
retryCfg: newSetting(defaultRetryCfg),
|
||||
},
|
||||
errs: []string{
|
||||
`invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`,
|
||||
`failed to load TLS:`,
|
||||
`certificate not added`,
|
||||
`tls: failed to find any PEM data in certificate input`,
|
||||
`invalid OTEL_EXPORTER_OTLP_LOGS_HEADERS value a,%ZZ=valid,key=%ZZ:`,
|
||||
`invalid header: a`,
|
||||
`invalid header key: %ZZ`,
|
||||
`invalid header value: %ZZ`,
|
||||
`invalid OTEL_EXPORTER_OTLP_LOGS_COMPRESSION value xz: unknown compression: xz`,
|
||||
`invalid OTEL_EXPORTER_OTLP_LOGS_TIMEOUT value 100 seconds: strconv.Atoi: parsing "100 seconds": invalid syntax`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
for key, value := range tc.envars {
|
||||
t.Setenv(key, value)
|
||||
}
|
||||
|
||||
var err error
|
||||
t.Cleanup(func(orig otel.ErrorHandler) func() {
|
||||
otel.SetErrorHandler(otel.ErrorHandlerFunc(func(e error) {
|
||||
err = errors.Join(err, e)
|
||||
}))
|
||||
return func() { otel.SetErrorHandler(orig) }
|
||||
}(otel.GetErrorHandler()))
|
||||
c := newConfig(tc.options)
|
||||
|
||||
// Do not compare pointer values.
|
||||
assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg)
|
||||
var emptyTLS setting[*tls.Config]
|
||||
c.tlsCfg, tc.want.tlsCfg = emptyTLS, emptyTLS
|
||||
|
||||
assert.Equal(t, tc.want, c)
|
||||
|
||||
for _, errMsg := range tc.errs {
|
||||
assert.ErrorContains(t, err, errMsg)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func assertTLSConfig(t *testing.T, want, got setting[*tls.Config]) {
|
||||
t.Helper()
|
||||
|
||||
assert.Equal(t, want.Set, got.Set, "setting Set")
|
||||
if !want.Set {
|
||||
return
|
||||
}
|
||||
|
||||
if want.Value == nil {
|
||||
assert.Nil(t, got.Value, "*tls.Config")
|
||||
return
|
||||
}
|
||||
require.NotNil(t, got.Value, "*tls.Config")
|
||||
|
||||
if want.Value.RootCAs == nil {
|
||||
assert.Nil(t, got.Value.RootCAs, "*tls.Config.RootCAs")
|
||||
} else {
|
||||
if assert.NotNil(t, got.Value.RootCAs, "RootCAs") {
|
||||
assert.True(t, want.Value.RootCAs.Equal(got.Value.RootCAs), "RootCAs equal")
|
||||
}
|
||||
}
|
||||
assert.Equal(t, want.Value.Certificates, got.Value.Certificates, "Certificates")
|
||||
}
|
@ -5,6 +5,7 @@ go 1.21
|
||||
require (
|
||||
github.com/cenkalti/backoff/v4 v4.3.0
|
||||
github.com/stretchr/testify v1.9.0
|
||||
go.opentelemetry.io/otel v1.27.0
|
||||
go.opentelemetry.io/otel/sdk/log v0.3.0
|
||||
google.golang.org/grpc v1.64.0
|
||||
)
|
||||
@ -14,7 +15,6 @@ require (
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
go.opentelemetry.io/otel v1.27.0 // indirect
|
||||
go.opentelemetry.io/otel/log v0.3.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.27.0 // indirect
|
||||
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
|
||||
|
Loading…
x
Reference in New Issue
Block a user