1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-23 22:34:47 +02:00
Files
ian 5dd35ce873 feat: logs SDK observability - otlploggrpc exporter metrics (#7353)
This PR adds support for experimental metrics in `otlploggrpc`

- `otel.sdk.exporter.log.inflight`
- `otel.sdk.exporter.log.exported`
- `otel.sdk.exporter.operation.duration`

References:

-  #7084 
-  https://github.com/open-telemetry/opentelemetry-go/issues/7019
- Follows the [contribution
  guidelines](a5dcd68ebb/CONTRIBUTING.md (encapsulation)).

-----
```txt
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc
cpu: Apple M3
                                   │ disabled.txt │          enabled.txt          │
                                   │    sec/op    │   sec/op     vs base          │
ExporterExportLogs/Observability-8    681.5µ ± 3%   684.3µ ± 6%  ~ (p=0.315 n=10)

                                   │ disabled.txt │          enabled.txt           │
                                   │     B/op     │     B/op      vs base          │
ExporterExportLogs/Observability-8   672.8Ki ± 0%   673.6Ki ± 1%  ~ (p=0.247 n=10)

                                   │ disabled.txt │            enabled.txt             │
                                   │  allocs/op   │  allocs/op   vs base               │
ExporterExportLogs/Observability-8    9.224k ± 0%   9.232k ± 0%  +0.09% (p=0.000 n=10)
```

-----
```txt
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ
cpu: Apple M3
                                         │  bench.txt  │
                                         │   sec/op    │
InstrumentationExportLogs/NoError-8        162.6n ± 3%
InstrumentationExportLogs/PartialError-8   705.5n ± 5%
InstrumentationExportLogs/FullError-8      592.1n ± 1%
geomean                                    408.0n

                                         │ bench.txt  │
                                         │    B/op    │
InstrumentationExportLogs/NoError-8        152.0 ± 0%
InstrumentationExportLogs/PartialError-8   697.0 ± 0%
InstrumentationExportLogs/FullError-8      616.0 ± 0%
geomean                                    402.6

                                         │ bench.txt  │
                                         │ allocs/op  │
InstrumentationExportLogs/NoError-8        3.000 ± 0%
InstrumentationExportLogs/PartialError-8   10.00 ± 0%
InstrumentationExportLogs/FullError-8      8.000 ± 0%
geomean                                    6.214
```

-----
```txt
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ
cpu: Apple M3
                                 │ parse_target.txt │
                                 │      sec/op      │
ParseTarget/HostName-8                 38.00n ± ∞ ¹
ParseTarget/HostPort-8                 51.33n ± ∞ ¹
ParseTarget/IPv4WithoutPort-8          44.74n ± ∞ ¹
ParseTarget/IPv4WithPort-8             62.56n ± ∞ ¹
ParseTarget/IPv6Bare-8                 94.89n ± ∞ ¹
ParseTarget/IPv6Bracket-8              93.78n ± ∞ ¹
ParseTarget/IPv6WithPort-8             57.57n ± ∞ ¹
ParseTarget/UnixSocket-8               8.329n ± ∞ ¹
ParseTarget/UnixAbstractSocket-8       9.082n ± ∞ ¹
ParseTarget/Passthrough-8              58.06n ± ∞ ¹
geomean                                40.64n
¹ need >= 6 samples for confidence interval at level 0.95

                                 │ parse_target.txt │
                                 │       B/op       │
ParseTarget/HostName-8                  48.00 ± ∞ ¹
ParseTarget/HostPort-8                  48.00 ± ∞ ¹
ParseTarget/IPv4WithoutPort-8           16.00 ± ∞ ¹
ParseTarget/IPv4WithPort-8              48.00 ± ∞ ¹
ParseTarget/IPv6Bare-8                  16.00 ± ∞ ¹
ParseTarget/IPv6Bracket-8               16.00 ± ∞ ¹
ParseTarget/IPv6WithPort-8              48.00 ± ∞ ¹
ParseTarget/UnixSocket-8                0.000 ± ∞ ¹
ParseTarget/UnixAbstractSocket-8        0.000 ± ∞ ¹
ParseTarget/Passthrough-8               48.00 ± ∞ ¹
geomean                                           ²
¹ need >= 6 samples for confidence interval at level 0.95
² summaries must be >0 to compute geomean

                                 │ parse_target.txt │
                                 │    allocs/op     │
ParseTarget/HostName-8                  1.000 ± ∞ ¹
ParseTarget/HostPort-8                  1.000 ± ∞ ¹
ParseTarget/IPv4WithoutPort-8           1.000 ± ∞ ¹
ParseTarget/IPv4WithPort-8              1.000 ± ∞ ¹
ParseTarget/IPv6Bare-8                  1.000 ± ∞ ¹
ParseTarget/IPv6Bracket-8               1.000 ± ∞ ¹
ParseTarget/IPv6WithPort-8              1.000 ± ∞ ¹
ParseTarget/UnixSocket-8                0.000 ± ∞ ¹
ParseTarget/UnixAbstractSocket-8        0.000 ± ∞ ¹
ParseTarget/Passthrough-8               1.000 ± ∞ ¹
geomean                                           ²
¹ need >= 6 samples for confidence interval at level 0.95
² summaries must be >0 to compute geomean
```

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
2025-10-02 10:15:41 -07:00

1291 lines
37 KiB
Go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
import (
"context"
"errors"
"net"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
lpb "go.opentelemetry.io/proto/otlp/logs/v1"
rpb "go.opentelemetry.io/proto/otlp/resource/v1"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
var (
	// ts is the fixed record timestamp shared by all test log records:
	// Sat Jan 01 2000 00:00:00 GMT+0000.
	ts = time.Date(2000, time.January, 0o1, 0, 0, 0, 0, time.FixedZone("GMT", 0))
	// obs is the observed timestamp, 30 seconds after ts.
	obs = ts.Add(30 * time.Second)

	// Key-value attributes reused across the test records and resource.
	kvAlice = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "alice"},
	}}
	kvBob = &cpb.KeyValue{Key: "user", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "bob"},
	}}
	kvSrvName = &cpb.KeyValue{Key: "service.name", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "test server"},
	}}
	kvSrvVer = &cpb.KeyValue{Key: "service.version", Value: &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{StringValue: "v0.1.0"},
	}}

	// Severity numbers for the two record variants (A and B).
	pbSevA = lpb.SeverityNumber_SEVERITY_NUMBER_INFO
	pbSevB = lpb.SeverityNumber_SEVERITY_NUMBER_ERROR

	// Record bodies for the two record variants.
	pbBodyA = &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{
			StringValue: "a",
		},
	}
	pbBodyB = &cpb.AnyValue{
		Value: &cpb.AnyValue_StringValue{
			StringValue: "b",
		},
	}

	// Trace context identifiers and flags for the two record variants.
	spanIDA  = []byte{0, 0, 0, 0, 0, 0, 0, 1}
	spanIDB  = []byte{0, 0, 0, 0, 0, 0, 0, 2}
	traceIDA = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}
	traceIDB = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2}
	flagsA   = byte(1)
	flagsB   = byte(0)

	// logRecords is the full set of OTLP log records exchanged by the tests:
	// two "A" (info) records attributed to alice/bob and two "B" (error)
	// records attributed to alice/bob.
	logRecords = []*lpb.LogRecord{
		{
			TimeUnixNano:         uint64(ts.UnixNano()),
			ObservedTimeUnixNano: uint64(obs.UnixNano()),
			SeverityNumber:       pbSevA,
			SeverityText:         "A",
			Body:                 pbBodyA,
			Attributes:           []*cpb.KeyValue{kvAlice},
			Flags:                uint32(flagsA),
			TraceId:              traceIDA,
			SpanId:               spanIDA,
		},
		{
			TimeUnixNano:         uint64(ts.UnixNano()),
			ObservedTimeUnixNano: uint64(obs.UnixNano()),
			SeverityNumber:       pbSevA,
			SeverityText:         "A",
			Body:                 pbBodyA,
			Attributes:           []*cpb.KeyValue{kvBob},
			Flags:                uint32(flagsA),
			TraceId:              traceIDA,
			SpanId:               spanIDA,
		},
		{
			TimeUnixNano:         uint64(ts.UnixNano()),
			ObservedTimeUnixNano: uint64(obs.UnixNano()),
			SeverityNumber:       pbSevB,
			SeverityText:         "B",
			Body:                 pbBodyB,
			Attributes:           []*cpb.KeyValue{kvAlice},
			Flags:                uint32(flagsB),
			TraceId:              traceIDB,
			SpanId:               spanIDB,
		},
		{
			TimeUnixNano:         uint64(ts.UnixNano()),
			ObservedTimeUnixNano: uint64(obs.UnixNano()),
			SeverityNumber:       pbSevB,
			SeverityText:         "B",
			Body:                 pbBodyB,
			Attributes:           []*cpb.KeyValue{kvBob},
			Flags:                uint32(flagsB),
			TraceId:              traceIDB,
			SpanId:               spanIDB,
		},
	}

	// scope identifies the instrumentation scope of the test records.
	scope = &cpb.InstrumentationScope{
		Name:    "test/code/path",
		Version: "v0.1.0",
	}
	scopeLogs = []*lpb.ScopeLogs{
		{
			Scope:      scope,
			LogRecords: logRecords,
			SchemaUrl:  semconv.SchemaURL,
		},
	}

	// res is the resource all test records are associated with.
	res = &rpb.Resource{
		Attributes: []*cpb.KeyValue{kvSrvName, kvSrvVer},
	}
	// resourceLogs is the canonical upload payload used throughout the tests.
	resourceLogs = []*lpb.ResourceLogs{{
		Resource:  res,
		ScopeLogs: scopeLogs,
		SchemaUrl: semconv.SchemaURL,
	}}
)
// TestThrottleDelay verifies throttleDelay extracts the retry delay from a
// gRPC status' RetryInfo detail, using the first RetryInfo when several are
// present and reporting no delay when none exist.
func TestThrottleDelay(t *testing.T) {
	code := codes.ResourceExhausted

	// detailed unwraps the (status, error) pair returned by WithDetails,
	// failing the test if attaching details failed.
	detailed := func(s *status.Status, err error) *status.Status {
		t.Helper()
		require.NoError(t, err)
		return s
	}

	for _, tt := range []struct {
		status   *status.Status
		wantOK   bool
		wantWait time.Duration
	}{
		{
			status: status.New(code, "NoRetryInfo"),
		},
		{
			status: detailed(status.New(code, "SingleRetryInfo").WithDetails(
				&errdetails.RetryInfo{
					RetryDelay: durationpb.New(15 * time.Millisecond),
				},
			)),
			wantOK:   true,
			wantWait: 15 * time.Millisecond,
		},
		{
			status: detailed(status.New(code, "ErrorInfo").WithDetails(
				&errdetails.ErrorInfo{Reason: "no throttle detail"},
			)),
		},
		{
			status: detailed(status.New(code, "ErrorAndRetryInfo").WithDetails(
				&errdetails.ErrorInfo{Reason: "with throttle detail"},
				&errdetails.RetryInfo{
					RetryDelay: durationpb.New(13 * time.Minute),
				},
			)),
			wantOK:   true,
			wantWait: 13 * time.Minute,
		},
		{
			// Only the first RetryInfo should be honored.
			status: detailed(status.New(code, "DoubleRetryInfo").WithDetails(
				&errdetails.RetryInfo{
					RetryDelay: durationpb.New(13 * time.Minute),
				},
				&errdetails.RetryInfo{
					RetryDelay: durationpb.New(15 * time.Minute),
				},
			)),
			wantOK:   true,
			wantWait: 13 * time.Minute,
		},
	} {
		t.Run(tt.status.Message(), func(t *testing.T) {
			gotOK, gotWait := throttleDelay(tt.status)
			assert.Equal(t, tt.wantOK, gotOK)
			assert.Equal(t, tt.wantWait, gotWait)
		})
	}
}
// TestRetryable exhaustively checks, for every defined gRPC status code,
// whether retryable classifies an export error with that code as retryable.
func TestRetryable(t *testing.T) {
	// The subset of gRPC status codes for which an export must be retried.
	retrySet := map[codes.Code]struct{}{
		codes.Canceled:         {},
		codes.DeadlineExceeded: {},
		codes.Aborted:          {},
		codes.OutOfRange:       {},
		codes.Unavailable:      {},
		codes.DataLoss:         {},
	}

	// Iterate every defined code (OK == 0 through Unauthenticated == 16).
	for c := codes.OK; c <= codes.Unauthenticated; c++ {
		_, want := retrySet[c]
		got, _ := retryable(status.Error(c, ""))
		assert.Equalf(t, want, got, "evaluate(%s)", c)
	}
}
// TestRetryableGRPCStatusResourceExhaustedWithRetryInfo verifies that a
// ResourceExhausted status carrying RetryInfo is retryable and that the
// advertised delay is propagated.
func TestRetryableGRPCStatusResourceExhaustedWithRetryInfo(t *testing.T) {
	const wantDelay = 15 * time.Millisecond

	st, err := status.New(codes.ResourceExhausted, "WithRetryInfo").WithDetails(
		&errdetails.RetryInfo{RetryDelay: durationpb.New(wantDelay)},
	)
	require.NoError(t, err)

	gotOK, gotDelay := retryableGRPCStatus(st)
	assert.True(t, gotOK)
	assert.Equal(t, wantDelay, gotDelay)
}
// TestNewClient verifies newClient wires up the gRPC connection, logs
// service client, and metadata according to the provided config.
func TestNewClient(t *testing.T) {
	// Stub the gRPC client constructor; restore it when the test ends.
	origFn := newGRPCClientFn
	t.Cleanup(func() { newGRPCClientFn = origFn })

	// conn is the connection handed out by the stubbed constructor.
	conn, err := grpc.NewClient("test", grpc.WithTransportCredentials(insecure.NewCredentials()))
	require.NoError(t, err)
	newGRPCClientFn = func(string, ...grpc.DialOption) (*grpc.ClientConn, error) {
		return conn, nil
	}

	// userConn simulates a gRPC connection created by a user.
	userConn, err := grpc.NewClient("test 2", grpc.WithTransportCredentials(insecure.NewCredentials()))
	require.NoError(t, err)

	cases := []struct {
		name string
		cfg  config
		want *client
	}{
		{
			name: "empty config",
			want: &client{
				ourConn: true,
				conn:    conn,
				lsc:     collogpb.NewLogsServiceClient(conn),
			},
		},
		{
			name: "with headers",
			cfg: config{
				headers: newSetting(map[string]string{
					"key": "value",
				}),
			},
			want: &client{
				ourConn:  true,
				conn:     conn,
				lsc:      collogpb.NewLogsServiceClient(conn),
				metadata: map[string][]string{"key": {"value"}},
			},
		},
		{
			name: "with gRPC connection",
			cfg: config{
				gRPCConn: newSetting(userConn),
			},
			want: &client{
				ourConn: false,
				conn:    userConn,
				lsc:     collogpb.NewLogsServiceClient(userConn),
			},
		},
		{
			// gRPC dial options cannot be compared directly; this case only
			// checks the client is created without panicking.
			name: "with dial options",
			cfg: config{
				serviceConfig:      newSetting("service config"),
				gRPCCredentials:    newSetting(credentials.NewTLS(nil)),
				compression:        newSetting(GzipCompression),
				reconnectionPeriod: newSetting(10 * time.Second),
			},
			want: &client{
				ourConn: true,
				conn:    conn,
				lsc:     collogpb.NewLogsServiceClient(conn),
			},
		},
	}

	for _, tt := range cases {
		t.Run(tt.name, func(t *testing.T) {
			got, err := newClient(tt.cfg)
			require.NoError(t, err)

			assert.Equal(t, tt.want.metadata, got.metadata)
			assert.Equal(t, tt.want.exportTimeout, got.exportTimeout)
			assert.Equal(t, tt.want.ourConn, got.ourConn)
			assert.Equal(t, tt.want.conn, got.conn)
			assert.Equal(t, tt.want.lsc, got.lsc)
		})
	}
}
// exportResult is a canned response/error pair the test collector returns
// for a single Export call.
type exportResult struct {
	Response *collogpb.ExportLogsServiceResponse
	Err      error
}
// storage stores uploaded OTLP log data in their proto form.
type storage struct {
	dataMu sync.Mutex // guards data
	data   []*lpb.ResourceLogs
}
// newStorage returns an empty storage ready to record received requests.
func newStorage() *storage {
	return new(storage)
}
// Add appends the ResourceLogs carried by request to the storage.
func (s *storage) Add(request *collogpb.ExportLogsServiceRequest) {
	s.dataMu.Lock()
	defer s.dataMu.Unlock()

	s.data = append(s.data, request.GetResourceLogs()...)
}
// Dump returns all added ResourceLogs and clears the storage.
func (s *storage) Dump() []*lpb.ResourceLogs {
	s.dataMu.Lock()
	defer s.dataMu.Unlock()

	out := s.data
	s.data = []*lpb.ResourceLogs{}
	return out
}
// grpcCollector is an OTLP gRPC server that collects all requests it receives.
type grpcCollector struct {
	collogpb.UnimplementedLogsServiceServer

	headersMu sync.Mutex  // guards headers
	headers   metadata.MD // metadata joined across all Export calls

	// storage accumulates the log payloads of all received requests.
	storage *storage
	// resultCh, when non-nil, supplies the response/error each Export call
	// returns.
	resultCh <-chan exportResult

	listener net.Listener
	srv      *grpc.Server
}

