2024-04-25 09:57:24 -07:00
|
|
|
// Copyright The OpenTelemetry Authors
|
|
|
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
|
|
|
|
|
|
package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
|
|
|
|
|
|
2024-06-24 07:47:21 -07:00
|
|
|
import (
|
2024-07-03 21:50:02 -07:00
|
|
|
"context"
|
Use the cause of the context error in OTLP retry (#6898)
Part of #6588
For demo code like this:
```go
package main
import (
"context"
"fmt"
"log"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func main() {
ctx := context.Background()
exp, err := newExporter(ctx)
if err != nil {
log.Fatalf("failed to initialize trace exporter: %v", err)
}
tp, err := newTracerProvider(exp)
if err != nil {
log.Fatalf("failed to initialize trace provider: %v", err)
}
defer func() { _ = tp.Shutdown(ctx) }()
otel.SetTracerProvider(tp)
generateSpan()
select {}
}
func generateSpan() {
log.Println("Generating a dummy span")
_, span := otel.Tracer("").Start(context.Background(), "dummy")
defer span.End()
}
func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) {
return sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
), nil
}
func newExporter(ctx context.Context) (*otlptrace.Exporter, error) {
traceExporter, err := otlptrace.New(
ctx,
otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint("127.0.0.1:4317"),
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
return traceExporter, nil
}
```
the resulting error changes from
```
traces export: context deadline exceeded: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused"
```
to
```
traces export: exporter export timeout: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused"
```
2025-06-12 10:02:35 -07:00
|
|
|
"errors"
|
2025-10-03 01:15:41 +08:00
|
|
|
"sync/atomic"
|
2024-06-24 07:47:21 -07:00
|
|
|
"time"
|
|
|
|
|
|
2025-08-03 17:24:33 +02:00
|
|
|
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
|
|
|
|
|
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
|
2024-06-24 07:47:21 -07:00
|
|
|
"google.golang.org/genproto/googleapis/rpc/errdetails"
|
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
|
"google.golang.org/grpc/backoff"
|
|
|
|
|
"google.golang.org/grpc/codes"
|
|
|
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
|
|
|
"google.golang.org/grpc/encoding/gzip"
|
|
|
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
|
"google.golang.org/grpc/status"
|
|
|
|
|
|
2025-10-03 01:15:41 +08:00
|
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
|
|
|
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
|
2024-06-24 07:47:21 -07:00
|
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// The methods of this type are not expected to be called concurrently.
|
2024-04-25 09:57:24 -07:00
|
|
|
type client struct {
|
2024-06-24 07:47:21 -07:00
|
|
|
metadata metadata.MD
|
|
|
|
|
exportTimeout time.Duration
|
|
|
|
|
requestFunc retry.RequestFunc
|
|
|
|
|
|
|
|
|
|
// ourConn keeps track of where conn was created: true if created here in
|
|
|
|
|
// NewClient, or false if passed with an option. This is important on
|
|
|
|
|
// Shutdown as conn should only be closed if we created it. Otherwise,
|
|
|
|
|
// it is up to the processes that passed conn to close it.
|
|
|
|
|
ourConn bool
|
|
|
|
|
conn *grpc.ClientConn
|
|
|
|
|
lsc collogpb.LogsServiceClient
|
2025-10-03 01:15:41 +08:00
|
|
|
|
|
|
|
|
instrumentation *observ.Instrumentation
|
2024-04-25 09:57:24 -07:00
|
|
|
}
|
|
|
|
|
|
2024-06-24 07:47:21 -07:00
|
|
|
// Used for testing.
|
|
|
|
|
var newGRPCClientFn = grpc.NewClient
|
|
|
|
|
|
2024-04-25 09:57:24 -07:00
|
|
|
// newClient creates a new gRPC log client.
|
|
|
|
|
func newClient(cfg config) (*client, error) {
|
2024-06-24 07:47:21 -07:00
|
|
|
c := &client{
|
|
|
|
|
exportTimeout: cfg.timeout.Value,
|
|
|
|
|
requestFunc: cfg.retryCfg.Value.RequestFunc(retryable),
|
|
|
|
|
conn: cfg.gRPCConn.Value,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(cfg.headers.Value) > 0 {
|
|
|
|
|
c.metadata = metadata.New(cfg.headers.Value)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.conn == nil {
|
|
|
|
|
// If the caller did not provide a ClientConn when the client was
|
|
|
|
|
// created, create one using the configuration they did provide.
|
|
|
|
|
dialOpts := newGRPCDialOptions(cfg)
|
|
|
|
|
|
|
|
|
|
conn, err := newGRPCClientFn(cfg.endpoint.Value, dialOpts...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
// Keep track that we own the lifecycle of this conn and need to close
|
|
|
|
|
// it on Shutdown.
|
|
|
|
|
c.ourConn = true
|
|
|
|
|
c.conn = conn
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
c.lsc = collogpb.NewLogsServiceClient(c.conn)
|
|
|
|
|
|
2025-10-03 01:15:41 +08:00
|
|
|
var err error
|
|
|
|
|
id := nextExporterID()
|
|
|
|
|
c.instrumentation, err = observ.NewInstrumentation(id, c.conn.CanonicalTarget())
|
|
|
|
|
return c, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// exporterN is the number of exporter IDs handed out so far by
// nextExporterID. It is an atomic counter, so concurrent callers never
// receive the same ID.
var exporterN atomic.Int64

// nextExporterID returns the next unique ID for an exporter. IDs start at 0
// and grow by one with every call.
func nextExporterID() int64 {
	issued := exporterN.Add(1)
	return issued - 1
}
|
|
|
|
|
|
|
|
|
|
func newGRPCDialOptions(cfg config) []grpc.DialOption {
|
|
|
|
|
userAgent := "OTel Go OTLP over gRPC logs exporter/" + Version()
|
|
|
|
|
dialOpts := []grpc.DialOption{grpc.WithUserAgent(userAgent)}
|
|
|
|
|
dialOpts = append(dialOpts, cfg.dialOptions.Value...)
|
|
|
|
|
|
|
|
|
|
// Convert other grpc configs to the dial options.
|
|
|
|
|
// Service config
|
|
|
|
|
if cfg.serviceConfig.Value != "" {
|
|
|
|
|
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(cfg.serviceConfig.Value))
|
|
|
|
|
}
|
|
|
|
|
// Prioritize GRPCCredentials over Insecure (passing both is an error).
|
2025-07-29 18:20:32 +02:00
|
|
|
switch {
|
|
|
|
|
case cfg.gRPCCredentials.Value != nil:
|
2024-06-24 07:47:21 -07:00
|
|
|
dialOpts = append(dialOpts, grpc.WithTransportCredentials(cfg.gRPCCredentials.Value))
|
2025-07-29 18:20:32 +02:00
|
|
|
case cfg.insecure.Value:
|
2024-06-24 07:47:21 -07:00
|
|
|
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
2025-07-29 18:20:32 +02:00
|
|
|
default:
|
2024-06-24 07:47:21 -07:00
|
|
|
// Default to using the host's root CA.
|
|
|
|
|
dialOpts = append(dialOpts, grpc.WithTransportCredentials(
|
|
|
|
|
credentials.NewTLS(nil),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
// Compression
|
|
|
|
|
if cfg.compression.Value == GzipCompression {
|
|
|
|
|
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
|
|
|
|
|
}
|
|
|
|
|
// Reconnection period
|
|
|
|
|
if cfg.reconnectionPeriod.Value != 0 {
|
|
|
|
|
p := grpc.ConnectParams{
|
|
|
|
|
Backoff: backoff.DefaultConfig,
|
|
|
|
|
MinConnectTimeout: cfg.reconnectionPeriod.Value,
|
|
|
|
|
}
|
|
|
|
|
dialOpts = append(dialOpts, grpc.WithConnectParams(p))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return dialOpts
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-03 21:50:02 -07:00
|
|
|
// UploadLogs sends proto logs to connected endpoint.
|
|
|
|
|
//
|
|
|
|
|
// Retryable errors from the server will be handled according to any
|
|
|
|
|
// RetryConfig the client was created with.
|
|
|
|
|
//
|
|
|
|
|
// The otlplog.Exporter synchronizes access to client methods, and
|
|
|
|
|
// ensures this is not called after the Exporter is shutdown. Only thing
|
|
|
|
|
// to do here is send data.
|
2025-09-18 13:08:25 -07:00
|
|
|
func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uploadErr error) {
|
2024-07-03 21:50:02 -07:00
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
// Do not upload if the context is already expired.
|
|
|
|
|
return ctx.Err()
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx, cancel := c.exportContext(ctx)
|
|
|
|
|
defer cancel()
|
|
|
|
|
|
2025-10-03 01:15:41 +08:00
|
|
|
count := int64(len(rl))
|
|
|
|
|
if c.instrumentation != nil {
|
|
|
|
|
eo := c.instrumentation.ExportLogs(ctx, count)
|
|
|
|
|
defer func() {
|
|
|
|
|
eo.End(uploadErr)
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
2025-09-18 13:08:25 -07:00
|
|
|
return errors.Join(uploadErr, c.requestFunc(ctx, func(ctx context.Context) error {
|
2024-07-03 21:50:02 -07:00
|
|
|
resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{
|
|
|
|
|
ResourceLogs: rl,
|
|
|
|
|
})
|
|
|
|
|
if resp != nil && resp.PartialSuccess != nil {
|
|
|
|
|
msg := resp.PartialSuccess.GetErrorMessage()
|
|
|
|
|
n := resp.PartialSuccess.GetRejectedLogRecords()
|
|
|
|
|
if n != 0 || msg != "" {
|
2025-10-03 01:15:41 +08:00
|
|
|
err := internal.LogPartialSuccessError(n, msg)
|
2025-09-18 13:08:25 -07:00
|
|
|
uploadErr = errors.Join(uploadErr, err)
|
2024-07-03 21:50:02 -07:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// nil is converted to OK.
|
|
|
|
|
if status.Code(err) == codes.OK {
|
|
|
|
|
// Success.
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
return err
|
2025-09-18 13:08:25 -07:00
|
|
|
}))
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-03 21:50:02 -07:00
|
|
|
// Shutdown shuts down the client, freeing all resources.
|
|
|
|
|
//
|
|
|
|
|
// Any active connections to a remote endpoint are closed if they were created
|
|
|
|
|
// by the client. Any gRPC connection passed during creation using
|
|
|
|
|
// WithGRPCConn will not be closed. It is the caller's responsibility to
|
|
|
|
|
// handle cleanup of that resource.
|
|
|
|
|
//
|
|
|
|
|
// The otlplog.Exporter synchronizes access to client methods and
|
|
|
|
|
// ensures this is called only once. The only thing that needs to be done
|
|
|
|
|
// here is to release any computational resources the client holds.
|
|
|
|
|
func (c *client) Shutdown(ctx context.Context) error {
|
|
|
|
|
c.metadata = nil
|
|
|
|
|
c.requestFunc = nil
|
|
|
|
|
c.lsc = nil
|
|
|
|
|
|
|
|
|
|
// Release the connection if we created it.
|
|
|
|
|
err := ctx.Err()
|
|
|
|
|
if c.ourConn {
|
|
|
|
|
closeErr := c.conn.Close()
|
|
|
|
|
// A context timeout error takes precedence over this error.
|
|
|
|
|
if err == nil && closeErr != nil {
|
|
|
|
|
err = closeErr
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
c.conn = nil
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// exportContext returns a copy of parent with an appropriate deadline and
|
|
|
|
|
// cancellation function based on the clients configured export timeout.
|
|
|
|
|
//
|
|
|
|
|
// It is the callers responsibility to cancel the returned context once its
|
|
|
|
|
// use is complete, via the parent or directly with the returned CancelFunc, to
|
|
|
|
|
// ensure all resources are correctly released.
|
|
|
|
|
func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
|
|
|
|
|
var (
|
|
|
|
|
ctx context.Context
|
|
|
|
|
cancel context.CancelFunc
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if c.exportTimeout > 0 {
|
Use the cause of the context error in OTLP retry (#6898)
Part of #6588
For a demo code like this
```go
package main
import (
"context"
"fmt"
"log"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func main() {
ctx := context.Background()
exp, err := newExporter(ctx)
if err != nil {
log.Fatalf("failed to initialize trace exporter: %v", err)
}
tp, err := newTracerProvider(exp)
if err != nil {
log.Fatalf("failed to initialize trace provider: %v", err)
}
defer func() { _ = tp.Shutdown(ctx) }()
otel.SetTracerProvider(tp)
generateSpan()
select {}
}
func generateSpan() {
log.Println("Generating a dummy span")
_, span := otel.Tracer("").Start(context.Background(), "dummy")
defer span.End()
}
func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) {
return sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
), nil
}
func newExporter(ctx context.Context) (*otlptrace.Exporter, error) {
traceExporter, err := otlptrace.New(
ctx,
otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint("127.0.0.1:4317"),
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
return traceExporter, nil
}
```
the error result from
```
traces export: context deadline exceeded: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused"
```
become
```
traces export: exporter export timeout: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused"
```
2025-06-12 10:02:35 -07:00
|
|
|
ctx, cancel = context.WithTimeoutCause(parent, c.exportTimeout, errors.New("exporter export timeout"))
|
2024-07-03 21:50:02 -07:00
|
|
|
} else {
|
|
|
|
|
ctx, cancel = context.WithCancel(parent)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if c.metadata.Len() > 0 {
|
2024-10-23 15:39:25 +08:00
|
|
|
md := c.metadata
|
|
|
|
|
if outMD, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
|
|
|
md = metadata.Join(md, outMD)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx = metadata.NewOutgoingContext(ctx, md)
|
2024-07-03 21:50:02 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return ctx, cancel
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-09 12:37:01 -07:00
|
|
|
type noopClient struct{}
|
|
|
|
|
|
|
|
|
|
func newNoopClient() *noopClient {
|
|
|
|
|
return &noopClient{}
|
|
|
|
|
}
|
|
|
|
|
|
2025-08-09 00:38:22 +02:00
|
|
|
func (*noopClient) UploadLogs(context.Context, []*logpb.ResourceLogs) error { return nil }
|
2024-07-09 12:37:01 -07:00
|
|
|
|
2025-08-09 00:38:22 +02:00
|
|
|
func (*noopClient) Shutdown(context.Context) error { return nil }
|
2024-07-09 12:37:01 -07:00
|
|
|
|
2024-06-24 07:47:21 -07:00
|
|
|
// retryable returns if err identifies a request that can be retried and a
|
|
|
|
|
// duration to wait for if an explicit throttle time is included in err.
|
|
|
|
|
func retryable(err error) (bool, time.Duration) {
|
|
|
|
|
s := status.Convert(err)
|
|
|
|
|
return retryableGRPCStatus(s)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
|
|
|
|
|
switch s.Code() {
|
Use the cause of the context error in OTLP retry (#6898)
Part of #6588
For a demo code like this
```go
package main
import (
"context"
"fmt"
"log"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
func main() {
ctx := context.Background()
exp, err := newExporter(ctx)
if err != nil {
log.Fatalf("failed to initialize trace exporter: %v", err)
}
tp, err := newTracerProvider(exp)
if err != nil {
log.Fatalf("failed to initialize trace provider: %v", err)
}
defer func() { _ = tp.Shutdown(ctx) }()
otel.SetTracerProvider(tp)
generateSpan()
select {}
}
func generateSpan() {
log.Println("Generating a dummy span")
_, span := otel.Tracer("").Start(context.Background(), "dummy")
defer span.End()
}
func newTracerProvider(exp sdktrace.SpanExporter) (*sdktrace.TracerProvider, error) {
return sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
), nil
}
func newExporter(ctx context.Context) (*otlptrace.Exporter, error) {
traceExporter, err := otlptrace.New(
ctx,
otlptracegrpc.NewClient(
otlptracegrpc.WithEndpoint("127.0.0.1:4317"),
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
Enabled: true,
InitialInterval: 1 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}),
),
)
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
return traceExporter, nil
}
```
the error result from
```
traces export: context deadline exceeded: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused"
```
become
```
traces export: exporter export timeout: rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 127.0.0.1:4317: connect: connection refused"
```
2025-06-12 10:02:35 -07:00
|
|
|
// Follows the retryable error codes defined in
|
|
|
|
|
// https://opentelemetry.io/docs/specs/otlp/#failures
|
2024-06-24 07:47:21 -07:00
|
|
|
case codes.Canceled,
|
|
|
|
|
codes.DeadlineExceeded,
|
|
|
|
|
codes.Aborted,
|
|
|
|
|
codes.OutOfRange,
|
|
|
|
|
codes.Unavailable,
|
|
|
|
|
codes.DataLoss:
|
|
|
|
|
// Additionally, handle RetryInfo.
|
|
|
|
|
_, d := throttleDelay(s)
|
|
|
|
|
return true, d
|
|
|
|
|
case codes.ResourceExhausted:
|
|
|
|
|
// Retry only if the server signals that the recovery from resource exhaustion is possible.
|
|
|
|
|
return throttleDelay(s)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Not a retry-able error.
|
|
|
|
|
return false, 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// throttleDelay returns if the status is RetryInfo
|
|
|
|
|
// and the duration to wait for if an explicit throttle time is included.
|
|
|
|
|
func throttleDelay(s *status.Status) (bool, time.Duration) {
|
|
|
|
|
for _, detail := range s.Details() {
|
|
|
|
|
if t, ok := detail.(*errdetails.RetryInfo); ok {
|
|
|
|
|
return true, t.RetryDelay.AsDuration()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false, 0
|
2024-04-25 09:57:24 -07:00
|
|
|
}
|