1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-18 03:22:12 +02:00

Decouple otlp/otlptrace/otlptracegrpc from otlp/internal and otlp/otlptrace/internal using gotmpl (#4400)

* Use template retry pkg in otlpconfig

* Template out all otlptrace internal

* Add envconfig pkg to otlptrace/internal

* Generate otlptrace/internal/otlpconfig

* Revert templatizing otlptracegrpc

* Add changes to changelog

* Add partialsuccess to internal shared

* Use gotmpl to generate otlptracegrpc/internal

* Add changes to changelog
This commit is contained in:
Tyler Yahn 2023-08-03 13:51:45 -07:00 committed by GitHub
parent 58c9cf65d6
commit 2e6ca0af0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 2735 additions and 9 deletions

View File

@ -54,6 +54,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix possible panic, deadlock and race condition in batch span processor in `go.opentelemetry.io/otel/sdk/trace`. (#4353)
- Improve context cancelation handling in batch span processor's `ForceFlush` in `go.opentelemetry.io/otel/sdk/trace`. (#4369)
- Decouple `go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal` from `go.opentelemetry.io/otel/exporters/otlp/internal` using gotmpl. (#4397, #3846)
- Decouple `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal` from `go.opentelemetry.io/otel/exporters/otlp/internal` and `go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal` using gotmpl. (#4400, #3846)
- Decouple `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal` from `go.opentelemetry.io/otel/exporters/otlp/internal` and `go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal` using gotmpl. (#4401, #3846)
- Do not block the metric SDK when OTLP metric exports are blocked in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` and `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#3925, #4395)

View File

@ -27,10 +27,10 @@ import (
"google.golang.org/grpc/status"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
)

View File

@ -34,8 +34,8 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"

View File

@ -3,10 +3,12 @@ module go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc
go 1.19
require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.opentelemetry.io/proto/otlp v1.0.0
go.uber.org/goleak v1.2.1
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc
@ -15,7 +17,6 @@ require (
)
require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
@ -23,7 +24,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.9.0 // indirect

View File

@ -0,0 +1,202 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/envconfig/envconfig.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package envconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/envconfig"
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"
"go.opentelemetry.io/otel/internal/global"
)
// ConfigFn is the generic function used to set a config.
type ConfigFn func(*EnvOptionsReader)
// EnvOptionsReader reads the required environment variables.
type EnvOptionsReader struct {
GetEnv func(string) string
ReadFile func(string) ([]byte, error)
Namespace string
}
// Apply runs every ConfigFn.
func (e *EnvOptionsReader) Apply(opts ...ConfigFn) {
for _, o := range opts {
o(e)
}
}
// GetEnvValue gets an OTLP environment variable value of the specified key
// using the GetEnv function.
// This function prepends the OTLP specified namespace to all key lookups.
func (e *EnvOptionsReader) GetEnvValue(key string) (string, bool) {
v := strings.TrimSpace(e.GetEnv(keyWithNamespace(e.Namespace, key)))
return v, v != ""
}
// WithString retrieves the specified config and passes it to ConfigFn as a string.
func WithString(n string, fn func(string)) func(e *EnvOptionsReader) {
return func(e *EnvOptionsReader) {
if v, ok := e.GetEnvValue(n); ok {
fn(v)
}
}
}
// WithBool returns a ConfigFn that reads the environment variable n and if it exists passes its parsed bool value to fn.
func WithBool(n string, fn func(bool)) ConfigFn {
return func(e *EnvOptionsReader) {
if v, ok := e.GetEnvValue(n); ok {
b := strings.ToLower(v) == "true"
fn(b)
}
}
}
// WithDuration retrieves the specified config and passes it to ConfigFn as a duration.
func WithDuration(n string, fn func(time.Duration)) func(e *EnvOptionsReader) {
return func(e *EnvOptionsReader) {
if v, ok := e.GetEnvValue(n); ok {
d, err := strconv.Atoi(v)
if err != nil {
global.Error(err, "parse duration", "input", v)
return
}
fn(time.Duration(d) * time.Millisecond)
}
}
}
// WithHeaders retrieves the specified config and passes it to ConfigFn as a map of HTTP headers.
func WithHeaders(n string, fn func(map[string]string)) func(e *EnvOptionsReader) {
return func(e *EnvOptionsReader) {
if v, ok := e.GetEnvValue(n); ok {
fn(stringToHeader(v))
}
}
}
// WithURL retrieves the specified config and passes it to ConfigFn as a net/url.URL.
func WithURL(n string, fn func(*url.URL)) func(e *EnvOptionsReader) {
return func(e *EnvOptionsReader) {
if v, ok := e.GetEnvValue(n); ok {
u, err := url.Parse(v)
if err != nil {
global.Error(err, "parse url", "input", v)
return
}
fn(u)
}
}
}
// WithCertPool returns a ConfigFn that reads the environment variable n as a filepath to a TLS certificate pool. If it exists, it is parsed as a crypto/x509.CertPool and it is passed to fn.
func WithCertPool(n string, fn func(*x509.CertPool)) ConfigFn {
return func(e *EnvOptionsReader) {
if v, ok := e.GetEnvValue(n); ok {
b, err := e.ReadFile(v)
if err != nil {
global.Error(err, "read tls ca cert file", "file", v)
return
}
c, err := createCertPool(b)
if err != nil {
global.Error(err, "create tls cert pool")
return
}
fn(c)
}
}
}
// WithClientCert returns a ConfigFn that reads the environment variable nc and nk as filepaths to a client certificate and key pair. If they exists, they are parsed as a crypto/tls.Certificate and it is passed to fn.
func WithClientCert(nc, nk string, fn func(tls.Certificate)) ConfigFn {
return func(e *EnvOptionsReader) {
vc, okc := e.GetEnvValue(nc)
vk, okk := e.GetEnvValue(nk)
if !okc || !okk {
return
}
cert, err := e.ReadFile(vc)
if err != nil {
global.Error(err, "read tls client cert", "file", vc)
return
}
key, err := e.ReadFile(vk)
if err != nil {
global.Error(err, "read tls client key", "file", vk)
return
}
crt, err := tls.X509KeyPair(cert, key)
if err != nil {
global.Error(err, "create tls client key pair")
return
}
fn(crt)
}
}
func keyWithNamespace(ns, key string) string {
if ns == "" {
return key
}
return fmt.Sprintf("%s_%s", ns, key)
}
func stringToHeader(value string) map[string]string {
headersPairs := strings.Split(value, ",")
headers := make(map[string]string)
for _, header := range headersPairs {
n, v, found := strings.Cut(header, "=")
if !found {
global.Error(errors.New("missing '="), "parse headers", "input", header)
continue
}
name, err := url.QueryUnescape(n)
if err != nil {
global.Error(err, "escape header key", "key", n)
continue
}
trimmedName := strings.TrimSpace(name)
value, err := url.QueryUnescape(v)
if err != nil {
global.Error(err, "escape header value", "value", v)
continue
}
trimmedValue := strings.TrimSpace(value)
headers[trimmedName] = trimmedValue
}
return headers
}
func createCertPool(certBytes []byte) (*x509.CertPool, error) {
cp := x509.NewCertPool()
if ok := cp.AppendCertsFromPEM(certBytes); !ok {
return nil, errors.New("failed to append certificate to the cert pool")
}
return cp, nil
}

View File

@ -0,0 +1,464 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/envconfig/envconfig_test.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package envconfig
import (
"crypto/tls"
"crypto/x509"
"errors"
"net/url"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
const WeakKey = `
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIEbrSPmnlSOXvVzxCyv+VR3a0HDeUTvOcqrdssZ2k4gFoAoGCCqGSM49
AwEHoUQDQgAEDMTfv75J315C3K9faptS9iythKOMEeV/Eep73nWX531YAkmmwBSB
2dXRD/brsgLnfG57WEpxZuY7dPRbxu33BA==
-----END EC PRIVATE KEY-----
`
const WeakCertificate = `
-----BEGIN CERTIFICATE-----
MIIBjjCCATWgAwIBAgIUKQSMC66MUw+kPp954ZYOcyKAQDswCgYIKoZIzj0EAwIw
EjEQMA4GA1UECgwHb3RlbC1nbzAeFw0yMjEwMTkwMDA5MTlaFw0yMzEwMTkwMDA5
MTlaMBIxEDAOBgNVBAoMB290ZWwtZ28wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNC
AAQMxN+/vknfXkLcr19qm1L2LK2Eo4wR5X8R6nvedZfnfVgCSabAFIHZ1dEP9uuy
Aud8bntYSnFm5jt09FvG7fcEo2kwZzAdBgNVHQ4EFgQUicGuhnTTkYLZwofXMNLK
SHFeCWgwHwYDVR0jBBgwFoAUicGuhnTTkYLZwofXMNLKSHFeCWgwDwYDVR0TAQH/
BAUwAwEB/zAUBgNVHREEDTALgglsb2NhbGhvc3QwCgYIKoZIzj0EAwIDRwAwRAIg
Lfma8FnnxeSOi6223AsFfYwsNZ2RderNsQrS0PjEHb0CIBkrWacqARUAu7uT4cGu
jVcIxYQqhId5L8p/mAv2PWZS
-----END CERTIFICATE-----
`
type testOption struct {
TestString string
TestBool bool
TestDuration time.Duration
TestHeaders map[string]string
TestURL *url.URL
TestTLS *tls.Config
}
func TestEnvConfig(t *testing.T) {
parsedURL, err := url.Parse("https://example.com")
assert.NoError(t, err)
options := []testOption{}
for _, testcase := range []struct {
name string
reader EnvOptionsReader
configs []ConfigFn
expectedOptions []testOption
}{
{
name: "with no namespace and a matching key",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "world"
}
return ""
},
},
configs: []ConfigFn{
WithString("HELLO", func(v string) {
options = append(options, testOption{TestString: v})
}),
},
expectedOptions: []testOption{
{
TestString: "world",
},
},
},
{
name: "with no namespace and a non-matching key",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "world"
}
return ""
},
},
configs: []ConfigFn{
WithString("HOLA", func(v string) {
options = append(options, testOption{TestString: v})
}),
},
expectedOptions: []testOption{},
},
{
name: "with a namespace and a matching key",
reader: EnvOptionsReader{
Namespace: "MY_NAMESPACE",
GetEnv: func(n string) string {
if n == "MY_NAMESPACE_HELLO" {
return "world"
}
return ""
},
},
configs: []ConfigFn{
WithString("HELLO", func(v string) {
options = append(options, testOption{TestString: v})
}),
},
expectedOptions: []testOption{
{
TestString: "world",
},
},
},
{
name: "with no namespace and a non-matching key",
reader: EnvOptionsReader{
Namespace: "MY_NAMESPACE",
GetEnv: func(n string) string {
if n == "HELLO" {
return "world"
}
return ""
},
},
configs: []ConfigFn{
WithString("HELLO", func(v string) {
options = append(options, testOption{TestString: v})
}),
},
expectedOptions: []testOption{},
},
{
name: "with a bool config",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "true"
} else if n == "WORLD" {
return "false"
}
return ""
},
},
configs: []ConfigFn{
WithBool("HELLO", func(b bool) {
options = append(options, testOption{TestBool: b})
}),
WithBool("WORLD", func(b bool) {
options = append(options, testOption{TestBool: b})
}),
},
expectedOptions: []testOption{
{
TestBool: true,
},
{
TestBool: false,
},
},
},
{
name: "with an invalid bool config",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "world"
}
return ""
},
},
configs: []ConfigFn{
WithBool("HELLO", func(b bool) {
options = append(options, testOption{TestBool: b})
}),
},
expectedOptions: []testOption{
{
TestBool: false,
},
},
},
{
name: "with a duration config",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "60"
}
return ""
},
},
configs: []ConfigFn{
WithDuration("HELLO", func(v time.Duration) {
options = append(options, testOption{TestDuration: v})
}),
},
expectedOptions: []testOption{
{
TestDuration: 60_000_000, // 60 milliseconds
},
},
},
{
name: "with an invalid duration config",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "world"
}
return ""
},
},
configs: []ConfigFn{
WithDuration("HELLO", func(v time.Duration) {
options = append(options, testOption{TestDuration: v})
}),
},
expectedOptions: []testOption{},
},
{
name: "with headers",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "userId=42,userName=alice"
}
return ""
},
},
configs: []ConfigFn{
WithHeaders("HELLO", func(v map[string]string) {
options = append(options, testOption{TestHeaders: v})
}),
},
expectedOptions: []testOption{
{
TestHeaders: map[string]string{
"userId": "42",
"userName": "alice",
},
},
},
},
{
name: "with invalid headers",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "world"
}
return ""
},
},
configs: []ConfigFn{
WithHeaders("HELLO", func(v map[string]string) {
options = append(options, testOption{TestHeaders: v})
}),
},
expectedOptions: []testOption{
{
TestHeaders: map[string]string{},
},
},
},
{
name: "with URL",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "https://example.com"
}
return ""
},
},
configs: []ConfigFn{
WithURL("HELLO", func(v *url.URL) {
options = append(options, testOption{TestURL: v})
}),
},
expectedOptions: []testOption{
{
TestURL: parsedURL,
},
},
},
{
name: "with invalid URL",
reader: EnvOptionsReader{
GetEnv: func(n string) string {
if n == "HELLO" {
return "i nvalid://url"
}
return ""
},
},
configs: []ConfigFn{
WithURL("HELLO", func(v *url.URL) {
options = append(options, testOption{TestURL: v})
}),
},
expectedOptions: []testOption{},
},
} {
t.Run(testcase.name, func(t *testing.T) {
testcase.reader.Apply(testcase.configs...)
assert.Equal(t, testcase.expectedOptions, options)
options = []testOption{}
})
}
}
func TestWithTLSConfig(t *testing.T) {
pool, err := createCertPool([]byte(WeakCertificate))
assert.NoError(t, err)
reader := EnvOptionsReader{
GetEnv: func(n string) string {
if n == "CERTIFICATE" {
return "/path/cert.pem"
}
return ""
},
ReadFile: func(p string) ([]byte, error) {
if p == "/path/cert.pem" {
return []byte(WeakCertificate), nil
}
return []byte{}, nil
},
}
var option testOption
reader.Apply(
WithCertPool("CERTIFICATE", func(cp *x509.CertPool) {
option = testOption{TestTLS: &tls.Config{RootCAs: cp}}
}),
)
// nolint:staticcheck // ignoring tlsCert.RootCAs.Subjects is deprecated ERR because cert does not come from SystemCertPool.
assert.Equal(t, pool.Subjects(), option.TestTLS.RootCAs.Subjects())
}
func TestWithClientCert(t *testing.T) {
cert, err := tls.X509KeyPair([]byte(WeakCertificate), []byte(WeakKey))
assert.NoError(t, err)
reader := EnvOptionsReader{
GetEnv: func(n string) string {
switch n {
case "CLIENT_CERTIFICATE":
return "/path/tls.crt"
case "CLIENT_KEY":
return "/path/tls.key"
}
return ""
},
ReadFile: func(n string) ([]byte, error) {
switch n {
case "/path/tls.crt":
return []byte(WeakCertificate), nil
case "/path/tls.key":
return []byte(WeakKey), nil
}
return []byte{}, nil
},
}
var option testOption
reader.Apply(
WithClientCert("CLIENT_CERTIFICATE", "CLIENT_KEY", func(c tls.Certificate) {
option = testOption{TestTLS: &tls.Config{Certificates: []tls.Certificate{c}}}
}),
)
assert.Equal(t, cert, option.TestTLS.Certificates[0])
reader.ReadFile = func(s string) ([]byte, error) { return nil, errors.New("oops") }
option.TestTLS = nil
reader.Apply(
WithClientCert("CLIENT_CERTIFICATE", "CLIENT_KEY", func(c tls.Certificate) {
option = testOption{TestTLS: &tls.Config{Certificates: []tls.Certificate{c}}}
}),
)
assert.Nil(t, option.TestTLS)
reader.GetEnv = func(s string) string { return "" }
option.TestTLS = nil
reader.Apply(
WithClientCert("CLIENT_CERTIFICATE", "CLIENT_KEY", func(c tls.Certificate) {
option = testOption{TestTLS: &tls.Config{Certificates: []tls.Certificate{c}}}
}),
)
assert.Nil(t, option.TestTLS)
}
func TestStringToHeader(t *testing.T) {
tests := []struct {
name string
value string
want map[string]string
}{
{
name: "simple test",
value: "userId=alice",
want: map[string]string{"userId": "alice"},
},
{
name: "simple test with spaces",
value: " userId = alice ",
want: map[string]string{"userId": "alice"},
},
{
name: "multiples headers encoded",
value: "userId=alice,serverNode=DF%3A28,isProduction=false",
want: map[string]string{
"userId": "alice",
"serverNode": "DF:28",
"isProduction": "false",
},
},
{
name: "invalid headers format",
value: "userId:alice",
want: map[string]string{},
},
{
name: "invalid key",
value: "%XX=missing,userId=alice",
want: map[string]string{
"userId": "alice",
},
},
{
name: "invalid value",
value: "missing=%XX,userId=alice",
want: map[string]string{
"userId": "alice",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, stringToHeader(tt.value))
})
}
}