// Compile-time check that grpcCollector implements the logs service server.
var _ collogpb.LogsServiceServer = (*grpcCollector)(nil)
// newGRPCCollector returns a *grpcCollector that is listening at the provided
// endpoint.
//
// If endpoint is an empty string, the returned collector will be listening on
// the localhost interface at an OS chosen port.
//
// If resultCh is not nil, the collector will respond to Export calls with
// results received on that channel. This means that if resultCh is not nil
// Export calls will block until a result is received.
func newGRPCCollector(endpoint string, resultCh <-chan exportResult) (*grpcCollector, error) {
	if endpoint == "" {
		endpoint = "localhost:0"
	}

	c := &grpcCollector{
		storage:  newStorage(),
		resultCh: resultCh,
	}

	var err error
	c.listener, err = net.Listen("tcp", endpoint)
	if err != nil {
		return nil, err
	}

	c.srv = grpc.NewServer()
	collogpb.RegisterLogsServiceServer(c.srv, c)
	// Serve in the background; the error is ignored because the server is
	// torn down when the test ends.
	go func() { _ = c.srv.Serve(c.listener) }()

	return c, nil
}
// Export records req and any incoming metadata, then replies with the next
// queued result, or with an empty success response when no result channel is
// configured.
func (c *grpcCollector) Export(
	ctx context.Context,
	req *collogpb.ExportLogsServiceRequest,
) (*collogpb.ExportLogsServiceResponse, error) {
	c.storage.Add(req)

	if md, ok := metadata.FromIncomingContext(ctx); ok {
		c.headersMu.Lock()
		c.headers = metadata.Join(c.headers, md)
		c.headersMu.Unlock()
	}

	if c.resultCh == nil {
		return &collogpb.ExportLogsServiceResponse{}, nil
	}

	// Block until a canned result is available.
	res := <-c.resultCh
	if res.Response == nil {
		return &collogpb.ExportLogsServiceResponse{}, res.Err
	}
	return res.Response, res.Err
}
// Collect returns the storage holding all collected requests.
func (c *grpcCollector) Collect() *storage { return c.storage }
// clientFactory starts a collector and returns a client configured to talk
// to it, failing t on any setup error.
func clientFactory(t *testing.T, rCh <-chan exportResult) (*client, *grpcCollector) {
	t.Helper()

	coll, err := newGRPCCollector("", rCh)
	require.NoError(t, err)

	cfg := newConfig([]Option{
		WithEndpoint(coll.listener.Addr().String()),
		WithInsecure(),
	})
	cli, err := newClient(cfg)
	require.NoError(t, err)
	return cli, coll
}
// testCtxErrs returns a test verifying the function produced by factory
// honors context deadline expiration and cancellation.
func testCtxErrs(factory func() func(context.Context) error) func(t *testing.T) {
	return func(t *testing.T) {
		t.Helper()

		root, stop := context.WithCancel(t.Context())
		t.Cleanup(stop)

		t.Run("DeadlineExceeded", func(t *testing.T) {
			ctx, cancel := context.WithTimeout(root, time.Nanosecond)
			t.Cleanup(cancel)
			// Wait for the deadline to pass before calling.
			<-ctx.Done()

			assert.ErrorIs(t, factory()(ctx), context.DeadlineExceeded)
		})

		t.Run("Canceled", func(t *testing.T) {
			ctx, cancel := context.WithCancel(root)
			cancel()

			assert.ErrorIs(t, factory()(ctx), context.Canceled)
		})
	}
}
// TestClient exercises the client's context handling, upload round-trip, and
// partial-success reporting.
func TestClient(t *testing.T) {
	t.Run("ClientHonorsContextErrors", func(t *testing.T) {
		t.Run("Shutdown", testCtxErrs(func() func(context.Context) error {
			cli, _ := clientFactory(t, nil)
			return cli.Shutdown
		}))

		t.Run("UploadLog", testCtxErrs(func() func(context.Context) error {
			cli, _ := clientFactory(t, nil)
			return func(ctx context.Context) error {
				return cli.UploadLogs(ctx, nil)
			}
		}))
	})

	t.Run("UploadLogs", func(t *testing.T) {
		ctx := t.Context()
		cli, coll := clientFactory(t, nil)

		require.NoError(t, cli.UploadLogs(ctx, resourceLogs))
		require.NoError(t, cli.Shutdown(ctx))

		got := coll.Collect().Dump()
		require.Len(t, got, 1, "upload of one ResourceLogs")
		if diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal)); diff != "" {
			t.Fatalf("unexpected ResourceLogs:\n%s", diff)
		}
	})

	t.Run("PartialSuccess", func(t *testing.T) {
		const n, msg = 2, "bad data"
		rCh := make(chan exportResult, 3)
		// First response: a partial success that must surface as an error.
		rCh <- exportResult{
			Response: &collogpb.ExportLogsServiceResponse{
				PartialSuccess: &collogpb.ExportLogsPartialSuccess{
					RejectedLogRecords: n,
					ErrorMessage:       msg,
				},
			},
		}
		// Second response: an empty partial success that must not be reported.
		rCh <- exportResult{
			Response: &collogpb.ExportLogsServiceResponse{
				PartialSuccess: &collogpb.ExportLogsPartialSuccess{
					RejectedLogRecords: 0,
					ErrorMessage:       "",
				},
			},
		}
		// Third response: no partial success at all.
		rCh <- exportResult{
			Response: &collogpb.ExportLogsServiceResponse{},
		}

		ctx := t.Context()
		cli, _ := clientFactory(t, rCh)
		assert.ErrorIs(t, cli.UploadLogs(ctx, resourceLogs), internal.PartialSuccess{})
		assert.NoError(t, cli.UploadLogs(ctx, resourceLogs))
		assert.NoError(t, cli.UploadLogs(ctx, resourceLogs))
	})
}
// TestConfig verifies exporter configuration options, currently that
// WithHeaders propagates custom and context metadata to the collector.
func TestConfig(t *testing.T) {
	// newExporter starts a collector and returns an exporter targeting it
	// with the given extra options applied.
	newExporter := func(rCh <-chan exportResult, opts ...Option) (log.Exporter, *grpcCollector) {
		coll, err := newGRPCCollector("", rCh)
		require.NoError(t, err)

		all := append([]Option{
			WithEndpoint(coll.listener.Addr().String()),
			WithInsecure(),
		}, opts...)
		exp, err := New(t.Context(), all...)
		require.NoError(t, err)
		return exp, coll
	}

	t.Run("WithHeaders", func(t *testing.T) {
		key := "my-custom-header"
		headers := map[string]string{key: "custom-value"}
		exp, coll := newExporter(nil, WithHeaders(headers))
		t.Cleanup(coll.srv.Stop)

		additionalKey := "additional-custom-header"
		ctx := metadata.AppendToOutgoingContext(t.Context(), additionalKey, "additional-value")
		require.NoError(t, exp.Export(ctx, make([]log.Record, 1)))
		// Ensure everything is flushed.
		require.NoError(t, exp.Shutdown(ctx))

		got := metadata.Join(coll.headers)
		require.Regexp(t, "OTel Go OTLP over gRPC logs exporter/[01]\\..*", got)
		require.Contains(t, got, key)
		require.Contains(t, got, additionalKey)
		assert.Equal(t, []string{headers[key]}, got[key])
	})
}
// SetExporterID sets the exporter ID counter to v and returns the previous
// value.
//
// This function is useful for testing purposes, allowing you to reset the
// counter. It should not be used in production code.
func SetExporterID(v int64) int64 {
	// Swap replaces the counter value and yields the one it replaced.
	return exporterN.Swap(v)
}
// TestClientObservability verifies the experimental self-observability
// metrics (inflight, exported, and operation duration) emitted by the client,
// gated on the OTEL_GO_X_OBSERVABILITY environment variable.
func TestClientObservability(t *testing.T) {
	testCases := []struct {
		name    string
		enabled bool
		test    func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics)
	}{
		{
			// With observability disabled, no instrumentation is installed.
			name:    "disable",
			enabled: false,
			test: func(t *testing.T, _ func() metricdata.ScopeMetrics) {
				client, _ := clientFactory(t, nil)
				assert.Empty(t, client.instrumentation)
			},
		},
		{
			// A successful upload records the exported count and an OK-coded
			// operation duration.
			name:    "upload success",
			enabled: true,
			test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) {
				ctx := t.Context()
				client, coll := clientFactory(t, nil)

				componentName := observ.GetComponentName(0)
				serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
				wantMetrics := metricdata.ScopeMetrics{
					Scope: instrumentation.Scope{
						Name:      observ.ScopeName,
						Version:   observ.Version,
						SchemaURL: semconv.SchemaURL,
					},
					Metrics: []metricdata.Metrics{
						{
							Name:        otelconv.SDKExporterLogInflight{}.Name(),
							Description: otelconv.SDKExporterLogInflight{}.Description(),
							Unit:        otelconv.SDKExporterLogInflight{}.Unit(),
							Data: metricdata.Sum[int64]{
								Temporality: metricdata.CumulativeTemporality,
								DataPoints: []metricdata.DataPoint[int64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogInflight{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
										),
										// Upload completed, so nothing is inflight.
										Value: 0,
									},
								},
							},
						},
						{
							Name:        otelconv.SDKExporterLogExported{}.Name(),
							Description: otelconv.SDKExporterLogExported{}.Description(),
							Unit:        otelconv.SDKExporterLogExported{}.Unit(),
							Data: metricdata.Sum[int64]{
								Temporality: metricdata.CumulativeTemporality,
								IsMonotonic: true,
								DataPoints: []metricdata.DataPoint[int64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogExported{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
										),
										Value: int64(len(resourceLogs)),
									},
								},
							},
						},
						{
							Name:        otelconv.SDKExporterOperationDuration{}.Name(),
							Description: otelconv.SDKExporterOperationDuration{}.Description(),
							Unit:        otelconv.SDKExporterOperationDuration{}.Unit(),
							Data: metricdata.Histogram[float64]{
								Temporality: metricdata.CumulativeTemporality,
								DataPoints: []metricdata.HistogramDataPoint[float64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterOperationDuration{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
												otelconv.RPCGRPCStatusCodeAttr(
													codes.OK,
												),
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
										),
										Count: 1,
									},
								},
							},
						},
					},
				}

				require.NoError(t, client.UploadLogs(ctx, resourceLogs))
				require.NoError(t, client.Shutdown(ctx))

				got := coll.Collect().Dump()
				require.Len(t, got, 1, "upload of one ResourceLogs")
				diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal))
				if diff != "" {
					t.Fatalf("unexpected ResourceLogs:\n%s", diff)
				}

				assert.Equal(t, instrumentation.Scope{
					Name:      observ.ScopeName,
					Version:   observ.Version,
					SchemaURL: semconv.SchemaURL,
				}, wantMetrics.Scope)
				g := scopeMetrics()
				metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
				metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
				// Duration values are nondeterministic; compare shape only.
				metricdatatest.AssertEqual(
					t,
					wantMetrics.Metrics[2],
					g.Metrics[2],
					metricdatatest.IgnoreTimestamp(),
					metricdatatest.IgnoreValue(),
				)
			},
		},
		{
			// A partial success records a failed datapoint carrying the
			// error type attribute.
			name:    "partial success",
			enabled: true,
			test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) {
				const n, msg = 2, "bad data"
				rCh := make(chan exportResult, 1)
				rCh <- exportResult{
					Response: &collogpb.ExportLogsServiceResponse{
						PartialSuccess: &collogpb.ExportLogsPartialSuccess{
							RejectedLogRecords: n,
							ErrorMessage:       msg,
						},
					},
				}

				ctx := t.Context()
				client, _ := clientFactory(t, rCh)

				componentName := observ.GetComponentName(0)
				serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
				var wantErr error
				wantErr = errors.Join(wantErr, internal.LogPartialSuccessError(n, msg))
				wantMetrics := metricdata.ScopeMetrics{
					Scope: instrumentation.Scope{
						Name:      observ.ScopeName,
						Version:   observ.Version,
						SchemaURL: semconv.SchemaURL,
					},
					Metrics: []metricdata.Metrics{
						{
							Name:        otelconv.SDKExporterLogInflight{}.Name(),
							Description: otelconv.SDKExporterLogInflight{}.Description(),
							Unit:        otelconv.SDKExporterLogInflight{}.Unit(),
							Data: metricdata.Sum[int64]{
								Temporality: metricdata.CumulativeTemporality,
								DataPoints: []metricdata.DataPoint[int64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogInflight{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
										),
										Value: 0,
									},
								},
							},
						},
						{
							Name:        otelconv.SDKExporterLogExported{}.Name(),
							Description: otelconv.SDKExporterLogExported{}.Description(),
							Unit:        otelconv.SDKExporterLogExported{}.Unit(),
							Data: metricdata.Sum[int64]{
								Temporality: metricdata.CumulativeTemporality,
								IsMonotonic: true,
								DataPoints: []metricdata.DataPoint[int64]{
									{
										// Successful datapoint (no error attribute).
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogExported{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
										),
										Value: 0,
									},
									{
										// Rejected records counted with the error type.
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogExported{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
											semconv.ErrorType(wantErr),
										),
										Value: 1,
									},
								},
							},
						},
						{
							Name:        otelconv.SDKExporterOperationDuration{}.Name(),
							Description: otelconv.SDKExporterOperationDuration{}.Description(),
							Unit:        otelconv.SDKExporterOperationDuration{}.Unit(),
							Data: metricdata.Histogram[float64]{
								Temporality: metricdata.CumulativeTemporality,
								DataPoints: []metricdata.HistogramDataPoint[float64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterOperationDuration{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
												otelconv.RPCGRPCStatusCodeAttr(
													status.Code(wantErr),
												),
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
											semconv.ErrorType(wantErr),
										),
										Count: 1,
									},
								},
							},
						},
					},
				}

				err := client.UploadLogs(ctx, resourceLogs)
				assert.ErrorContains(t, err, wantErr.Error())

				assert.Equal(t, instrumentation.Scope{
					Name:      observ.ScopeName,
					Version:   observ.Version,
					SchemaURL: semconv.SchemaURL,
				}, wantMetrics.Scope)
				g := scopeMetrics()
				metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
				metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
				metricdatatest.AssertEqual(
					t,
					wantMetrics.Metrics[2],
					g.Metrics[2],
					metricdatatest.IgnoreTimestamp(),
					metricdatatest.IgnoreValue(),
				)
			},
		},
		{
			// A full export failure records the failure count and gRPC status
			// code on the duration histogram.
			name:    "upload failure",
			enabled: true,
			test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) {
				err := status.Error(codes.InvalidArgument, "request contains invalid arguments")
				var wantErr error
				wantErr = errors.Join(wantErr, err)
				wantErrTypeAttr := semconv.ErrorType(wantErr)
				wantGRPCStatusCodeAttr := otelconv.RPCGRPCStatusCodeAttr(codes.InvalidArgument)

				rCh := make(chan exportResult, 1)
				rCh <- exportResult{
					Err: err,
				}

				ctx := t.Context()
				client, _ := clientFactory(t, rCh)
				uploadErr := client.UploadLogs(ctx, resourceLogs)
				assert.ErrorContains(t, uploadErr, "request contains invalid arguments")

				componentName := observ.GetComponentName(0)
				serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
				wantMetrics := metricdata.ScopeMetrics{
					Scope: instrumentation.Scope{
						Name:      observ.ScopeName,
						Version:   observ.Version,
						SchemaURL: semconv.SchemaURL,
					},
					Metrics: []metricdata.Metrics{
						{
							Name:        otelconv.SDKExporterLogInflight{}.Name(),
							Description: otelconv.SDKExporterLogInflight{}.Description(),
							Unit:        otelconv.SDKExporterLogInflight{}.Unit(),
							Data: metricdata.Sum[int64]{
								Temporality: metricdata.CumulativeTemporality,
								DataPoints: []metricdata.DataPoint[int64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogInflight{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
										),
										Value: 0,
									},
								},
							},
						},
						{
							Name:        otelconv.SDKExporterLogExported{}.Name(),
							Description: otelconv.SDKExporterLogExported{}.Description(),
							Unit:        otelconv.SDKExporterLogExported{}.Unit(),
							Data: metricdata.Sum[int64]{
								Temporality: metricdata.CumulativeTemporality,
								IsMonotonic: true,
								DataPoints: []metricdata.DataPoint[int64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogExported{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
										),
										Value: 0,
									},
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterLogExported{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
											wantErrTypeAttr,
										),
										Value: 1,
									},
								},
							},
						},
						{
							Name:        otelconv.SDKExporterOperationDuration{}.Name(),
							Description: otelconv.SDKExporterOperationDuration{}.Description(),
							Unit:        otelconv.SDKExporterOperationDuration{}.Unit(),
							Data: metricdata.Histogram[float64]{
								Temporality: metricdata.CumulativeTemporality,
								DataPoints: []metricdata.HistogramDataPoint[float64]{
									{
										Attributes: attribute.NewSet(
											otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
											otelconv.SDKExporterOperationDuration{}.AttrComponentType(
												otelconv.ComponentTypeOtlpGRPCLogExporter,
											),
											otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
												wantGRPCStatusCodeAttr,
											),
											serverAddrAttrs[0],
											serverAddrAttrs[1],
											wantErrTypeAttr,
										),
										Count: 1,
									},
								},
							},
						},
					},
				}

				g := scopeMetrics()
				assert.Equal(t, instrumentation.Scope{
					Name:      observ.ScopeName,
					Version:   observ.Version,
					SchemaURL: semconv.SchemaURL,
				}, wantMetrics.Scope)
				metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
				metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
				metricdatatest.AssertEqual(
					t,
					wantMetrics.Metrics[2],
					g.Metrics[2],
					metricdatatest.IgnoreTimestamp(),
					metricdatatest.IgnoreValue(),
				)
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			if tc.enabled {
				// Observability metrics are experimental and env-gated.
				t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
				// Reset component name counter for each test.
				_ = SetExporterID(0)
			}

			// Install a manual-reader meter provider; restore the global
			// provider when the subtest completes.
			prev := otel.GetMeterProvider()
			t.Cleanup(func() {
				otel.SetMeterProvider(prev)
			})
			r := metric.NewManualReader()
			mp := metric.NewMeterProvider(metric.WithReader(r))
			otel.SetMeterProvider(mp)

			// scopeMetrics collects and returns the single recorded scope.
			scopeMetrics := func() metricdata.ScopeMetrics {
				var got metricdata.ResourceMetrics
				err := r.Collect(t.Context(), &got)
				require.NoError(t, err)
				require.Len(t, got.ScopeMetrics, 1)
				return got.ScopeMetrics[0]
			}

			tc.test(t, scopeMetrics)
		})
	}
}
// TestClientObservabilityWithRetry verifies the exporter observability
// metrics when the first export attempt fails with a retryable gRPC status
// (Unavailable) and the retried attempt returns a partial success. The
// inflight, exported, and operation duration metrics must reflect the final
// (retried) outcome.
func TestClientObservabilityWithRetry(t *testing.T) {
	t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
	// Reset the component name counter so the component name attribute is
	// deterministic for this test.
	_ = SetExporterID(0)
	prev := otel.GetMeterProvider()
	t.Cleanup(func() {
		otel.SetMeterProvider(prev)
	})
	r := metric.NewManualReader()
	mp := metric.NewMeterProvider(metric.WithReader(r))
	otel.SetMeterProvider(mp)
	// scopeMetrics collects from the manual reader and returns the single
	// recorded ScopeMetrics.
	scopeMetrics := func() metricdata.ScopeMetrics {
		var got metricdata.ResourceMetrics
		err := r.Collect(t.Context(), &got)
		require.NoError(t, err)
		require.Len(t, got.ScopeMetrics, 1)
		return got.ScopeMetrics[0]
	}
	// The first response triggers a retry; the second is a partial success.
	rCh := make(chan exportResult, 2)
	rCh <- exportResult{
		Err: status.Error(codes.Unavailable, "service temporarily unavailable"),
	}
	const n, msg = 1, "some logs rejected"
	rCh <- exportResult{
		Response: &collogpb.ExportLogsServiceResponse{
			PartialSuccess: &collogpb.ExportLogsPartialSuccess{
				RejectedLogRecords: n,
				ErrorMessage:       msg,
			},
		},
	}
	ctx := t.Context()
	client, _ := clientFactory(t, rCh)
	componentName := observ.GetComponentName(0)
	serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
	// UploadLogs joins the partial-success error; the joined error's type
	// determines the error.type attribute asserted below.
	var wantErr error
	wantErr = errors.Join(wantErr, internal.LogPartialSuccessError(n, msg))
	wantMetrics := metricdata.ScopeMetrics{
		Scope: instrumentation.Scope{
			Name:      observ.ScopeName,
			Version:   observ.Version,
			SchemaURL: semconv.SchemaURL,
		},
		Metrics: []metricdata.Metrics{
			{
				Name:        otelconv.SDKExporterLogInflight{}.Name(),
				Description: otelconv.SDKExporterLogInflight{}.Description(),
				Unit:        otelconv.SDKExporterLogInflight{}.Unit(),
				Data: metricdata.Sum[int64]{
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[int64]{
						{
							Attributes: attribute.NewSet(
								otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
								otelconv.SDKExporterLogInflight{}.AttrComponentType(
									otelconv.ComponentTypeOtlpGRPCLogExporter,
								),
								serverAddrAttrs[0],
								serverAddrAttrs[1],
							),
							// All records have completed by collection time.
							Value: 0,
						},
					},
				},
			},
			{
				Name:        otelconv.SDKExporterLogExported{}.Name(),
				Description: otelconv.SDKExporterLogExported{}.Description(),
				Unit:        otelconv.SDKExporterLogExported{}.Unit(),
				Data: metricdata.Sum[int64]{
					Temporality: metricdata.CumulativeTemporality,
					IsMonotonic: true,
					DataPoints: []metricdata.DataPoint[int64]{
						{
							// Successfully exported records (no error.type).
							Attributes: attribute.NewSet(
								otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
								otelconv.SDKExporterLogExported{}.AttrComponentType(
									otelconv.ComponentTypeOtlpGRPCLogExporter,
								),
								serverAddrAttrs[0],
								serverAddrAttrs[1],
							),
							Value: int64(len(resourceLogs)) - n,
						},
						{
							// Rejected records carry the error.type attribute.
							Attributes: attribute.NewSet(
								otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
								otelconv.SDKExporterLogExported{}.AttrComponentType(
									otelconv.ComponentTypeOtlpGRPCLogExporter,
								),
								serverAddrAttrs[0],
								serverAddrAttrs[1],
								semconv.ErrorType(wantErr),
							),
							Value: n,
						},
					},
				},
			},
			{
				Name:        otelconv.SDKExporterOperationDuration{}.Name(),
				Description: otelconv.SDKExporterOperationDuration{}.Description(),
				Unit:        otelconv.SDKExporterOperationDuration{}.Unit(),
				Data: metricdata.Histogram[float64]{
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.HistogramDataPoint[float64]{
						{
							Attributes: attribute.NewSet(
								otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
								otelconv.SDKExporterOperationDuration{}.AttrComponentType(
									otelconv.ComponentTypeOtlpGRPCLogExporter,
								),
								otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
									otelconv.RPCGRPCStatusCodeAttr(
										status.Code(wantErr),
									),
								),
								serverAddrAttrs[0],
								serverAddrAttrs[1],
								semconv.ErrorType(wantErr),
							),
							Count: 1,
						},
					},
				},
			},
		},
	}
	err := client.UploadLogs(ctx, resourceLogs)
	assert.ErrorContains(t, err, wantErr.Error())
	g := scopeMetrics()
	// Assert on the collected scope. (Previously this compared the expected
	// scope literal to itself, which could never fail.)
	assert.Equal(t, instrumentation.Scope{
		Name:      observ.ScopeName,
		Version:   observ.Version,
		SchemaURL: semconv.SchemaURL,
	}, g.Scope)
	metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
	metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
	// Histogram values depend on wall-clock timing; compare structure only.
	metricdatatest.AssertEqual(
		t,
		wantMetrics.Metrics[2],
		g.Metrics[2],
		metricdatatest.IgnoreTimestamp(),
		metricdatatest.IgnoreValue(),
	)
}
// BenchmarkExporterExportLogs measures the cost of exporting a batch of log
// records with the experimental observability metrics enabled and disabled.
func BenchmarkExporterExportLogs(b *testing.B) {
	const logRecordsCount = 100
	run := func(b *testing.B) {
		coll, err := newGRPCCollector("", nil)
		require.NoError(b, err)
		b.Cleanup(func() {
			coll.srv.Stop()
		})
		ctx := b.Context()
		opts := []Option{
			WithEndpoint(coll.listener.Addr().String()),
			WithInsecure(),
			WithTimeout(5 * time.Second),
		}
		exp, err := New(ctx, opts...)
		require.NoError(b, err)
		b.Cleanup(func() {
			//nolint:usetesting // required to avoid getting a canceled context at cleanup.
			assert.NoError(b, exp.Shutdown(context.Background()))
		})
		logs := make([]log.Record, logRecordsCount)
		now := time.Now()
		for i := range logs {
			logs[i].SetTimestamp(now)
			logs[i].SetObservedTimestamp(now)
		}
		b.ReportAllocs()
		// b.Loop resets the timer on its first call, so no explicit
		// b.ResetTimer is needed. Reuse ctx rather than calling b.Context()
		// every iteration to keep that overhead out of the measured loop.
		for b.Loop() {
			err := exp.Export(ctx, logs)
			require.NoError(b, err)
		}
	}
	b.Run("Observability", func(b *testing.B) {
		b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
		run(b)
	})
	b.Run("NoObservability", func(b *testing.B) {
		b.Setenv("OTEL_GO_X_OBSERVABILITY", "false")
		run(b)
	})
}
// TestNextExporterID verifies that nextExporterID returns sequential IDs
// starting from the value set by SetExporterID.
func TestNextExporterID(t *testing.T) {
	SetExporterID(0)
	for want := int64(0); want < 10; want++ {
		if got := nextExporterID(); got != want {
			t.Errorf("nextExporterID() = %d; want %d", got, want)
		}
	}
}
// TestSetExporterID verifies that SetExporterID returns the previous counter
// value and that subsequent IDs continue from the newly set value.
func TestSetExporterID(t *testing.T) {
	SetExporterID(0)
	if prev := SetExporterID(42); prev != 0 {
		t.Errorf("SetExporterID(42) returned %d; want 0", prev)
	}
	if id := nextExporterID(); id != 42 {
		t.Errorf("nextExporterID() = %d; want 42", id)
	}
}
// TestNextExporterIDConcurrentSafe verifies that nextExporterID is safe to
// call from many goroutines at once: after goroutines*increments concurrent
// calls the counter must equal exactly that total.
func TestNextExporterIDConcurrentSafe(t *testing.T) {
	SetExporterID(0)
	const (
		goroutines = 100
		increments = 10
	)
	var wg sync.WaitGroup
	for range goroutines {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range increments {
				nextExporterID()
			}
		}()
	}
	wg.Wait()
	// Every increment must be accounted for with no lost updates.
	if id, want := nextExporterID(), int64(goroutines*increments); id != want {
		t.Errorf("nextExporterID() = %d; want %d", id, want)
	}
}