diff --git a/exporters/otlp/otlplog/otlploghttp/config.go b/exporters/otlp/otlplog/otlploghttp/config.go index 6a414678c..5610434a7 100644 --- a/exporters/otlp/otlplog/otlploghttp/config.go +++ b/exporters/otlp/otlplog/otlploghttp/config.go @@ -5,10 +5,17 @@ package otlploghttp // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o import ( "crypto/tls" + "crypto/x509" + "errors" + "fmt" "net/http" "net/url" + "os" + "strconv" + "strings" "time" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" "go.opentelemetry.io/otel/internal/global" ) @@ -22,6 +29,52 @@ var ( defaultRetryCfg = RetryConfig(retry.DefaultConfig) ) +// Environment variable keys. +var ( + envEndpoint = []string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", + "OTEL_EXPORTER_OTLP_ENDPOINT", + } + envInsecure = envEndpoint + + // Split because these are parsed differently. + envPathSignal = []string{"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT"} + envPathOTLP = []string{"OTEL_EXPORTER_OTLP_ENDPOINT"} + + envHeaders = []string{ + "OTEL_EXPORTER_OTLP_LOGS_HEADERS", + "OTEL_EXPORTER_OTLP_HEADERS", + } + + envCompression = []string{ + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION", + "OTEL_EXPORTER_OTLP_COMPRESSION", + } + + envTimeout = []string{ + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT", + "OTEL_EXPORTER_OTLP_TIMEOUT", + } + + envTLSCert = []string{ + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CERTIFICATE", + } + envTLSClient = []struct { + Certificate string + Key string + }{ + { + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY", + }, + { + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE", + "OTEL_EXPORTER_OTLP_CLIENT_KEY", + }, + } +) + // Option applies an option to the Exporter. type Option interface { applyHTTPOption(config) config @@ -50,12 +103,28 @@ func newConfig(options []Option) config { } c.endpoint = c.endpoint.Resolve( + getenv[string](envEndpoint, convEndpoint), fallback[string](defaultEndpoint), ) c.path = c.path.Resolve( + getenv[string](envPathSignal, convPathExact), + getenv[string](envPathOTLP, convPath), fallback[string](defaultPath), ) + c.insecure = c.insecure.Resolve( + getenv[bool](envInsecure, convInsecure), + ) + c.tlsCfg = c.tlsCfg.Resolve( + loadEnvTLS[*tls.Config](), + ) + c.headers = c.headers.Resolve( + getenv[map[string]string](envHeaders, convHeaders), + ) + c.compression = c.compression.Resolve( + getenv[Compression](envCompression, convCompression), + ) c.timeout = c.timeout.Resolve( + getenv[time.Duration](envTimeout, convDuration), fallback[time.Duration](defaultTimeout), ) c.proxy = c.proxy.Resolve( @@ -303,6 +372,219 @@ func (s setting[T]) Resolve(fn ...resolver[T]) setting[T] { return s } +// loadEnvTLS returns a resolver that loads a *tls.Config from files defeind by +// the OTLP TLS environment variables. This will load both the rootCAs and +// certificates used for mTLS. +// +// If the filepath defined is invalid or does not contain valid TLS files, an +// error is passed to the OTel ErrorHandler and no TLS configuration is +// provided. +func loadEnvTLS[T *tls.Config]() resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + var rootCAs *x509.CertPool + var err error + for _, key := range envTLSCert { + if v := os.Getenv(key); v != "" { + rootCAs, err = loadCertPool(v) + break + } + } + + var certs []tls.Certificate + for _, pair := range envTLSClient { + cert := os.Getenv(pair.Certificate) + key := os.Getenv(pair.Key) + if cert != "" && key != "" { + var e error + certs, e = loadCertificates(cert, key) + err = errors.Join(err, e) + break + } + } + + if err != nil { + err = fmt.Errorf("failed to load TLS: %w", err) + otel.Handle(err) + } else if rootCAs != nil || certs != nil { + s.Set = true + s.Value = &tls.Config{RootCAs: rootCAs, Certificates: certs} + } + return s + } +} + +// readFile is used for testing. +var readFile = os.ReadFile + +// loadCertPool loads and returns the *x509.CertPool found at path if it exists +// and is valid. Otherwise, nil and an error is returned. +func loadCertPool(path string) (*x509.CertPool, error) { + b, err := readFile(path) + if err != nil { + return nil, err + } + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(b); !ok { + return nil, errors.New("certificate not added") + } + return cp, nil +} + +// loadCertPool loads and returns the tls.Certificate found at path if it +// exists and is valid. Otherwise, nil and an error is returned. +func loadCertificates(certPath, keyPath string) ([]tls.Certificate, error) { + cert, err := readFile(certPath) + if err != nil { + return nil, err + } + key, err := readFile(keyPath) + if err != nil { + return nil, err + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + return []tls.Certificate{crt}, nil +} + +// getenv returns a resolver that will apply an environment variable value +// associated with the first set key to a setting value. The conv function is +// used to convert between the environment variable value and the setting type. +// +// If the input setting to the resolver is set, the environment variable will +// not be applied. +// +// Any error returned from conv is sent to the OTel ErrorHandler and the +// setting will not be updated. +func getenv[T any](keys []string, conv func(string) (T, error)) resolver[T] { + return func(s setting[T]) setting[T] { + if s.Set { + // Passed, valid, options have precedence. + return s + } + + for _, key := range keys { + if vStr := os.Getenv(key); vStr != "" { + v, err := conv(vStr) + if err == nil { + s.Value = v + s.Set = true + break + } + otel.Handle(fmt.Errorf("invalid %s value %s: %w", key, vStr, err)) + } + } + return s + } +} + +// convEndpoint converts s from a URL string to an endpoint if s is a valid +// URL. Otherwise, "" and an error are returned. +func convEndpoint(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + return u.Host, nil +} + +// convPathExact converts s from a URL string to the exact path if s is a valid +// URL. Otherwise, "" and an error are returned. +// +// If the path contained in s is empty, "/" is returned. +func convPathExact(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + if u.Path == "" { + return "/", nil + } + return u.Path, nil +} + +// convPath converts s from a URL string to an OTLP endpoint path if s is a +// valid URL. Otherwise, "" and an error are returned. +func convPath(s string) (string, error) { + u, err := url.Parse(s) + if err != nil { + return "", err + } + return u.Path + "/v1/logs", nil +} + +// convInsecure parses s as a URL string and returns if the connection should +// use client transport security or not. If s is an invalid URL, false and an +// error are returned. +func convInsecure(s string) (bool, error) { + u, err := url.Parse(s) + if err != nil { + return false, err + } + return u.Scheme != "https", nil +} + +// convHeaders converts the OTel environment variable header value s into a +// mapping of header key to value. If s is invalid a partial result and error +// are returned. +func convHeaders(s string) (map[string]string, error) { + out := make(map[string]string) + var err error + for _, header := range strings.Split(s, ",") { + rawKey, rawVal, found := strings.Cut(header, "=") + if !found { + err = errors.Join(err, fmt.Errorf("invalid header: %s", header)) + continue + } + + escKey, e := url.PathUnescape(rawKey) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header key: %s", rawKey)) + continue + } + key := strings.TrimSpace(escKey) + + escVal, e := url.PathUnescape(rawVal) + if e != nil { + err = errors.Join(err, fmt.Errorf("invalid header value: %s", rawVal)) + continue + } + val := strings.TrimSpace(escVal) + + out[key] = val + } + return out, err +} + +// convCompression returns the parsed compression encoded in s. NoCompression +// and an errors are returned if s is unknown. +func convCompression(s string) (Compression, error) { + switch s { + case "gzip": + return GzipCompression, nil + case "none", "": + return NoCompression, nil + } + return NoCompression, fmt.Errorf("unknown compression: %s", s) +} + +// convDuration converts s into a duration of milliseconds. If s does not +// contain an integer, 0 and an error are returned. +func convDuration(s string) (time.Duration, error) { + d, err := strconv.Atoi(s) + if err != nil { + return 0, err + } + // OTel durations are defined in milliseconds. + return time.Duration(d) * time.Millisecond, nil +} + // fallback returns a resolve that will set a setting value to val if it is not // already set. // diff --git a/exporters/otlp/otlplog/otlploghttp/config_test.go b/exporters/otlp/otlplog/otlploghttp/config_test.go index e28422e74..72d03d8ef 100644 --- a/exporters/otlp/otlplog/otlploghttp/config_test.go +++ b/exporters/otlp/otlplog/otlploghttp/config_test.go @@ -5,16 +5,79 @@ package otlploghttp import ( "crypto/tls" + "crypto/x509" + "errors" + "fmt" "net/http" "net/url" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" ) +const ( + weakCertificate = ` +-----BEGIN CERTIFICATE----- +MIIBhzCCASygAwIBAgIRANHpHgAWeTnLZpTSxCKs0ggwCgYIKoZIzj0EAwIwEjEQ +MA4GA1UEChMHb3RlbC1nbzAeFw0yMTA0MDExMzU5MDNaFw0yMTA0MDExNDU5MDNa +MBIxEDAOBgNVBAoTB290ZWwtZ28wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS9 +nWSkmPCxShxnp43F+PrOtbGV7sNfkbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0Z +sJCLHGogQsYnWJBXUZOVo2MwYTAOBgNVHQ8BAf8EBAMCB4AwEwYDVR0lBAwwCgYI +KwYBBQUHAwEwDAYDVR0TAQH/BAIwADAsBgNVHREEJTAjgglsb2NhbGhvc3SHEAAA +AAAAAAAAAAAAAAAAAAGHBH8AAAEwCgYIKoZIzj0EAwIDSQAwRgIhANwZVVKvfvQ/ +1HXsTvgH+xTQswOwSSKYJ1cVHQhqK7ZbAiEAus8NxpTRnp5DiTMuyVmhVNPB+bVH +Lhnm4N/QDk5rek0= +-----END CERTIFICATE----- +` + weakPrivateKey = ` +-----BEGIN PRIVATE KEY----- +MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgN8HEXiXhvByrJ1zK +SFT6Y2l2KqDWwWzKf+t4CyWrNKehRANCAAS9nWSkmPCxShxnp43F+PrOtbGV7sNf +kbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0ZsJCLHGogQsYnWJBXUZOV +-----END PRIVATE KEY----- +` +) + +func newTLSConf(cert, key []byte) (*tls.Config, error) { + cp := x509.NewCertPool() + if ok := cp.AppendCertsFromPEM(cert); !ok { + return nil, errors.New("failed to append certificate to the cert pool") + } + crt, err := tls.X509KeyPair(cert, key) + if err != nil { + return nil, err + } + crts := []tls.Certificate{crt} + return &tls.Config{RootCAs: cp, Certificates: crts}, nil +} + func TestNewConfig(t *testing.T) { - tlsCfg := &tls.Config{} + orig := readFile + readFile = func() func(name string) ([]byte, error) { + index := map[string][]byte{ + "cert_path": []byte(weakCertificate), + "key_path": []byte(weakPrivateKey), + "invalid_cert": []byte("invalid certificate file."), + "invalid_key": []byte("invalid key file."), + } + return func(name string) ([]byte, error) { + b, ok := index[name] + if !ok { + err := fmt.Errorf("file does not exist: %s", name) + return nil, err + } + return b, nil + } + }() + t.Cleanup(func() { readFile = orig }) + + tlsCfg, err := newTLSConf([]byte(weakCertificate), []byte(weakPrivateKey)) + require.NoError(t, err, "testing TLS config") + headers := map[string]string{"a": "A"} rc := RetryConfig{} @@ -23,6 +86,7 @@ func TestNewConfig(t *testing.T) { options []Option envars map[string]string want config + errs []string }{ { name: "Defaults", @@ -102,6 +166,176 @@ func TestNewConfig(t *testing.T) { retryCfg: newSetting(defaultRetryCfg), }, }, + { + name: "LogEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + path: newSetting("/prefix"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "LogEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint", + }, + want: config{ + endpoint: newSetting("env.endpoint"), + path: newSetting("/"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OTLPEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + path: newSetting("/prefix/v1/logs"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(NoCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OTLPEnpointEnvironmentVariablesDefaultPath", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint", + }, + want: config{ + endpoint: newSetting("env.endpoint"), + path: newSetting(defaultPath), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "EnvironmentVariablesPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/path", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + want: config{ + endpoint: newSetting("env.endpoint:8080"), + path: newSetting("/path"), + insecure: newSetting(false), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(15 * time.Second), + retryCfg: newSetting(defaultRetryCfg), + }, + }, + { + name: "OptionsPrecedence", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_ENDPOINT": "http://ignored:9090/alt", + "OTEL_EXPORTER_OTLP_HEADERS": "b=B", + "OTEL_EXPORTER_OTLP_COMPRESSION": "none", + "OTEL_EXPORTER_OTLP_TIMEOUT": "30000", + "OTEL_EXPORTER_OTLP_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_CLIENT_KEY": "invalid_key", + + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "https://env.endpoint:8080/prefix", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a=A", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "gzip", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "15000", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "cert_path", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "key_path", + }, + options: []Option{ + WithEndpoint("test"), + WithURLPath("/path"), + WithInsecure(), + WithTLSClientConfig(tlsCfg), + WithCompression(GzipCompression), + WithHeaders(headers), + WithTimeout(time.Second), + WithRetry(rc), + }, + want: config{ + endpoint: newSetting("test"), + path: newSetting("/path"), + insecure: newSetting(true), + tlsCfg: newSetting(tlsCfg), + headers: newSetting(headers), + compression: newSetting(GzipCompression), + timeout: newSetting(time.Second), + retryCfg: newSetting(rc), + }, + }, + { + name: "InvalidEnvironmentVariables", + envars: map[string]string{ + "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "%invalid", + "OTEL_EXPORTER_OTLP_LOGS_HEADERS": "a,%ZZ=valid,key=%ZZ", + "OTEL_EXPORTER_OTLP_LOGS_COMPRESSION": "xz", + "OTEL_EXPORTER_OTLP_LOGS_TIMEOUT": "100 seconds", + "OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_CERTIFICATE": "invalid_cert", + "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key", + }, + want: config{ + endpoint: newSetting(defaultEndpoint), + path: newSetting(defaultPath), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + }, + errs: []string{ + `invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`, + `failed to load TLS:`, + `certificate not added`, + `tls: failed to find any PEM data in certificate input`, + `invalid OTEL_EXPORTER_OTLP_LOGS_HEADERS value a,%ZZ=valid,key=%ZZ:`, + `invalid header: a`, + `invalid header key: %ZZ`, + `invalid header value: %ZZ`, + `invalid OTEL_EXPORTER_OTLP_LOGS_COMPRESSION value xz: unknown compression: xz`, + `invalid OTEL_EXPORTER_OTLP_LOGS_TIMEOUT value 100 seconds: strconv.Atoi: parsing "100 seconds": invalid syntax`, + }, + }, } for _, tc := range testcases { @@ -109,14 +343,57 @@ func TestNewConfig(t *testing.T) { for key, value := range tc.envars { t.Setenv(key, value) } + + var err error + t.Cleanup(func(orig otel.ErrorHandler) func() { + otel.SetErrorHandler(otel.ErrorHandlerFunc(func(e error) { + err = errors.Join(err, e) + })) + return func() { otel.SetErrorHandler(orig) } + }(otel.GetErrorHandler())) c := newConfig(tc.options) - // Cannot compare funcs + + // Do not compare pointer values. + assertTLSConfig(t, tc.want.tlsCfg, c.tlsCfg) + var emptyTLS setting[*tls.Config] + c.tlsCfg, tc.want.tlsCfg = emptyTLS, emptyTLS + + // Cannot compare funcs, see TestWithProxy. c.proxy = setting[HTTPTransportProxyFunc]{} + assert.Equal(t, tc.want, c) + + for _, errMsg := range tc.errs { + assert.ErrorContains(t, err, errMsg) + } }) } } +func assertTLSConfig(t *testing.T, want, got setting[*tls.Config]) { + t.Helper() + + assert.Equal(t, want.Set, got.Set, "setting Set") + if !want.Set { + return + } + + if want.Value == nil { + assert.Nil(t, got.Value, "*tls.Config") + return + } + require.NotNil(t, got.Value, "*tls.Config") + + if want.Value.RootCAs == nil { + assert.Nil(t, got.Value.RootCAs, "*tls.Config.RootCAs") + } else { + if assert.NotNil(t, got.Value.RootCAs, "RootCAs") { + assert.True(t, want.Value.RootCAs.Equal(got.Value.RootCAs), "RootCAs equal") + } + } + assert.Equal(t, want.Value.Certificates, got.Value.Certificates, "Certificates") +} + func TestWithProxy(t *testing.T) { proxy := func(*http.Request) (*url.URL, error) { return nil, nil } opts := []Option{WithProxy(HTTPTransportProxyFunc(proxy))}