View File

@ -0,0 +1,35 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
//go:generate gotmpl --body=../../../../../internal/shared/otlp/partialsuccess.go.tmpl "--data={}" --out=partialsuccess.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/partialsuccess_test.go.tmpl "--data={}" --out=partialsuccess_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/envconfig/envconfig.go.tmpl "--data={}" --out=envconfig/envconfig.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/envconfig/envconfig_test.go.tmpl "--data={}" --out=envconfig/envconfig_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlpconfig/envconfig.go.tmpl "--data={\"envconfigImportPath\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/envconfig\"}" --out=otlpconfig/envconfig.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlpconfig/options.go.tmpl "--data={\"retryImportPath\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry\"}" --out=otlpconfig/options.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlpconfig/options_test.go.tmpl "--data={\"envconfigImportPath\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/envconfig\"}" --out=otlpconfig/options_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlpconfig/optiontypes.go.tmpl "--data={}" --out=otlpconfig/optiontypes.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlpconfig/tls.go.tmpl "--data={}" --out=otlpconfig/tls.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlptracetest/client.go.tmpl "--data={}" --out=otlptracetest/client.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlptracetest/collector.go.tmpl "--data={}" --out=otlptracetest/collector.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlptracetest/data.go.tmpl "--data={}" --out=otlptracetest/data.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlptracetest/otlptest.go.tmpl "--data={}" --out=otlptracetest/otlptest.go

