1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-16 02:47:20 +02:00

Add newClient method for otlploggrpc gRPC client (#5523)

part of #5056

For full usage of this client, check #5522

---------

Co-authored-by: Damien Mathieu <42@dmathieu.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Sam Xie 2024-06-24 07:47:21 -07:00 committed by GitHub
parent a814b359a0
commit d99c76fa32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 384 additions and 6 deletions

View File

@ -3,12 +3,144 @@
package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
import (
"time"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
)
// The methods of this type are not expected to be called concurrently.
type client struct {
// TODO: implement.
metadata metadata.MD
exportTimeout time.Duration
requestFunc retry.RequestFunc
// ourConn keeps track of where conn was created: true if created here in
// NewClient, or false if passed with an option. This is important on
// Shutdown as conn should only be closed if we created it. Otherwise,
// it is up to the processes that passed conn to close it.
ourConn bool
conn *grpc.ClientConn
lsc collogpb.LogsServiceClient
}
// Used for testing.
var newGRPCClientFn = grpc.NewClient
// newClient creates a new gRPC log client.
func newClient(cfg config) (*client, error) {
// TODO: implement.
return &client{}, nil
c := &client{
exportTimeout: cfg.timeout.Value,
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
conn: cfg.gRPCConn.Value,
}
if len(cfg.headers.Value) > 0 {
c.metadata = metadata.New(cfg.headers.Value)
}
if c.conn == nil {
// If the caller did not provide a ClientConn when the client was
// created, create one using the configuration they did provide.
dialOpts := newGRPCDialOptions(cfg)
conn, err := newGRPCClientFn(cfg.endpoint.Value, dialOpts...)
if err != nil {
return nil, err
}
// Keep track that we own the lifecycle of this conn and need to close
// it on Shutdown.
c.ourConn = true
c.conn = conn
}
c.lsc = collogpb.NewLogsServiceClient(c.conn)
return c, nil
}
func newGRPCDialOptions(cfg config) []grpc.DialOption {
userAgent := "OTel Go OTLP over gRPC logs exporter/" + Version()
dialOpts := []grpc.DialOption{grpc.WithUserAgent(userAgent)}
dialOpts = append(dialOpts, cfg.dialOptions.Value...)
// Convert other grpc configs to the dial options.
// Service config
if cfg.serviceConfig.Value != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(cfg.serviceConfig.Value))
}
// Prioritize GRPCCredentials over Insecure (passing both is an error).
if cfg.gRPCCredentials.Value != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(cfg.gRPCCredentials.Value))
} else if cfg.insecure.Value {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
// Default to using the host's root CA.
dialOpts = append(dialOpts, grpc.WithTransportCredentials(
credentials.NewTLS(nil),
))
}
// Compression
if cfg.compression.Value == GzipCompression {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
}
// Reconnection period
if cfg.reconnectionPeriod.Value != 0 {
p := grpc.ConnectParams{
Backoff: backoff.DefaultConfig,
MinConnectTimeout: cfg.reconnectionPeriod.Value,
}
dialOpts = append(dialOpts, grpc.WithConnectParams(p))
}
return dialOpts
}
// retryable returns if err identifies a request that can be retried and a
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
s := status.Convert(err)
return retryableGRPCStatus(s)
}
func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
switch s.Code() {
case codes.Canceled,
codes.DeadlineExceeded,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// Additionally, handle RetryInfo.
_, d := throttleDelay(s)
return true, d
case codes.ResourceExhausted:
// Retry only if the server signals that the recovery from resource exhaustion is possible.
return throttleDelay(s)
}
// Not a retry-able error.
return false, 0
}
// throttleDelay returns if the status is RetryInfo
// and the duration to wait for if an explicit throttle time is included.
func throttleDelay(s *status.Status) (bool, time.Duration) {
for _, detail := range s.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
return true, t.RetryDelay.AsDuration()
}
}
return false, 0
}

View File