View File

@ -0,0 +1,153 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlpconfig/envconfig.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
import (
"crypto/tls"
"crypto/x509"
"net/url"
"os"
"path"
"strings"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/envconfig"
)
// DefaultEnvOptionsReader is the default environments reader.
var DefaultEnvOptionsReader = envconfig.EnvOptionsReader{
GetEnv: os.Getenv,
ReadFile: os.ReadFile,
Namespace: "OTEL_EXPORTER_OTLP",
}
// ApplyGRPCEnvConfigs applies the env configurations for gRPC.
func ApplyGRPCEnvConfigs(cfg Config) Config {
opts := getOptionsFromEnv()
for _, opt := range opts {
cfg = opt.ApplyGRPCOption(cfg)
}
return cfg
}
// ApplyHTTPEnvConfigs applies the env configurations for HTTP.
func ApplyHTTPEnvConfigs(cfg Config) Config {
opts := getOptionsFromEnv()
for _, opt := range opts {
cfg = opt.ApplyHTTPOption(cfg)
}
return cfg
}
func getOptionsFromEnv() []GenericOption {
opts := []GenericOption{}
tlsConf := &tls.Config{}
DefaultEnvOptionsReader.Apply(
envconfig.WithURL("ENDPOINT", func(u *url.URL) {
opts = append(opts, withEndpointScheme(u))
opts = append(opts, newSplitOption(func(cfg Config) Config {
cfg.Traces.Endpoint = u.Host
// For OTLP/HTTP endpoint URLs without a per-signal
// configuration, the passed endpoint is used as a base URL
// and the signals are sent to these paths relative to that.
cfg.Traces.URLPath = path.Join(u.Path, DefaultTracesPath)
return cfg
}, withEndpointForGRPC(u)))
}),
envconfig.WithURL("TRACES_ENDPOINT", func(u *url.URL) {
opts = append(opts, withEndpointScheme(u))
opts = append(opts, newSplitOption(func(cfg Config) Config {
cfg.Traces.Endpoint = u.Host
// For endpoint URLs for OTLP/HTTP per-signal variables, the
// URL MUST be used as-is without any modification. The only
// exception is that if an URL contains no path part, the root
// path / MUST be used.
path := u.Path
if path == "" {
path = "/"
}
cfg.Traces.URLPath = path
return cfg
}, withEndpointForGRPC(u)))
}),
envconfig.WithCertPool("CERTIFICATE", func(p *x509.CertPool) { tlsConf.RootCAs = p }),
envconfig.WithCertPool("TRACES_CERTIFICATE", func(p *x509.CertPool) { tlsConf.RootCAs = p }),
envconfig.WithClientCert("CLIENT_CERTIFICATE", "CLIENT_KEY", func(c tls.Certificate) { tlsConf.Certificates = []tls.Certificate{c} }),
envconfig.WithClientCert("TRACES_CLIENT_CERTIFICATE", "TRACES_CLIENT_KEY", func(c tls.Certificate) { tlsConf.Certificates = []tls.Certificate{c} }),
withTLSConfig(tlsConf, func(c *tls.Config) { opts = append(opts, WithTLSClientConfig(c)) }),
envconfig.WithBool("INSECURE", func(b bool) { opts = append(opts, withInsecure(b)) }),
envconfig.WithBool("TRACES_INSECURE", func(b bool) { opts = append(opts, withInsecure(b)) }),
envconfig.WithHeaders("HEADERS", func(h map[string]string) { opts = append(opts, WithHeaders(h)) }),
envconfig.WithHeaders("TRACES_HEADERS", func(h map[string]string) { opts = append(opts, WithHeaders(h)) }),
WithEnvCompression("COMPRESSION", func(c Compression) { opts = append(opts, WithCompression(c)) }),
WithEnvCompression("TRACES_COMPRESSION", func(c Compression) { opts = append(opts, WithCompression(c)) }),
envconfig.WithDuration("TIMEOUT", func(d time.Duration) { opts = append(opts, WithTimeout(d)) }),
envconfig.WithDuration("TRACES_TIMEOUT", func(d time.Duration) { opts = append(opts, WithTimeout(d)) }),
)
return opts
}
func withEndpointScheme(u *url.URL) GenericOption {
switch strings.ToLower(u.Scheme) {
case "http", "unix":
return WithInsecure()
default:
return WithSecure()
}
}
func withEndpointForGRPC(u *url.URL) func(cfg Config) Config {
return func(cfg Config) Config {
// For OTLP/gRPC endpoints, this is the target to which the
// exporter is going to send telemetry.
cfg.Traces.Endpoint = path.Join(u.Host, u.Path)
return cfg
}
}
// WithEnvCompression retrieves the specified config and passes it to ConfigFn as a Compression.
func WithEnvCompression(n string, fn func(Compression)) func(e *envconfig.EnvOptionsReader) {
return func(e *envconfig.EnvOptionsReader) {
if v, ok := e.GetEnvValue(n); ok {
cp := NoCompression
if v == "gzip" {
cp = GzipCompression
}
fn(cp)
}
}
}
// revive:disable-next-line:flag-parameter
func withInsecure(b bool) GenericOption {
if b {
return WithInsecure()
}
return WithSecure()
}
func withTLSConfig(c *tls.Config, fn func(*tls.Config)) func(e *envconfig.EnvOptionsReader) {
return func(e *envconfig.EnvOptionsReader) {
if c.RootCAs != nil || len(c.Certificates) > 0 {
fn(c)
}
}
}

View File

@ -0,0 +1,328 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlpconfig/options.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
import (
"crypto/tls"
"fmt"
"path"
"strings"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
)
const (
// DefaultTracesPath is a default URL path for endpoint that
// receives spans.
DefaultTracesPath string = "/v1/traces"
// DefaultTimeout is a default max waiting time for the backend to process
// each span batch.
DefaultTimeout time.Duration = 10 * time.Second
)
type (
SignalConfig struct {
Endpoint string
Insecure bool
TLSCfg *tls.Config
Headers map[string]string
Compression Compression
Timeout time.Duration
URLPath string
// gRPC configurations
GRPCCredentials credentials.TransportCredentials
}
Config struct {
// Signal specific configurations
Traces SignalConfig
RetryConfig retry.Config
// gRPC configurations
ReconnectionPeriod time.Duration
ServiceConfig string
DialOptions []grpc.DialOption
GRPCConn *grpc.ClientConn
}
)
// NewHTTPConfig returns a new Config with all settings applied from opts and
// any unset setting using the default HTTP config values.
func NewHTTPConfig(opts ...HTTPOption) Config {
cfg := Config{
Traces: SignalConfig{
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorHTTPPort),
URLPath: DefaultTracesPath,
Compression: NoCompression,
Timeout: DefaultTimeout,
},
RetryConfig: retry.DefaultConfig,
}
cfg = ApplyHTTPEnvConfigs(cfg)
for _, opt := range opts {
cfg = opt.ApplyHTTPOption(cfg)
}
cfg.Traces.URLPath = cleanPath(cfg.Traces.URLPath, DefaultTracesPath)
return cfg
}
// cleanPath returns a path with all spaces trimmed and all redundancies
// removed. If urlPath is empty or cleaning it results in an empty string,
// defaultPath is returned instead.
func cleanPath(urlPath string, defaultPath string) string {
tmp := path.Clean(strings.TrimSpace(urlPath))
if tmp == "." {
return defaultPath
}
if !path.IsAbs(tmp) {
tmp = fmt.Sprintf("/%s", tmp)
}
return tmp
}
// NewGRPCConfig returns a new Config with all settings applied from opts and
// any unset setting using the default gRPC config values.
func NewGRPCConfig(opts ...GRPCOption) Config {
userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
cfg := Config{
Traces: SignalConfig{
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorGRPCPort),
URLPath: DefaultTracesPath,
Compression: NoCompression,
Timeout: DefaultTimeout,
},
RetryConfig: retry.DefaultConfig,
DialOptions: []grpc.DialOption{grpc.WithUserAgent(userAgent)},
}
cfg = ApplyGRPCEnvConfigs(cfg)
for _, opt := range opts {
cfg = opt.ApplyGRPCOption(cfg)
}
if cfg.ServiceConfig != "" {
cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultServiceConfig(cfg.ServiceConfig))
}
// Priroritize GRPCCredentials over Insecure (passing both is an error).
if cfg.Traces.GRPCCredentials != nil {
cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(cfg.Traces.GRPCCredentials))
} else if cfg.Traces.Insecure {
cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
// Default to using the host's root CA.
creds := credentials.NewTLS(nil)
cfg.Traces.GRPCCredentials = creds
cfg.DialOptions = append(cfg.DialOptions, grpc.WithTransportCredentials(creds))
}
if cfg.Traces.Compression == GzipCompression {
cfg.DialOptions = append(cfg.DialOptions, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
}
if len(cfg.DialOptions) != 0 {
cfg.DialOptions = append(cfg.DialOptions, cfg.DialOptions...)
}
if cfg.ReconnectionPeriod != 0 {
p := grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
MinConnectTimeout: cfg.ReconnectionPeriod,
}
cfg.DialOptions = append(cfg.DialOptions, grpc.WithConnectParams(p))
}
return cfg
}
type (
// GenericOption applies an option to the HTTP or gRPC driver.
GenericOption interface {
ApplyHTTPOption(Config) Config
ApplyGRPCOption(Config) Config
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
// HTTPOption applies an option to the HTTP driver.
HTTPOption interface {
ApplyHTTPOption(Config) Config
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
// GRPCOption applies an option to the gRPC driver.
GRPCOption interface {
ApplyGRPCOption(Config) Config
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
)
// genericOption is an option that applies the same logic
// for both gRPC and HTTP.
type genericOption struct {
fn func(Config) Config
}
func (g *genericOption) ApplyGRPCOption(cfg Config) Config {
return g.fn(cfg)
}
func (g *genericOption) ApplyHTTPOption(cfg Config) Config {
return g.fn(cfg)
}
func (genericOption) private() {}
func newGenericOption(fn func(cfg Config) Config) GenericOption {
return &genericOption{fn: fn}
}
// splitOption is an option that applies different logics
// for gRPC and HTTP.
type splitOption struct {
httpFn func(Config) Config
grpcFn func(Config) Config
}
func (g *splitOption) ApplyGRPCOption(cfg Config) Config {
return g.grpcFn(cfg)
}
func (g *splitOption) ApplyHTTPOption(cfg Config) Config {
return g.httpFn(cfg)
}
func (splitOption) private() {}
func newSplitOption(httpFn func(cfg Config) Config, grpcFn func(cfg Config) Config) GenericOption {
return &splitOption{httpFn: httpFn, grpcFn: grpcFn}
}
// httpOption is an option that is only applied to the HTTP driver.
type httpOption struct {
fn func(Config) Config
}
func (h *httpOption) ApplyHTTPOption(cfg Config) Config {
return h.fn(cfg)
}
func (httpOption) private() {}
func NewHTTPOption(fn func(cfg Config) Config) HTTPOption {
return &httpOption{fn: fn}
}
// grpcOption is an option that is only applied to the gRPC driver.
type grpcOption struct {
fn func(Config) Config
}
func (h *grpcOption) ApplyGRPCOption(cfg Config) Config {
return h.fn(cfg)
}
func (grpcOption) private() {}
func NewGRPCOption(fn func(cfg Config) Config) GRPCOption {
return &grpcOption{fn: fn}
}
// Generic Options
func WithEndpoint(endpoint string) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Traces.Endpoint = endpoint
return cfg
})
}
func WithCompression(compression Compression) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Traces.Compression = compression
return cfg
})
}
func WithURLPath(urlPath string) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Traces.URLPath = urlPath
return cfg
})
}
func WithRetry(rc retry.Config) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.RetryConfig = rc
return cfg
})
}
func WithTLSClientConfig(tlsCfg *tls.Config) GenericOption {
return newSplitOption(func(cfg Config) Config {
cfg.Traces.TLSCfg = tlsCfg.Clone()
return cfg
}, func(cfg Config) Config {
cfg.Traces.GRPCCredentials = credentials.NewTLS(tlsCfg)
return cfg
})
}
func WithInsecure() GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Traces.Insecure = true
return cfg
})
}
func WithSecure() GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Traces.Insecure = false
return cfg
})
}
func WithHeaders(headers map[string]string) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Traces.Headers = headers
return cfg
})
}
func WithTimeout(duration time.Duration) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Traces.Timeout = duration
return cfg
})
}

View File