@ -0,0 +1,227 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
"github.com/stretchr/testify/assert"
)
func TestThrottleDelay(t *testing.T) {
c := codes.ResourceExhausted
testcases := []struct {
status *status.Status
wantOK bool
wantDuration time.Duration
}{
{
status: status.New(c, "NoRetryInfo"),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "SingleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Millisecond),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 15 * time.Millisecond,
},
{
status: func() *status.Status {
s, err := status.New(c, "ErrorInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "no throttle detail"},
)
require.NoError(t, err)
return s
}(),
wantOK: false,
wantDuration: 0,
},
{
status: func() *status.Status {
s, err := status.New(c, "ErrorAndRetryInfo").WithDetails(
&errdetails.ErrorInfo{Reason: "with throttle detail"},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 13 * time.Minute,
},
{
status: func() *status.Status {
s, err := status.New(c, "DoubleRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(13 * time.Minute),
},
&errdetails.RetryInfo{
RetryDelay: durationpb.New(15 * time.Minute),
},
)
require.NoError(t, err)
return s
}(),
wantOK: true,
wantDuration: 13 * time.Minute,
},
}
for _, tc := range testcases {
t.Run(tc.status.Message(), func(t *testing.T) {
ok, d := throttleDelay(tc.status)
assert.Equal(t, tc.wantOK, ok)
assert.Equal(t, tc.wantDuration, d)
})
}
}
func TestRetryable(t *testing.T) {
retryableCodes := map[codes.Code]bool{
codes.OK: false,
codes.Canceled: true,
codes.Unknown: false,
codes.InvalidArgument: false,
codes.DeadlineExceeded: true,
codes.NotFound: false,
codes.AlreadyExists: false,
codes.PermissionDenied: false,
codes.ResourceExhausted: false,
codes.FailedPrecondition: false,
codes.Aborted: true,
codes.OutOfRange: true,
codes.Unimplemented: false,
codes.Internal: false,
codes.Unavailable: true,
codes.DataLoss: true,
codes.Unauthenticated: false,
}
for c, want := range retryableCodes {
got, _ := retryable(status.Error(c, ""))
assert.Equalf(t, want, got, "evaluate(%s)", c)
}
}
func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
delay := 15 * time.Millisecond
s, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
&errdetails.RetryInfo{
RetryDelay: durationpb.New(delay),
},
)
require.NoError(t, err)
ok, d := retryableGRPCStatus(s)
assert.True(t, ok)
assert.Equal(t, delay, d)
}
func TestNewClient(t *testing.T) {
newGRPCClientFnSwap := newGRPCClientFn
t.Cleanup(func() {
newGRPCClientFn = newGRPCClientFnSwap
})
// The gRPC connection created by newClient.
conn, err := grpc.NewClient("test", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
newGRPCClientFn = func(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return conn, nil
}
// The gRPC connection created by users.
userConn, err := grpc.NewClient("test 2", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
testCases := []struct {
name string
cfg config
cli *client
}{
{
name: "empty config",
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
},
},
{
name: "with headers",
cfg: config{
headers: newSetting(map[string]string{
"key": "value",
}),
},
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
metadata: map[string][]string{"key": {"value"}},
},
},
{
name: "with gRPC connection",
cfg: config{
gRPCConn: newSetting(userConn),
},
cli: &client{
ourConn: false,
conn: userConn,
lsc: collogpb.NewLogsServiceClient(userConn),
},
},
{
// It is not possible to compare grpc dial options directly, so we just check that the client is created
// and no panic occurs.
name: "with dial options",
cfg: config{
serviceConfig: newSetting("service config"),
gRPCCredentials: newSetting(credentials.NewTLS(nil)),
compression: newSetting(GzipCompression),
reconnectionPeriod: newSetting(10 * time.Second),
},
cli: &client{
ourConn: true,
conn: conn,
lsc: collogpb.NewLogsServiceClient(conn),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cli, err := newClient(tc.cfg)
require.NoError(t, err)
assert.Equal(t, tc.cli.metadata, cli.metadata)
assert.Equal(t, tc.cli.exportTimeout, cli.exportTimeout)
assert.Equal(t, tc.cli.ourConn, cli.ourConn)
assert.Equal(t, tc.cli.conn, cli.conn)
assert.Equal(t, tc.cli.lsc, cli.lsc)
})
}
}

View File

@ -7,14 +7,20 @@ require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/sdk/log v0.3.0
go.opentelemetry.io/proto/otlp v1.3.1
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.2
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
go.opentelemetry.io/otel/log v0.3.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
@ -22,8 +28,7 @@ require (
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 // indirect
google.golang.org/protobuf v1.34.2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@ -1,5 +1,6 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
@ -9,23 +10,36 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8 h1:W5Xj/70xIA4x60O/IFyXivR5MGqblAb8R3w26pnD6No=
google.golang.org/genproto/googleapis/api v0.0.0-20240513163218-0867130af1f8/go.mod h1:vPrPUTsDCYxXWjP7clS81mZ6/803D8K4iM9Ma27VKas=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4 h1:Di6ANFilr+S60a4S61ZM00vLdw0IrQOSMS2/6mrnOU0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240617180043-68d350f18fd4/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.64.0 h1:KH3VH9y/MgNQg1dE7b3XfVK0GsPSIzJwdF617gUSbvY=
google.golang.org/grpc v1.64.0/go.mod h1:oxjF8E3FBnjp+/gVFYdWacaLDx9na1aqy9oovLpxQYg=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=