@ -0,0 +1,489 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlpconfig/options_test.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/envconfig"
)
const (
WeakCertificate = `
-----BEGIN CERTIFICATE-----
MIIBhzCCASygAwIBAgIRANHpHgAWeTnLZpTSxCKs0ggwCgYIKoZIzj0EAwIwEjEQ
MA4GA1UEChMHb3RlbC1nbzAeFw0yMTA0MDExMzU5MDNaFw0yMTA0MDExNDU5MDNa
MBIxEDAOBgNVBAoTB290ZWwtZ28wWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAS9
nWSkmPCxShxnp43F+PrOtbGV7sNfkbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0Z
sJCLHGogQsYnWJBXUZOVo2MwYTAOBgNVHQ8BAf8EBAMCB4AwEwYDVR0lBAwwCgYI
KwYBBQUHAwEwDAYDVR0TAQH/BAIwADAsBgNVHREEJTAjgglsb2NhbGhvc3SHEAAA
AAAAAAAAAAAAAAAAAAGHBH8AAAEwCgYIKoZIzj0EAwIDSQAwRgIhANwZVVKvfvQ/
1HXsTvgH+xTQswOwSSKYJ1cVHQhqK7ZbAiEAus8NxpTRnp5DiTMuyVmhVNPB+bVH
Lhnm4N/QDk5rek0=
-----END CERTIFICATE-----
`
WeakPrivateKey = `
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgN8HEXiXhvByrJ1zK
SFT6Y2l2KqDWwWzKf+t4CyWrNKehRANCAAS9nWSkmPCxShxnp43F+PrOtbGV7sNf
kbQ/kxzi9Ego0ZJdiXxkmv/C05QFddCW7Y0ZsJCLHGogQsYnWJBXUZOV
-----END PRIVATE KEY-----
`
)
type env map[string]string
func (e *env) getEnv(env string) string {
return (*e)[env]
}
type fileReader map[string][]byte
func (f *fileReader) readFile(filename string) ([]byte, error) {
if b, ok := (*f)[filename]; ok {
return b, nil
}
return nil, errors.New("file not found")
}
func TestConfigs(t *testing.T) {
tlsCert, err := CreateTLSConfig([]byte(WeakCertificate))
assert.NoError(t, err)
tests := []struct {
name string
opts []GenericOption
env env
fileReader fileReader
asserts func(t *testing.T, c *Config, grpcOption bool)
}{
{
name: "Test default configs",
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.Equal(t, "localhost:4317", c.Traces.Endpoint)
} else {
assert.Equal(t, "localhost:4318", c.Traces.Endpoint)
}
assert.Equal(t, NoCompression, c.Traces.Compression)
assert.Equal(t, map[string]string(nil), c.Traces.Headers)
assert.Equal(t, 10*time.Second, c.Traces.Timeout)
},
},
// Endpoint Tests
{
name: "Test With Endpoint",
opts: []GenericOption{
WithEndpoint("someendpoint"),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "someendpoint", c.Traces.Endpoint)
},
},
{
name: "Test Environment Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "https://env.endpoint/prefix",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.False(t, c.Traces.Insecure)
if grpcOption {
assert.Equal(t, "env.endpoint/prefix", c.Traces.Endpoint)
} else {
assert.Equal(t, "env.endpoint", c.Traces.Endpoint)
assert.Equal(t, "/prefix/v1/traces", c.Traces.URLPath)
}
},
},
{
name: "Test Environment Signal Specific Endpoint",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "https://overrode.by.signal.specific/env/var",
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "http://env.traces.endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.True(t, c.Traces.Insecure)
assert.Equal(t, "env.traces.endpoint", c.Traces.Endpoint)
if !grpcOption {
assert.Equal(t, "/", c.Traces.URLPath)
}
},
},
{
name: "Test Mixed Environment and With Endpoint",
opts: []GenericOption{
WithEndpoint("traces_endpoint"),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "env_endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "traces_endpoint", c.Traces.Endpoint)
},
},
{
name: "Test Environment Endpoint with HTTP scheme",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://env_endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "env_endpoint", c.Traces.Endpoint)
assert.Equal(t, true, c.Traces.Insecure)
},
},
{
name: "Test Environment Endpoint with HTTP scheme and leading & trailingspaces",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": " http://env_endpoint ",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "env_endpoint", c.Traces.Endpoint)
assert.Equal(t, true, c.Traces.Insecure)
},
},
{
name: "Test Environment Endpoint with HTTPS scheme",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "https://env_endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "env_endpoint", c.Traces.Endpoint)
assert.Equal(t, false, c.Traces.Insecure)
},
},
{
name: "Test Environment Signal Specific Endpoint with uppercase scheme",
env: map[string]string{
"OTEL_EXPORTER_OTLP_ENDPOINT": "HTTPS://overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_ENDPOINT": "HtTp://env_traces_endpoint",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, "env_traces_endpoint", c.Traces.Endpoint)
assert.Equal(t, true, c.Traces.Insecure)
},
},
// Certificate tests
{
name: "Test Default Certificate",
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
} else {
assert.Nil(t, c.Traces.TLSCfg)
}
},
},
{
name: "Test With Certificate",
opts: []GenericOption{
WithTLSClientConfig(tlsCert),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
//TODO: make sure gRPC's credentials actually works
assert.NotNil(t, c.Traces.GRPCCredentials)
} else {
// nolint:staticcheck // ignoring tlsCert.RootCAs.Subjects is deprecated ERR because cert does not come from SystemCertPool.
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
}
},
},
{
name: "Test Environment Certificate",
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path",
},
fileReader: fileReader{
"cert_path": []byte(WeakCertificate),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
} else {
// nolint:staticcheck // ignoring tlsCert.RootCAs.Subjects is deprecated ERR because cert does not come from SystemCertPool.
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
}
},
},
{
name: "Test Environment Signal Specific Certificate",
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE": "cert_path",
},
fileReader: fileReader{
"cert_path": []byte(WeakCertificate),
"invalid_cert": []byte("invalid certificate file."),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
} else {
// nolint:staticcheck // ignoring tlsCert.RootCAs.Subjects is deprecated ERR because cert does not come from SystemCertPool.
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
}
},
},
{
name: "Test Mixed Environment and With Certificate",
opts: []GenericOption{},
env: map[string]string{
"OTEL_EXPORTER_OTLP_CERTIFICATE": "cert_path",
},
fileReader: fileReader{
"cert_path": []byte(WeakCertificate),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
if grpcOption {
assert.NotNil(t, c.Traces.GRPCCredentials)
} else {
// nolint:staticcheck // ignoring tlsCert.RootCAs.Subjects is deprecated ERR because cert does not come from SystemCertPool.
assert.Equal(t, tlsCert.RootCAs.Subjects(), c.Traces.TLSCfg.RootCAs.Subjects())
}
},
},
// Headers tests
{
name: "Test With Headers",
opts: []GenericOption{
WithHeaders(map[string]string{"h1": "v1"}),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"h1": "v1"}, c.Traces.Headers)
},
},
{
name: "Test Environment Headers",
env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers)
},
},
{
name: "Test Environment Signal Specific Headers",
env: map[string]string{
"OTEL_EXPORTER_OTLP_HEADERS": "overrode_by_signal_specific",
"OTEL_EXPORTER_OTLP_TRACES_HEADERS": "h1=v1,h2=v2",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers)
},
},
{
name: "Test Mixed Environment and With Headers",
env: map[string]string{"OTEL_EXPORTER_OTLP_HEADERS": "h1=v1,h2=v2"},
opts: []GenericOption{},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, map[string]string{"h1": "v1", "h2": "v2"}, c.Traces.Headers)
},
},
// Compression Tests
{
name: "Test With Compression",
opts: []GenericOption{
WithCompression(GzipCompression),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, GzipCompression, c.Traces.Compression)
},
},
{
name: "Test Environment Compression",
env: map[string]string{
"OTEL_EXPORTER_OTLP_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, GzipCompression, c.Traces.Compression)
},
},
{
name: "Test Environment Signal Specific Compression",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, GzipCompression, c.Traces.Compression)
},
},
{
name: "Test Mixed Environment and With Compression",
opts: []GenericOption{
WithCompression(NoCompression),
},
env: map[string]string{
"OTEL_EXPORTER_OTLP_TRACES_COMPRESSION": "gzip",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, NoCompression, c.Traces.Compression)
},
},
// Timeout Tests
{
name: "Test With Timeout",
opts: []GenericOption{
WithTimeout(time.Duration(5 * time.Second)),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, 5*time.Second, c.Traces.Timeout)
},
},
{
name: "Test Environment Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, c.Traces.Timeout, 15*time.Second)
},
},
{
name: "Test Environment Signal Specific Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
"OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000",
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, c.Traces.Timeout, 27*time.Second)
},
},
{
name: "Test Mixed Environment and With Timeout",
env: map[string]string{
"OTEL_EXPORTER_OTLP_TIMEOUT": "15000",
"OTEL_EXPORTER_OTLP_TRACES_TIMEOUT": "27000",
},
opts: []GenericOption{
WithTimeout(5 * time.Second),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Equal(t, c.Traces.Timeout, 5*time.Second)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
origEOR := DefaultEnvOptionsReader
DefaultEnvOptionsReader = envconfig.EnvOptionsReader{
GetEnv: tt.env.getEnv,
ReadFile: tt.fileReader.readFile,
Namespace: "OTEL_EXPORTER_OTLP",
}
t.Cleanup(func() { DefaultEnvOptionsReader = origEOR })
// Tests Generic options as HTTP Options
cfg := NewHTTPConfig(asHTTPOptions(tt.opts)...)
tt.asserts(t, &cfg, false)
// Tests Generic options as gRPC Options
cfg = NewGRPCConfig(asGRPCOptions(tt.opts)...)
tt.asserts(t, &cfg, true)
})
}
}
func asHTTPOptions(opts []GenericOption) []HTTPOption {
converted := make([]HTTPOption, len(opts))
for i, o := range opts {
converted[i] = NewHTTPOption(o.ApplyHTTPOption)
}
return converted
}
func asGRPCOptions(opts []GenericOption) []GRPCOption {
converted := make([]GRPCOption, len(opts))
for i, o := range opts {
converted[i] = NewGRPCOption(o.ApplyGRPCOption)
}
return converted
}
func TestCleanPath(t *testing.T) {
type args struct {
urlPath string
defaultPath string
}
tests := []struct {
name string
args args
want string
}{
{
name: "clean empty path",
args: args{
urlPath: "",
defaultPath: "DefaultPath",
},
want: "DefaultPath",
},
{
name: "clean metrics path",
args: args{
urlPath: "/prefix/v1/metrics",
defaultPath: "DefaultMetricsPath",
},
want: "/prefix/v1/metrics",
},
{
name: "clean traces path",
args: args{
urlPath: "https://env_endpoint",
defaultPath: "DefaultTracesPath",
},
want: "/https:/env_endpoint",
},
{
name: "spaces trimmed",
args: args{
urlPath: " /dir",
},
want: "/dir",
},
{
name: "clean path empty",
args: args{
urlPath: "dir/..",
defaultPath: "DefaultTracesPath",
},
want: "DefaultTracesPath",
},
{
name: "make absolute",
args: args{
urlPath: "dir/a",
},
want: "/dir/a",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := cleanPath(tt.args.urlPath, tt.args.defaultPath); got != tt.want {
t.Errorf("CleanPath() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -0,0 +1,51 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlpconfig/optiontypes.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
const (
// DefaultCollectorGRPCPort is the default gRPC port of the collector.
DefaultCollectorGRPCPort uint16 = 4317
// DefaultCollectorHTTPPort is the default HTTP port of the collector.
DefaultCollectorHTTPPort uint16 = 4318
// DefaultCollectorHost is the host address the Exporter will attempt
// connect to if no collector address is provided.
DefaultCollectorHost string = "localhost"
)
// Compression describes the compression used for payloads sent to the
// collector.
type Compression int
const (
// NoCompression tells the driver to send payloads without
// compression.
NoCompression Compression = iota
// GzipCompression tells the driver to send payloads after
// compressing them with gzip.
GzipCompression
)
// Marshaler describes the kind of message format sent to the collector.
type Marshaler int
const (
// MarshalProto tells the driver to send using the protobuf binary format.
MarshalProto Marshaler = iota
// MarshalJSON tells the driver to send using json format.
MarshalJSON
)

View File

@ -0,0 +1,37 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlpconfig/tls.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
import (
"crypto/tls"
"crypto/x509"
"errors"
)
// CreateTLSConfig creates a tls.Config from a raw certificate bytes
// to verify a server certificate.
func CreateTLSConfig(certBytes []byte) (*tls.Config, error) {
cp := x509.NewCertPool()
if ok := cp.AppendCertsFromPEM(certBytes); !ok {
return nil, errors.New("failed to append certificate to the cert pool")
}
return &tls.Config{
RootCAs: cp,
}, nil
}

View File

@ -0,0 +1,136 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlptracetest/client.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptracetest // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
import (
"context"
"errors"
"sync"
"testing"
"time"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
)
func RunExporterShutdownTest(t *testing.T, factory func() otlptrace.Client) {
t.Run("testClientStopHonorsTimeout", func(t *testing.T) {
testClientStopHonorsTimeout(t, factory())
})
t.Run("testClientStopHonorsCancel", func(t *testing.T) {
testClientStopHonorsCancel(t, factory())
})
t.Run("testClientStopNoError", func(t *testing.T) {
testClientStopNoError(t, factory())
})
t.Run("testClientStopManyTimes", func(t *testing.T) {
testClientStopManyTimes(t, factory())
})
}
func initializeExporter(t *testing.T, client otlptrace.Client) *otlptrace.Exporter {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
e, err := otlptrace.New(ctx, client)
if err != nil {
t.Fatalf("failed to create exporter")
}
return e
}
func testClientStopHonorsTimeout(t *testing.T, client otlptrace.Client) {
t.Cleanup(func() {
// The test is looking for a failed shut down. Call Stop a second time
// with an un-expired context to give the client a second chance at
// cleaning up. There is not guarantee from the Client interface this
// will succeed, therefore, no need to check the error (just give it a
// best try).
_ = client.Stop(context.Background())
})
e := initializeExporter(t, client)
ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
defer cancel()
<-ctx.Done()
if err := e.Shutdown(ctx); !errors.Is(err, context.DeadlineExceeded) {
t.Errorf("expected context DeadlineExceeded error, got %v", err)
}
}
func testClientStopHonorsCancel(t *testing.T, client otlptrace.Client) {
t.Cleanup(func() {
// The test is looking for a failed shut down. Call Stop a second time
// with an un-expired context to give the client a second chance at
// cleaning up. There is not guarantee from the Client interface this
// will succeed, therefore, no need to check the error (just give it a
// best try).
_ = client.Stop(context.Background())
})
e := initializeExporter(t, client)
ctx, cancel := context.WithCancel(context.Background())
cancel()
if err := e.Shutdown(ctx); !errors.Is(err, context.Canceled) {
t.Errorf("expected context canceled error, got %v", err)
}
}
func testClientStopNoError(t *testing.T, client otlptrace.Client) {
e := initializeExporter(t, client)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
if err := e.Shutdown(ctx); err != nil {
t.Errorf("shutdown errored: expected nil, got %v", err)
}
}
func testClientStopManyTimes(t *testing.T, client otlptrace.Client) {
e := initializeExporter(t, client)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
ch := make(chan struct{})
wg := sync.WaitGroup{}
const num int = 20
wg.Add(num)
errs := make([]error, num)
for i := 0; i < num; i++ {
go func(idx int) {
defer wg.Done()
<-ch
errs[idx] = e.Shutdown(ctx)
}(i)
}
close(ch)
wg.Wait()
for _, err := range errs {
if err != nil {
t.Errorf("failed to shutdown exporter: %v", err)
return
}
}
}

View File

@ -0,0 +1,106 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlptracetest/collector.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptracetest // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
import (
"sort"
collectortracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
)
// TracesCollector mocks a collector for the end-to-end testing.
type TracesCollector interface {
Stop() error
GetResourceSpans() []*tracepb.ResourceSpans
}
// SpansStorage stores the spans. Mock collectors can use it to
// store spans they have received.
type SpansStorage struct {
rsm map[string]*tracepb.ResourceSpans
spanCount int
}
// NewSpansStorage creates a new spans storage.
func NewSpansStorage() SpansStorage {
return SpansStorage{
rsm: make(map[string]*tracepb.ResourceSpans),
}
}
// AddSpans adds spans to the spans storage.
func (s *SpansStorage) AddSpans(request *collectortracepb.ExportTraceServiceRequest) {
for _, rs := range request.GetResourceSpans() {
rstr := resourceString(rs.Resource)
if existingRs, ok := s.rsm[rstr]; !ok {
s.rsm[rstr] = rs
// TODO (rghetia): Add support for library Info.
if len(rs.ScopeSpans) == 0 {
rs.ScopeSpans = []*tracepb.ScopeSpans{
{
Spans: []*tracepb.Span{},
},
}
}
s.spanCount += len(rs.ScopeSpans[0].Spans)
} else {
if len(rs.ScopeSpans) > 0 {
newSpans := rs.ScopeSpans[0].GetSpans()
existingRs.ScopeSpans[0].Spans = append(existingRs.ScopeSpans[0].Spans, newSpans...)
s.spanCount += len(newSpans)
}
}
}
}
// GetSpans returns the stored spans.
func (s *SpansStorage) GetSpans() []*tracepb.Span {
spans := make([]*tracepb.Span, 0, s.spanCount)
for _, rs := range s.rsm {
spans = append(spans, rs.ScopeSpans[0].Spans...)
}
return spans
}
// GetResourceSpans returns the stored resource spans.
func (s *SpansStorage) GetResourceSpans() []*tracepb.ResourceSpans {
rss := make([]*tracepb.ResourceSpans, 0, len(s.rsm))
for _, rs := range s.rsm {
rss = append(rss, rs)
}
return rss
}
func resourceString(res *resourcepb.Resource) string {
sAttrs := sortedAttributes(res.GetAttributes())
rstr := ""
for _, attr := range sAttrs {
rstr = rstr + attr.String()
}
return rstr
}
func sortedAttributes(attrs []*commonpb.KeyValue) []*commonpb.KeyValue {
sort.Slice(attrs[:], func(i, j int) bool {
return attrs[i].Key < attrs[j].Key
})
return attrs
}

View File

@ -0,0 +1,66 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlptracetest/data.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptracetest // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)
// SingleReadOnlySpan returns a one-element slice with a read-only span. It
// may be useful for testing driver's trace export.
func SingleReadOnlySpan() []tracesdk.ReadOnlySpan {
return tracetest.SpanStubs{
{
SpanContext: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID{2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9},
SpanID: trace.SpanID{3, 4, 5, 6, 7, 8, 9, 0},
TraceFlags: trace.FlagsSampled,
}),
Parent: trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID{2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9},
SpanID: trace.SpanID{1, 2, 3, 4, 5, 6, 7, 8},
TraceFlags: trace.FlagsSampled,
}),
SpanKind: trace.SpanKindInternal,
Name: "foo",
StartTime: time.Date(2020, time.December, 8, 20, 23, 0, 0, time.UTC),
EndTime: time.Date(2020, time.December, 0, 20, 24, 0, 0, time.UTC),
Attributes: []attribute.KeyValue{},
Events: []tracesdk.Event{},
Links: []tracesdk.Link{},
Status: tracesdk.Status{Code: codes.Ok},
DroppedAttributes: 0,
DroppedEvents: 0,
DroppedLinks: 0,
ChildSpanCount: 0,
Resource: resource.NewSchemaless(attribute.String("a", "b")),
InstrumentationLibrary: instrumentation.Library{
Name: "bar",
Version: "0.0.0",
},
},
}.Snapshots()
}

View File

@ -0,0 +1,128 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/otlptrace/otlptracetest/otlptest.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlptracetest // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
import (
"context"
"testing"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
)
// RunEndToEndTest can be used by otlptrace.Client tests to validate
// themselves.
func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlptrace.Exporter, tracesCollector TracesCollector) {
pOpts := []sdktrace.TracerProviderOption{
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5*time.Second),
sdktrace.WithMaxExportBatchSize(10),
),
}
tp1 := sdktrace.NewTracerProvider(append(pOpts,
sdktrace.WithResource(resource.NewSchemaless(
attribute.String("rk1", "rv11)"),
attribute.Int64("rk2", 5),
)))...)
tp2 := sdktrace.NewTracerProvider(append(pOpts,
sdktrace.WithResource(resource.NewSchemaless(
attribute.String("rk1", "rv12)"),
attribute.Float64("rk3", 6.5),
)))...)
tr1 := tp1.Tracer("test-tracer1")
tr2 := tp2.Tracer("test-tracer2")
// Now create few spans
m := 4
for i := 0; i < m; i++ {
_, span := tr1.Start(ctx, "AlwaysSample")
span.SetAttributes(attribute.Int64("i", int64(i)))
span.End()
_, span = tr2.Start(ctx, "AlwaysSample")
span.SetAttributes(attribute.Int64("i", int64(i)))
span.End()
}
func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := tp1.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider 1: %v", err)
}
if err := tp2.Shutdown(ctx); err != nil {
t.Fatalf("failed to shut down a tracer provider 2: %v", err)
}
}()
// Wait >2 cycles.
<-time.After(40 * time.Millisecond)
// Now shutdown the exporter
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
t.Fatalf("failed to stop the exporter: %v", err)
}
// Shutdown the collector too so that we can begin
// verification checks of expected data back.
if err := tracesCollector.Stop(); err != nil {
t.Fatalf("failed to stop the mock collector: %v", err)
}
// Now verify that we only got two resources
rss := tracesCollector.GetResourceSpans()
if got, want := len(rss), 2; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
// Now verify spans and attributes for each resource span.
for _, rs := range rss {
if len(rs.ScopeSpans) == 0 {
t.Fatalf("zero ScopeSpans")
}
if got, want := len(rs.ScopeSpans[0].Spans), m; got != want {
t.Fatalf("span counts: got %d, want %d", got, want)
}
attrMap := map[int64]bool{}
for _, s := range rs.ScopeSpans[0].Spans {
if gotName, want := s.Name, "AlwaysSample"; gotName != want {
t.Fatalf("span name: got %s, want %s", gotName, want)
}
attrMap[s.Attributes[0].Value.Value.(*commonpb.AnyValue_IntValue).IntValue] = true
}
if got, want := len(attrMap), m; got != want {
t.Fatalf("span attribute unique values: got %d want %d", got, want)
}
for i := 0; i < m; i++ {
_, ok := attrMap[int64(i)]
if !ok {
t.Fatalf("span with attribute %d missing", i)
}
}
}
}

View File

@ -0,0 +1,67 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/partialsuccess.go
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
import "fmt"
// PartialSuccess represents the underlying error for all handling
// OTLP partial success messages. Use `errors.Is(err,
// PartialSuccess{})` to test whether an error passed to the OTel
// error handler belongs to this category.
type PartialSuccess struct {
ErrorMessage string
RejectedItems int64
RejectedKind string
}
var _ error = PartialSuccess{}
// Error implements the error interface.
func (ps PartialSuccess) Error() string {
msg := ps.ErrorMessage
if msg == "" {
msg = "empty message"
}
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
}
// Is supports the errors.Is() interface.
func (ps PartialSuccess) Is(err error) bool {
_, ok := err.(PartialSuccess)
return ok
}
// TracePartialSuccessError returns an error describing a partial success
// response for the trace signal.
func TracePartialSuccessError(itemsRejected int64, errorMessage string) error {
return PartialSuccess{
ErrorMessage: errorMessage,
RejectedItems: itemsRejected,
RejectedKind: "spans",
}
}
// MetricPartialSuccessError returns an error describing a partial success
// response for the metric signal.
func MetricPartialSuccessError(itemsRejected int64, errorMessage string) error {
return PartialSuccess{
ErrorMessage: errorMessage,
RejectedItems: itemsRejected,
RejectedKind: "metric data points",
}
}

View File

@ -0,0 +1,46 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/partialsuccess_test.go
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"errors"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func requireErrorString(t *testing.T, expect string, err error) {
t.Helper()
require.NotNil(t, err)
require.Error(t, err)
require.True(t, errors.Is(err, PartialSuccess{}))
const pfx = "OTLP partial success: "
msg := err.Error()
require.True(t, strings.HasPrefix(msg, pfx))
require.Equal(t, expect, msg[len(pfx):])
}
func TestPartialSuccessFormat(t *testing.T) {
requireErrorString(t, "empty message (0 metric data points rejected)", MetricPartialSuccessError(0, ""))
requireErrorString(t, "help help (0 metric data points rejected)", MetricPartialSuccessError(0, "help help"))
requireErrorString(t, "what happened (10 metric data points rejected)", MetricPartialSuccessError(10, "what happened"))
requireErrorString(t, "what happened (15 spans rejected)", TracePartialSuccessError(15, "what happened"))
}

View File

@ -0,0 +1,156 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/retry/retry.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package retry provides request retry functionality that can perform
// configurable exponential backoff for transient errors and honor any
// explicit throttle responses received.
package retry // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
import (
"context"
"fmt"
"time"
"github.com/cenkalti/backoff/v4"
)
// DefaultConfig are the recommended defaults to use.
var DefaultConfig = Config{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}
// Config defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type Config struct {
// Enabled indicates whether to not retry sending batches in case of
// export failure.
Enabled bool
// InitialInterval the time to wait after the first failure before
// retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is
// reached the delay between consecutive retries will always be
// `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent
// trying to send a request/batch. Once this value is reached, the data
// is discarded.
MaxElapsedTime time.Duration
}
// RequestFunc wraps a request with retry logic.
type RequestFunc func(context.Context, func(context.Context) error) error
// EvaluateFunc returns if an error is retry-able and if an explicit throttle
// duration should be honored that was included in the error.
//
// The function must return true if the error argument is retry-able,
// otherwise it must return false for the first return parameter.
//
// The function must return a non-zero time.Duration if the error contains
// explicit throttle duration that should be honored, otherwise it must return
// a zero valued time.Duration.
type EvaluateFunc func(error) (bool, time.Duration)
// RequestFunc returns a RequestFunc using the evaluate function to determine
// if requests can be retried and based on the exponential backoff
// configuration of c.
func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc {
if !c.Enabled {
return func(ctx context.Context, fn func(context.Context) error) error {
return fn(ctx)
}
}
return func(ctx context.Context, fn func(context.Context) error) error {
// Do not use NewExponentialBackOff since it calls Reset and the code here
// must call Reset after changing the InitialInterval (this saves an
// unnecessary call to Now).
b := &backoff.ExponentialBackOff{
InitialInterval: c.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.MaxInterval,
MaxElapsedTime: c.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()
for {
err := fn(ctx)
if err == nil {
return nil
}
retryable, throttle := evaluate(err)
if !retryable {
return err
}
bOff := b.NextBackOff()
if bOff == backoff.Stop {
return fmt.Errorf("max retry time elapsed: %w", err)
}
// Wait for the greater of the backoff or throttle delay.
var delay time.Duration
if bOff > throttle {
delay = bOff
} else {
elapsed := b.GetElapsedTime()
if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime {
return fmt.Errorf("max retry time would elapse: %w", err)
}
delay = throttle
}
if ctxErr := waitFunc(ctx, delay); ctxErr != nil {
return fmt.Errorf("%w: %s", ctxErr, err)
}
}
}
}
// Allow override for testing.
var waitFunc = wait
// wait takes the caller's context, and the amount of time to wait. It will
// return nil if the timer fires before or at the same time as the context's
// deadline. This indicates that the call can be retried.
func wait(ctx context.Context, delay time.Duration) error {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-ctx.Done():
// Handle the case where the timer and context deadline end
// simultaneously by prioritizing the timer expiration nil value
// response.
select {
case <-timer.C:
default:
return ctx.Err()
}
case <-timer.C:
}
return nil
}

View File

@ -0,0 +1,261 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/retry/retry_test.go.tmpl
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retry
import (
"context"
"errors"
"math"
"sync"
"testing"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
)
func TestWait(t *testing.T) {
tests := []struct {
ctx context.Context
delay time.Duration
expected error
}{
{
ctx: context.Background(),
delay: time.Duration(0),
},
{
ctx: context.Background(),
delay: time.Duration(1),
},
{
ctx: context.Background(),
delay: time.Duration(-1),
},
{
ctx: func() context.Context {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}(),
// Ensure the timer and context do not end simultaneously.
delay: 1 * time.Hour,
expected: context.Canceled,
},
}
for _, test := range tests {
err := wait(test.ctx, test.delay)
if test.expected == nil {
assert.NoError(t, err)
} else {
assert.ErrorIs(t, err, test.expected)
}
}
}
func TestNonRetryableError(t *testing.T) {
ev := func(error) (bool, time.Duration) { return false, 0 }
reqFunc := Config{
Enabled: true,
InitialInterval: 1 * time.Nanosecond,
MaxInterval: 1 * time.Nanosecond,
// Never stop retrying.
MaxElapsedTime: 0,
}.RequestFunc(ev)
ctx := context.Background()
assert.NoError(t, reqFunc(ctx, func(context.Context) error {
return nil
}))
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}), assert.AnError)
}
func TestThrottledRetry(t *testing.T) {
// Ensure the throttle delay is used by making longer than backoff delay.
throttleDelay, backoffDelay := time.Second, time.Nanosecond
ev := func(error) (bool, time.Duration) {
// Retry everything with a throttle delay.
return true, throttleDelay
}
reqFunc := Config{
Enabled: true,
InitialInterval: backoffDelay,
MaxInterval: backoffDelay,
// Never stop retrying.
MaxElapsedTime: 0,
}.RequestFunc(ev)
origWait := waitFunc
var done bool
waitFunc = func(_ context.Context, delay time.Duration) error {
assert.Equal(t, throttleDelay, delay, "retry not throttled")
// Try twice to ensure call is attempted again after delay.
if done {
return assert.AnError
}
done = true
return nil
}
defer func() { waitFunc = origWait }()
ctx := context.Background()
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return errors.New("not this error")
}), assert.AnError)
}
func TestBackoffRetry(t *testing.T) {
ev := func(error) (bool, time.Duration) { return true, 0 }
delay := time.Nanosecond
reqFunc := Config{
Enabled: true,
InitialInterval: delay,
MaxInterval: delay,
// Never stop retrying.
MaxElapsedTime: 0,
}.RequestFunc(ev)
origWait := waitFunc
var done bool
waitFunc = func(_ context.Context, d time.Duration) error {
delta := math.Ceil(float64(delay) * backoff.DefaultRandomizationFactor)
assert.InDelta(t, delay, d, delta, "retry not backoffed")
// Try twice to ensure call is attempted again after delay.
if done {
return assert.AnError
}
done = true
return nil
}
t.Cleanup(func() { waitFunc = origWait })
ctx := context.Background()
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return errors.New("not this error")
}), assert.AnError)
}
func TestBackoffRetryCanceledContext(t *testing.T) {
ev := func(error) (bool, time.Duration) { return true, 0 }
delay := time.Millisecond
reqFunc := Config{
Enabled: true,
InitialInterval: delay,
MaxInterval: delay,
// Never stop retrying.
MaxElapsedTime: 10 * time.Millisecond,
}.RequestFunc(ev)
ctx, cancel := context.WithCancel(context.Background())
count := 0
cancel()
err := reqFunc(ctx, func(context.Context) error {
count++
return assert.AnError
})
assert.ErrorIs(t, err, context.Canceled)
assert.Contains(t, err.Error(), assert.AnError.Error())
assert.Equal(t, 1, count)
}
func TestThrottledRetryGreaterThanMaxElapsedTime(t *testing.T) {
// Ensure the throttle delay is used by making longer than backoff delay.
tDelay, bDelay := time.Hour, time.Nanosecond
ev := func(error) (bool, time.Duration) { return true, tDelay }
reqFunc := Config{
Enabled: true,
InitialInterval: bDelay,
MaxInterval: bDelay,
MaxElapsedTime: tDelay - (time.Nanosecond),
}.RequestFunc(ev)
ctx := context.Background()
assert.Contains(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}).Error(), "max retry time would elapse: ")
}
func TestMaxElapsedTime(t *testing.T) {
ev := func(error) (bool, time.Duration) { return true, 0 }
delay := time.Nanosecond
reqFunc := Config{
Enabled: true,
// InitialInterval > MaxElapsedTime means immediate return.
InitialInterval: 2 * delay,
MaxElapsedTime: delay,
}.RequestFunc(ev)
ctx := context.Background()
assert.Contains(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}).Error(), "max retry time elapsed: ")
}
func TestRetryNotEnabled(t *testing.T) {
ev := func(error) (bool, time.Duration) {
t.Error("evaluated retry when not enabled")
return false, 0
}
reqFunc := Config{}.RequestFunc(ev)
ctx := context.Background()
assert.NoError(t, reqFunc(ctx, func(context.Context) error {
return nil
}))
assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error {
return assert.AnError
}), assert.AnError)
}
func TestRetryConcurrentSafe(t *testing.T) {
ev := func(error) (bool, time.Duration) { return true, 0 }
reqFunc := Config{
Enabled: true,
}.RequestFunc(ev)
var wg sync.WaitGroup
ctx := context.Background()
for i := 1; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
var done bool
assert.NoError(t, reqFunc(ctx, func(context.Context) error {
if !done {
done = true
return assert.AnError
}
return nil
}))
}()
}
wg.Wait()
}

View File

@ -26,7 +26,7 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlptracetest"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
collectortracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
)

View File

@ -22,8 +22,8 @@ import (
"google.golang.org/grpc/credentials"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/retry"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
)
// Option applies an option to the gRPC driver.