1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-08 23:21:56 +02:00

feat: logs SDK observability - otlploggrpc exporter metrics (#7353)

This PR adds support for experimental metrics in `otlploggrpc`

- `otel.sdk.exporter.log.inflight`
- `otel.sdk.exporter.log.exported`
- `otel.sdk.exporter.operation.duration`

References:

-  #7084 
-  https://github.com/open-telemetry/opentelemetry-go/issues/7019
- [Follow
guidelines](a5dcd68ebb/CONTRIBUTING.md (encapsulation)).

-----
```txt
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc
cpu: Apple M3
                                   │ disabled.txt │          enabled.txt          │
                                   │    sec/op    │   sec/op     vs base          │
ExporterExportLogs/Observability-8    681.5µ ± 3%   684.3µ ± 6%  ~ (p=0.315 n=10)

                                   │ disabled.txt │          enabled.txt           │
                                   │     B/op     │     B/op      vs base          │
ExporterExportLogs/Observability-8   672.8Ki ± 0%   673.6Ki ± 1%  ~ (p=0.247 n=10)

                                   │ disabled.txt │            enabled.txt             │
                                   │  allocs/op   │  allocs/op   vs base               │
ExporterExportLogs/Observability-8    9.224k ± 0%   9.232k ± 0%  +0.09% (p=0.000 n=10)
```

-----
```txt
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ
cpu: Apple M3
                                         │  bench.txt  │
                                         │   sec/op    │
InstrumentationExportLogs/NoError-8        162.6n ± 3%
InstrumentationExportLogs/PartialError-8   705.5n ± 5%
InstrumentationExportLogs/FullError-8      592.1n ± 1%
geomean                                    408.0n

                                         │ bench.txt  │
                                         │    B/op    │
InstrumentationExportLogs/NoError-8        152.0 ± 0%
InstrumentationExportLogs/PartialError-8   697.0 ± 0%
InstrumentationExportLogs/FullError-8      616.0 ± 0%
geomean                                    402.6

                                         │ bench.txt  │
                                         │ allocs/op  │
InstrumentationExportLogs/NoError-8        3.000 ± 0%
InstrumentationExportLogs/PartialError-8   10.00 ± 0%
InstrumentationExportLogs/FullError-8      8.000 ± 0%
geomean                                    6.214
```

-----
```txt
pkg: go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ
cpu: Apple M3
                                 │ parse_target.txt │
                                 │      sec/op      │
ParseTarget/HostName-8                 38.00n ± ∞ ¹
ParseTarget/HostPort-8                 51.33n ± ∞ ¹
ParseTarget/IPv4WithoutPort-8          44.74n ± ∞ ¹
ParseTarget/IPv4WithPort-8             62.56n ± ∞ ¹
ParseTarget/IPv6Bare-8                 94.89n ± ∞ ¹
ParseTarget/IPv6Bracket-8              93.78n ± ∞ ¹
ParseTarget/IPv6WithPort-8             57.57n ± ∞ ¹
ParseTarget/UnixSocket-8               8.329n ± ∞ ¹
ParseTarget/UnixAbstractSocket-8       9.082n ± ∞ ¹
ParseTarget/Passthrough-8              58.06n ± ∞ ¹
geomean                                40.64n
¹ need >= 6 samples for confidence interval at level 0.95

                                 │ parse_target.txt │
                                 │       B/op       │
ParseTarget/HostName-8                  48.00 ± ∞ ¹
ParseTarget/HostPort-8                  48.00 ± ∞ ¹
ParseTarget/IPv4WithoutPort-8           16.00 ± ∞ ¹
ParseTarget/IPv4WithPort-8              48.00 ± ∞ ¹
ParseTarget/IPv6Bare-8                  16.00 ± ∞ ¹
ParseTarget/IPv6Bracket-8               16.00 ± ∞ ¹
ParseTarget/IPv6WithPort-8              48.00 ± ∞ ¹
ParseTarget/UnixSocket-8                0.000 ± ∞ ¹
ParseTarget/UnixAbstractSocket-8        0.000 ± ∞ ¹
ParseTarget/Passthrough-8               48.00 ± ∞ ¹
geomean                                           ²
¹ need >= 6 samples for confidence interval at level 0.95
² summaries must be >0 to compute geomean

                                 │ parse_target.txt │
                                 │    allocs/op     │
ParseTarget/HostName-8                  1.000 ± ∞ ¹
ParseTarget/HostPort-8                  1.000 ± ∞ ¹
ParseTarget/IPv4WithoutPort-8           1.000 ± ∞ ¹
ParseTarget/IPv4WithPort-8              1.000 ± ∞ ¹
ParseTarget/IPv6Bare-8                  1.000 ± ∞ ¹
ParseTarget/IPv6Bracket-8               1.000 ± ∞ ¹
ParseTarget/IPv6WithPort-8              1.000 ± ∞ ¹
ParseTarget/UnixSocket-8                0.000 ± ∞ ¹
ParseTarget/UnixAbstractSocket-8        0.000 ± ∞ ¹
ParseTarget/Passthrough-8               1.000 ± ∞ ¹
geomean                                           ²
¹ need >= 6 samples for confidence interval at level 0.95
² summaries must be >0 to compute geomean
```

---------

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
ian
2025-10-03 01:15:41 +08:00
committed by GitHub
parent 4b9e111aa5
commit 5dd35ce873
19 changed files with 1986 additions and 23 deletions

View File

@@ -16,6 +16,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Greatly reduce the cost of recording metrics in `go.opentelemetry.io/otel/sdk/metric` using hashing for map keys. (#7175) - Greatly reduce the cost of recording metrics in `go.opentelemetry.io/otel/sdk/metric` using hashing for map keys. (#7175)
- Add experimental observability for the prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus`. - Add experimental observability for the prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus`.
Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345) Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353)
### Fixed ### Fixed

View File

@@ -6,7 +6,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o
import ( import (
"context" "context"
"errors" "errors"
"fmt" "sync/atomic"
"time" "time"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1" collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
@@ -21,6 +21,8 @@ import (
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry" "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
) )
@@ -37,6 +39,8 @@ type client struct {
ourConn bool ourConn bool
conn *grpc.ClientConn conn *grpc.ClientConn
lsc collogpb.LogsServiceClient lsc collogpb.LogsServiceClient
instrumentation *observ.Instrumentation
} }
// Used for testing. // Used for testing.
@@ -71,7 +75,18 @@ func newClient(cfg config) (*client, error) {
c.lsc = collogpb.NewLogsServiceClient(c.conn) c.lsc = collogpb.NewLogsServiceClient(c.conn)
return c, nil var err error
id := nextExporterID()
c.instrumentation, err = observ.NewInstrumentation(id, c.conn.CanonicalTarget())
return c, err
}
var exporterN atomic.Int64
// nextExporterID returns the next unique ID for an exporter.
func nextExporterID() int64 {
const inc = 1
return exporterN.Add(inc) - inc
} }
func newGRPCDialOptions(cfg config) []grpc.DialOption { func newGRPCDialOptions(cfg config) []grpc.DialOption {
@@ -131,6 +146,14 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
ctx, cancel := c.exportContext(ctx) ctx, cancel := c.exportContext(ctx)
defer cancel() defer cancel()
count := int64(len(rl))
if c.instrumentation != nil {
eo := c.instrumentation.ExportLogs(ctx, count)
defer func() {
eo.End(uploadErr)
}()
}
return errors.Join(uploadErr, c.requestFunc(ctx, func(ctx context.Context) error { return errors.Join(uploadErr, c.requestFunc(ctx, func(ctx context.Context) error {
resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{ resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{
ResourceLogs: rl, ResourceLogs: rl,
@@ -139,7 +162,7 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
msg := resp.PartialSuccess.GetErrorMessage() msg := resp.PartialSuccess.GetErrorMessage()
n := resp.PartialSuccess.GetRejectedLogRecords() n := resp.PartialSuccess.GetRejectedLogRecords()
if n != 0 || msg != "" { if n != 0 || msg != "" {
err := errPartial{msg: msg, n: n} err := internal.LogPartialSuccessError(n, msg)
uploadErr = errors.Join(uploadErr, err) uploadErr = errors.Join(uploadErr, err)
} }
} }
@@ -152,23 +175,6 @@ func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) (uplo
})) }))
} }
type errPartial struct {
msg string
n int64
}
var _ error = errPartial{}
func (e errPartial) Error() string {
const form = "OTLP partial success: %s (%d log records rejected)"
return fmt.Sprintf(form, e.msg, e.n)
}
func (errPartial) Is(target error) bool {
_, ok := target.(errPartial)
return ok
}
// Shutdown shuts down the client, freeing all resources. // Shutdown shuts down the client, freeing all resources.
// //
// Any active connections to a remote endpoint are closed if they were created // Any active connections to a remote endpoint are closed if they were created

View File

@@ -5,6 +5,7 @@ package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/o
import ( import (
"context" "context"
"errors"
"net" "net"
"sync" "sync"
"testing" "testing"
@@ -27,8 +28,17 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/durationpb"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/log" "go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0" semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
) )
var ( var (
@@ -546,7 +556,7 @@ func TestClient(t *testing.T) {
ctx := t.Context() ctx := t.Context()
client, _ := clientFactory(t, rCh) client, _ := clientFactory(t, rCh)
assert.ErrorIs(t, client.UploadLogs(ctx, resourceLogs), errPartial{}) assert.ErrorIs(t, client.UploadLogs(ctx, resourceLogs), internal.PartialSuccess{})
assert.NoError(t, client.UploadLogs(ctx, resourceLogs)) assert.NoError(t, client.UploadLogs(ctx, resourceLogs))
assert.NoError(t, client.UploadLogs(ctx, resourceLogs)) assert.NoError(t, client.UploadLogs(ctx, resourceLogs))
}) })
@@ -587,3 +597,694 @@ func TestConfig(t *testing.T) {
assert.Equal(t, []string{headers[key]}, got[key]) assert.Equal(t, []string{headers[key]}, got[key])
}) })
} }
// SetExporterID sets the exporter ID counter to v and returns the previous
// value.
//
// This function is useful for testing purposes, allowing you to reset the
// counter. It should not be used in production code.
func SetExporterID(v int64) int64 {
return exporterN.Swap(v)
}
func TestClientObservability(t *testing.T) {
testCases := []struct {
name string
enabled bool
test func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics)
}{
{
name: "disable",
enabled: false,
test: func(t *testing.T, _ func() metricdata.ScopeMetrics) {
client, _ := clientFactory(t, nil)
assert.Empty(t, client.instrumentation)
},
},
{
name: "upload success",
enabled: true,
test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) {
ctx := t.Context()
client, coll := clientFactory(t, nil)
componentName := observ.GetComponentName(0)
serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
wantMetrics := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKExporterLogInflight{}.Name(),
Description: otelconv.SDKExporterLogInflight{}.Description(),
Unit: otelconv.SDKExporterLogInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
otelconv.SDKExporterLogInflight{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: 0,
},
},
},
},
{
Name: otelconv.SDKExporterLogExported{}.Name(),
Description: otelconv.SDKExporterLogExported{}.Description(),
Unit: otelconv.SDKExporterLogExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterLogExported{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: int64(len(resourceLogs)),
},
},
},
},
{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterOperationDuration{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
otelconv.RPCGRPCStatusCodeAttr(
codes.OK,
),
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Count: 1,
},
},
},
},
},
}
require.NoError(t, client.UploadLogs(ctx, resourceLogs))
require.NoError(t, client.Shutdown(ctx))
got := coll.Collect().Dump()
require.Len(t, got, 1, "upload of one ResourceLogs")
diff := cmp.Diff(got[0], resourceLogs[0], cmp.Comparer(proto.Equal))
if diff != "" {
t.Fatalf("unexpected ResourceLogs:\n%s", diff)
}
assert.Equal(t, instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
}, wantMetrics.Scope)
g := scopeMetrics()
metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(
t,
wantMetrics.Metrics[2],
g.Metrics[2],
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreValue(),
)
},
},
{
name: "partial success",
enabled: true,
test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) {
const n, msg = 2, "bad data"
rCh := make(chan exportResult, 1)
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
RejectedLogRecords: n,
ErrorMessage: msg,
},
},
}
ctx := t.Context()
client, _ := clientFactory(t, rCh)
componentName := observ.GetComponentName(0)
serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
var wantErr error
wantErr = errors.Join(wantErr, internal.LogPartialSuccessError(n, msg))
wantMetrics := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKExporterLogInflight{}.Name(),
Description: otelconv.SDKExporterLogInflight{}.Description(),
Unit: otelconv.SDKExporterLogInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
otelconv.SDKExporterLogInflight{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: 0,
},
},
},
},
{
Name: otelconv.SDKExporterLogExported{}.Name(),
Description: otelconv.SDKExporterLogExported{}.Description(),
Unit: otelconv.SDKExporterLogExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterLogExported{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: 0,
},
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterLogExported{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
semconv.ErrorType(wantErr),
),
Value: 1,
},
},
},
},
{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterOperationDuration{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
otelconv.RPCGRPCStatusCodeAttr(
status.Code(wantErr),
),
),
serverAddrAttrs[0],
serverAddrAttrs[1],
semconv.ErrorType(wantErr),
),
Count: 1,
},
},
},
},
},
}
err := client.UploadLogs(ctx, resourceLogs)
assert.ErrorContains(t, err, wantErr.Error())
assert.Equal(t, instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
}, wantMetrics.Scope)
g := scopeMetrics()
metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(
t,
wantMetrics.Metrics[2],
g.Metrics[2],
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreValue(),
)
},
},
{
name: "upload failure",
enabled: true,
test: func(t *testing.T, scopeMetrics func() metricdata.ScopeMetrics) {
err := status.Error(codes.InvalidArgument, "request contains invalid arguments")
var wantErr error
wantErr = errors.Join(wantErr, err)
wantErrTypeAttr := semconv.ErrorType(wantErr)
wantGRPCStatusCodeAttr := otelconv.RPCGRPCStatusCodeAttr(codes.InvalidArgument)
rCh := make(chan exportResult, 1)
rCh <- exportResult{
Err: err,
}
ctx := t.Context()
client, _ := clientFactory(t, rCh)
uploadErr := client.UploadLogs(ctx, resourceLogs)
assert.ErrorContains(t, uploadErr, "request contains invalid arguments")
componentName := observ.GetComponentName(0)
serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
wantMetrics := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKExporterLogInflight{}.Name(),
Description: otelconv.SDKExporterLogInflight{}.Description(),
Unit: otelconv.SDKExporterLogInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
otelconv.SDKExporterLogInflight{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: 0,
},
},
},
},
{
Name: otelconv.SDKExporterLogExported{}.Name(),
Description: otelconv.SDKExporterLogExported{}.Description(),
Unit: otelconv.SDKExporterLogExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterLogExported{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: 0,
},
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterLogExported{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
wantErrTypeAttr,
),
Value: 1,
},
},
},
},
{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterOperationDuration{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
wantGRPCStatusCodeAttr,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
wantErrTypeAttr,
),
Count: 1,
},
},
},
},
},
}
g := scopeMetrics()
assert.Equal(t, instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
}, wantMetrics.Scope)
metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(
t,
wantMetrics.Metrics[2],
g.Metrics[2],
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreValue(),
)
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.enabled {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
// Reset component name counter for each test.
_ = SetExporterID(0)
}
prev := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(prev)
})
r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)
scopeMetrics := func() metricdata.ScopeMetrics {
var got metricdata.ResourceMetrics
err := r.Collect(t.Context(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
return got.ScopeMetrics[0]
}
tc.test(t, scopeMetrics)
})
}
}
func TestClientObservabilityWithRetry(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
_ = SetExporterID(0)
prev := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(prev)
})
r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)
scopeMetrics := func() metricdata.ScopeMetrics {
var got metricdata.ResourceMetrics
err := r.Collect(t.Context(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
return got.ScopeMetrics[0]
}
rCh := make(chan exportResult, 2)
rCh <- exportResult{
Err: status.Error(codes.Unavailable, "service temporarily unavailable"),
}
const n, msg = 1, "some logs rejected"
rCh <- exportResult{
Response: &collogpb.ExportLogsServiceResponse{
PartialSuccess: &collogpb.ExportLogsPartialSuccess{
RejectedLogRecords: n,
ErrorMessage: msg,
},
},
}
ctx := t.Context()
client, _ := clientFactory(t, rCh)
componentName := observ.GetComponentName(0)
serverAddrAttrs := observ.ServerAddrAttrs(client.conn.CanonicalTarget())
var wantErr error
wantErr = errors.Join(wantErr, internal.LogPartialSuccessError(n, msg))
wantMetrics := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKExporterLogInflight{}.Name(),
Description: otelconv.SDKExporterLogInflight{}.Description(),
Unit: otelconv.SDKExporterLogInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogInflight{}.AttrComponentName(componentName),
otelconv.SDKExporterLogInflight{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: 0,
},
},
},
},
{
Name: otelconv.SDKExporterLogExported{}.Name(),
Description: otelconv.SDKExporterLogExported{}.Description(),
Unit: otelconv.SDKExporterLogExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterLogExported{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
),
Value: int64(len(resourceLogs)) - n,
},
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterLogExported{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
serverAddrAttrs[0],
serverAddrAttrs[1],
semconv.ErrorType(wantErr),
),
Value: n,
},
},
},
},
{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: attribute.NewSet(
otelconv.SDKExporterLogExported{}.AttrComponentName(componentName),
otelconv.SDKExporterOperationDuration{}.AttrComponentType(
otelconv.ComponentTypeOtlpGRPCLogExporter,
),
otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
otelconv.RPCGRPCStatusCodeAttr(
status.Code(wantErr),
),
),
serverAddrAttrs[0],
serverAddrAttrs[1],
semconv.ErrorType(wantErr),
),
Count: 1,
},
},
},
},
},
}
err := client.UploadLogs(ctx, resourceLogs)
assert.ErrorContains(t, err, wantErr.Error())
assert.Equal(t, instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: semconv.SchemaURL,
}, wantMetrics.Scope)
g := scopeMetrics()
metricdatatest.AssertEqual(t, wantMetrics.Metrics[0], g.Metrics[0], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(t, wantMetrics.Metrics[1], g.Metrics[1], metricdatatest.IgnoreTimestamp())
metricdatatest.AssertEqual(
t,
wantMetrics.Metrics[2],
g.Metrics[2],
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreValue(),
)
}
func BenchmarkExporterExportLogs(b *testing.B) {
const logRecordsCount = 100
run := func(b *testing.B) {
coll, err := newGRPCCollector("", nil)
require.NoError(b, err)
b.Cleanup(func() {
coll.srv.Stop()
})
ctx := b.Context()
opts := []Option{
WithEndpoint(coll.listener.Addr().String()),
WithInsecure(),
WithTimeout(5 * time.Second),
}
exp, err := New(ctx, opts...)
require.NoError(b, err)
b.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
assert.NoError(b, exp.Shutdown(context.Background()))
})
logs := make([]log.Record, logRecordsCount)
now := time.Now()
for i := range logs {
logs[i].SetTimestamp(now)
logs[i].SetObservedTimestamp(now)
}
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
err := exp.Export(b.Context(), logs)
require.NoError(b, err)
}
}
b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})
b.Run("NoObservability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "false")
run(b)
})
}
func TestNextExporterID(t *testing.T) {
SetExporterID(0)
var expected int64
for range 10 {
id := nextExporterID()
if id != expected {
t.Errorf("nextExporterID() = %d; want %d", id, expected)
}
expected++
}
}
func TestSetExporterID(t *testing.T) {
SetExporterID(0)
prev := SetExporterID(42)
if prev != 0 {
t.Errorf("SetExporterID(42) returned %d; want 0", prev)
}
id := nextExporterID()
if id != 42 {
t.Errorf("nextExporterID() = %d; want 42", id)
}
}
func TestNextExporterIDConcurrentSafe(t *testing.T) {
SetExporterID(0)
const goroutines = 100
const increments = 10
var wg sync.WaitGroup
wg.Add(goroutines)
for range goroutines {
go func() {
defer wg.Done()
for range increments {
nextExporterID()
}
}()
}
wg.Wait()
expected := int64(goroutines * increments)
if id := nextExporterID(); id != expected {
t.Errorf("nextExporterID() = %d; want %d", id, expected)
}
}

View File

@@ -11,6 +11,8 @@ import (
"sync/atomic" "sync/atomic"
"testing" "testing"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1" collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
@@ -214,7 +216,7 @@ func TestExporter(t *testing.T) {
c, _ := clientFactory(t, rCh) c, _ := clientFactory(t, rCh)
e := newExporter(c) e := newExporter(c)
assert.ErrorIs(t, e.Export(ctx, records), errPartial{}) assert.ErrorIs(t, e.Export(ctx, records), internal.PartialSuccess{})
assert.NoError(t, e.Export(ctx, records)) assert.NoError(t, e.Export(ctx, records))
assert.NoError(t, e.Export(ctx, records)) assert.NoError(t, e.Export(ctx, records))
}) })

View File

@@ -11,9 +11,11 @@ require (
github.com/stretchr/testify v1.11.1 github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/log v0.14.0 go.opentelemetry.io/otel/log v0.14.0
go.opentelemetry.io/otel/metric v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/sdk/log v0.14.0 go.opentelemetry.io/otel/sdk/log v0.14.0
go.opentelemetry.io/otel/sdk/log/logtest v0.14.0 go.opentelemetry.io/otel/sdk/log/logtest v0.14.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
go.opentelemetry.io/otel/trace v1.38.0 go.opentelemetry.io/otel/trace v1.38.0
go.opentelemetry.io/proto/otlp v1.8.0 go.opentelemetry.io/proto/otlp v1.8.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4 google.golang.org/genproto/googleapis/rpc v0.0.0-20250929231259-57b25ae835d4
@@ -29,7 +31,6 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
golang.org/x/net v0.44.0 // indirect golang.org/x/net v0.44.0 // indirect
golang.org/x/sys v0.36.0 // indirect golang.org/x/sys v0.36.0 // indirect
golang.org/x/text v0.29.0 // indirect golang.org/x/text v0.29.0 // indirect

View File

@@ -5,6 +5,12 @@
// package. // package.
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal" package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
//go:generate gotmpl --body=../../../../../internal/shared/otlp/observ/target.go.tmpl "--data={ \"pkg\": \"observ\", \"pkg_path\": \"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ\" }" --out=observ/target.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/observ/target_test.go.tmpl "--data={ \"pkg\": \"observ\" }" --out=observ/target_test.go
//go:generate gotmpl --body=../../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc\" }" --out=x/x.go
//go:generate gotmpl --body=../../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go //go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go //go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go

View File

@@ -0,0 +1,310 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package observ provides observability metrics for OTLP log exporters.
// This is an experimental feature controlled by the x.Observability feature flag.
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
import (
"context"
"errors"
"fmt"
"sync"
"time"
"go.opentelemetry.io/otel/internal/global"
"google.golang.org/grpc/status"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
const (
// ScopeName is the unique name of the meter used for instrumentation.
ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
// Version is the current version of this instrumentation.
//
// This matches the version of the exporter.
Version = internal.Version
)
var (
attrsPool = &sync.Pool{
New: func() any {
const n = 1 /* component.name */ +
1 /* component.type */ +
1 /* server.addr */ +
1 /* server.port */ +
1 /* error.type */ +
1 /* rpc.grpc.status_code */
s := make([]attribute.KeyValue, 0, n)
// Return a pointer to a slice instead of a slice itself
// to avoid allocations on every call.
return &s
},
}
addOpPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
o := make([]metric.AddOption, 0, n)
return &o
},
}
recordOptPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
o := make([]metric.RecordOption, 0, n)
return &o
},
}
)
func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) }
func put[T any](p *sync.Pool, s *[]T) {
*s = (*s)[:0]
p.Put(s)
}
// GetComponentName returns the constant name for the exporter with the
// provided id.
func GetComponentName(id int64) string {
return fmt.Sprintf("%s/%d", otelconv.ComponentTypeOtlpGRPCLogExporter, id)
}
// getPresetAttrs builds the preset attributes for instrumentation.
func getPresetAttrs(id int64, target string) []attribute.KeyValue {
serverAttrs := ServerAddrAttrs(target)
attrs := make([]attribute.KeyValue, 0, 2+len(serverAttrs))
attrs = append(
attrs,
semconv.OTelComponentName(GetComponentName(id)),
semconv.OTelComponentTypeOtlpGRPCLogExporter,
)
attrs = append(attrs, serverAttrs...)
return attrs
}
// Instrumentation is experimental instrumentation for the exporter.
type Instrumentation struct {
logInflightMetric metric.Int64UpDownCounter
logExportedMetric metric.Int64Counter
logExportedDurationMetric metric.Float64Histogram
presetAttrs []attribute.KeyValue
addOpt metric.AddOption
recOpt metric.RecordOption
}
// NewInstrumentation returns instrumentation for otlplog grpc exporter.
func NewInstrumentation(id int64, target string) (*Instrumentation, error) {
if !x.Observability.Enabled() {
return nil, nil
}
i := &Instrumentation{}
mp := otel.GetMeterProvider()
m := mp.Meter(
ScopeName,
metric.WithInstrumentationVersion(Version),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
logInflightMetric, e := otelconv.NewSDKExporterLogInflight(m)
if e != nil {
e = fmt.Errorf("failed to create log inflight metric: %w", e)
err = errors.Join(err, e)
}
i.logInflightMetric = logInflightMetric.Inst()
logExportedMetric, e := otelconv.NewSDKExporterLogExported(m)
if e != nil {
e = fmt.Errorf("failed to create log exported metric: %w", e)
err = errors.Join(err, e)
}
i.logExportedMetric = logExportedMetric.Inst()
logOpDurationMetric, e := otelconv.NewSDKExporterOperationDuration(m)
if e != nil {
e = fmt.Errorf("failed to create log operation duration metric: %w", e)
err = errors.Join(err, e)
}
i.logExportedDurationMetric = logOpDurationMetric.Inst()
if err != nil {
return nil, err
}
i.presetAttrs = getPresetAttrs(id, target)
i.addOpt = metric.WithAttributeSet(attribute.NewSet(i.presetAttrs...))
i.recOpt = metric.WithAttributeSet(attribute.NewSet(append(
// Default to OK status code.
[]attribute.KeyValue{semconv.RPCGRPCStatusCodeOk},
i.presetAttrs...,
)...))
return i, nil
}
// ExportLogs instruments the ExportLogs method of the exporter. It returns
// an [ExportOp] that must have its [ExportOp.End] method called when the
// ExportLogs method returns.
func (i *Instrumentation) ExportLogs(ctx context.Context, count int64) ExportOp {
start := time.Now()
addOpt := get[metric.AddOption](addOpPool)
defer put(addOpPool, addOpt)
*addOpt = append(*addOpt, i.addOpt)
i.logInflightMetric.Add(ctx, count, *addOpt...)
return ExportOp{
nLogs: count,
ctx: ctx,
start: start,
inst: i,
}
}
// ExportOp tracks the operation being observed by [Instrumentation.ExportLogs].
type ExportOp struct {
nLogs int64
ctx context.Context
start time.Time
inst *Instrumentation
}
// End completes the observation of the operation being observed by a call to
// [Instrumentation.ExportLogs].
// Any error that is encountered is provided as err.
//
// If err is not nil, all logs will be recorded as failures unless error is of
// type [internal.PartialSuccess]. In the case of a PartialSuccess, the number
// of successfully exported logs will be determined by inspecting the
// RejectedItems field of the PartialSuccess.
func (e ExportOp) End(err error) {
addOpt := get[metric.AddOption](addOpPool)
defer put(addOpPool, addOpt)
*addOpt = append(*addOpt, e.inst.addOpt)
e.inst.logInflightMetric.Add(e.ctx, -e.nLogs, *addOpt...)
success := successful(e.nLogs, err)
e.inst.logExportedMetric.Add(e.ctx, success, *addOpt...)
if err != nil {
// Add the error.type attribute to the attribute set.
attrs := get[attribute.KeyValue](attrsPool)
defer put(attrsPool, attrs)
*attrs = append(*attrs, e.inst.presetAttrs...)
*attrs = append(*attrs, semconv.ErrorType(err))
o := metric.WithAttributeSet(attribute.NewSet(*attrs...))
// Reset addOpt with new attribute set
*addOpt = append((*addOpt)[:0], o)
e.inst.logExportedMetric.Add(e.ctx, e.nLogs-success, *addOpt...)
}
recordOpt := get[metric.RecordOption](recordOptPool)
defer put(recordOptPool, recordOpt)
*recordOpt = append(*recordOpt, e.inst.recordOption(err))
e.inst.logExportedDurationMetric.Record(e.ctx, time.Since(e.start).Seconds(), *recordOpt...)
}
func (i *Instrumentation) recordOption(err error) metric.RecordOption {
if err == nil {
return i.recOpt
}
attrs := get[attribute.KeyValue](attrsPool)
defer put(attrsPool, attrs)
*attrs = append(*attrs, i.presetAttrs...)
code := int64(status.Code(err))
*attrs = append(
*attrs,
semconv.RPCGRPCStatusCodeKey.Int64(code),
semconv.ErrorType(err),
)
return metric.WithAttributeSet(attribute.NewSet(*attrs...))
}
// successful returns the number of successfully exported logs out of the n
// that were exported based on the provided error.
//
// If err is nil, n is returned. All logs were successfully exported.
//
// If err is not nil and not an [internal.PartialSuccess] error, 0 is returned.
// It is assumed all logs failed to be exported.
//
// If err is an [internal.PartialSuccess] error, the number of successfully
// exported logs is computed by subtracting the RejectedItems field from n. If
// RejectedItems is negative, n is returned. If RejectedItems is greater than
// n, 0 is returned.
func successful(n int64, err error) int64 {
if err == nil {
return n // All logs successfully exported.
}
// Split rejection calculation so successful is inlineable.
return n - rejectedCount(n, err)
}
var errPool = sync.Pool{
New: func() any {
return new(internal.PartialSuccess)
},
}
// rejectedCount returns how many out of the n logs exporter were rejected based on
// the provided non-nil err.
func rejectedCount(n int64, err error) int64 {
ps := errPool.Get().(*internal.PartialSuccess)
defer errPool.Put(ps)
// check for partial success
if errors.As(err, ps) {
return min(max(ps.RejectedItems, 0), n)
}
// all logs exporter
return n
}
// ServerAddrAttrs is a function that extracts server address and port attributes
// from a target string.
func ServerAddrAttrs(target string) []attribute.KeyValue {
addr, port, err := ParseCanonicalTarget(target)
if err != nil || (addr == "" && port < 0) {
if err != nil {
global.Debug("failed to parse target", "target", target, "error", err)
}
return nil
}
// Unix domain sockets: return only the path as server.address
if port == -1 {
return []attribute.KeyValue{semconv.ServerAddress(addr)}
}
// For network addresses, only include port if it's valid (> 0)
if port > 0 {
return []attribute.KeyValue{
semconv.ServerAddress(addr),
semconv.ServerPort(port),
}
}
// Port is 0 or invalid, only return address
return []attribute.KeyValue{semconv.ServerAddress(addr)}
}

View File

@@ -0,0 +1,329 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
mapi "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
const (
ID = 0
TARGET = "localhost:8080"
)
type errMeterProvider struct {
mapi.MeterProvider
err error
}
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
return &errMeter{err: m.err}
}
type errMeter struct {
mapi.Meter
err error
}
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
return nil, m.err
}
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
return nil, m.err
}
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
return nil, m.err
}
func TestNewExporterMetrics(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
t.Run("No Error", func(t *testing.T) {
em, err := NewInstrumentation(ID, "dns:///example.com:42")
require.NoError(t, err)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.OTelComponentName(GetComponentName(ID)),
semconv.OTelComponentTypeKey.String(string(otelconv.ComponentTypeOtlpGRPCLogExporter)),
semconv.ServerAddress("example.com"),
semconv.ServerPort(42),
}, em.presetAttrs)
assert.NotNil(t, em.logInflightMetric, "logInflightMetric should be created")
assert.NotNil(t, em.logExportedMetric, "logExportedMetric should be created")
assert.NotNil(t, em.logExportedDurationMetric, "logExportedDurationMetric should be created")
})
t.Run("Error", func(t *testing.T) {
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
mp := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(mp)
_, err := NewInstrumentation(ID, "dns:///:8080")
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
assert.ErrorContains(t, err, "inflight metric")
assert.ErrorContains(t, err, "log exported metric")
assert.ErrorContains(t, err, "operation duration metric")
})
}
func TestServerAddrAttrs(t *testing.T) {
testcases := []struct {
name string
target string
want []attribute.KeyValue
}{
{
name: "Unix socket",
target: "unix:///tmp/grpc.sock",
want: []attribute.KeyValue{semconv.ServerAddress("/tmp/grpc.sock")},
},
{
name: "DNS with port",
target: "dns:///localhost:8080",
want: []attribute.KeyValue{semconv.ServerAddress("localhost"), semconv.ServerPort(8080)},
},
{
name: "Dns with endpoint host:port",
target: "dns://8.8.8.8/example.com:4",
want: []attribute.KeyValue{semconv.ServerAddress("example.com"), semconv.ServerPort(4)},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
attrs := ServerAddrAttrs(tc.target)
assert.Equal(t, tc.want, attrs)
})
}
}
func set(err error) attribute.Set {
attrs := []attribute.KeyValue{
semconv.OTelComponentName(GetComponentName(ID)),
semconv.OTelComponentTypeKey.String(string(otelconv.ComponentTypeOtlpGRPCLogExporter)),
}
attrs = append(attrs, ServerAddrAttrs(TARGET)...)
if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}
return attribute.NewSet(attrs...)
}
func logInflightMetrics() metricdata.Metrics {
m := otelconv.SDKExporterLogInflight{}
return metricdata.Metrics{
Name: m.Name(),
Description: m.Description(),
Unit: m.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: set(nil), Value: 0},
},
},
}
}
func logExportedMetrics(success, total int64, err error) metricdata.Metrics {
dp := []metricdata.DataPoint[int64]{
{Attributes: set(nil), Value: success},
}
if err != nil {
dp = append(dp, metricdata.DataPoint[int64]{
Attributes: set(err),
Value: total - success,
})
}
m := otelconv.SDKExporterLogExported{}
return metricdata.Metrics{
Name: m.Name(),
Description: m.Description(),
Unit: m.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: dp,
},
}
}
func logOperationDurationMetrics(err error, code codes.Code) metricdata.Metrics {
attrs := []attribute.KeyValue{
semconv.OTelComponentName(GetComponentName(ID)),
semconv.OTelComponentTypeKey.String(string(otelconv.ComponentTypeOtlpGRPCLogExporter)),
semconv.RPCGRPCStatusCodeKey.Int64(int64(code)),
}
attrs = append(attrs, ServerAddrAttrs(TARGET)...)
if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}
m := otelconv.SDKExporterOperationDuration{}
return metricdata.Metrics{
Name: m.Name(),
Description: m.Description(),
Unit: m.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: attribute.NewSet(attrs...)},
},
},
}
}
func setup(t *testing.T) (*Instrumentation, func() metricdata.ScopeMetrics) {
t.Helper()
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
original := otel.GetMeterProvider()
t.Cleanup(func() {
otel.SetMeterProvider(original)
})
r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)
inst, err := NewInstrumentation(ID, TARGET)
require.NoError(t, err)
require.NotNil(t, inst)
return inst, func() metricdata.ScopeMetrics {
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
require.Len(t, rm.ScopeMetrics, 1)
return rm.ScopeMetrics[0]
}
}
var Scope = instrumentation.Scope{
Name: ScopeName,
Version: Version,
SchemaURL: semconv.SchemaURL,
}
func assertMetrics(
t *testing.T,
got metricdata.ScopeMetrics,
spans int64,
success int64,
err error,
code codes.Code,
) {
t.Helper()
assert.Equal(t, Scope, got.Scope, "unexpected scope")
m := got.Metrics
require.Len(t, m, 3, "expected 3 metrics")
o := metricdatatest.IgnoreTimestamp()
want := logInflightMetrics()
metricdatatest.AssertEqual(t, want, m[0], o)
want = logExportedMetrics(success, spans, err)
metricdatatest.AssertEqual(t, want, m[1], o)
want = logOperationDurationMetrics(err, code)
metricdatatest.AssertEqual(t, want, m[2], metricdatatest.IgnoreValue(), o)
}
func TestInstrumentationExportLogs(t *testing.T) {
inst, collect := setup(t)
const n = 10
inst.ExportLogs(t.Context(), n).End(nil)
assertMetrics(t, collect(), n, n, nil, codes.OK)
}
func TestInstrumentationExportLogPartialErrors(t *testing.T) {
inst, collect := setup(t)
const n = 10
const success = 5
err := internal.PartialSuccess{RejectedItems: success}
inst.ExportLogs(t.Context(), n).End(err)
assertMetrics(t, collect(), n, success, err, status.Code(err))
}
func TestInstrumentationExportLogAllErrors(t *testing.T) {
inst, collect := setup(t)
const n = 10
const success = 0
inst.ExportLogs(t.Context(), n).End(assert.AnError)
assertMetrics(t, collect(), n, success, assert.AnError, status.Code(assert.AnError))
}
func TestInstrumentationExportLogsInvalidPartialErrored(t *testing.T) {
inst, collect := setup(t)
const n = 10
err := internal.PartialSuccess{RejectedItems: -5}
inst.ExportLogs(t.Context(), n).End(err)
success := int64(n)
assertMetrics(t, collect(), n, success, err, status.Code(err))
err.RejectedItems = n + 5
inst.ExportLogs(t.Context(), n).End(err)
success += 0
assertMetrics(t, collect(), n+n, success, err, status.Code(err))
}
func BenchmarkInstrumentationExportLogs(b *testing.B) {
setup := func(tb *testing.B) *Instrumentation {
tb.Helper()
tb.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
inst, err := NewInstrumentation(ID, TARGET)
if err != nil {
tb.Fatalf("failed to create instrumentation: %v", err)
}
return inst
}
run := func(err error) func(*testing.B) {
return func(b *testing.B) {
inst := setup(b)
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
inst.ExportLogs(b.Context(), 10).End(err)
}
}
}
b.Run("NoError", run(nil))
b.Run("PartialError", run(&internal.PartialSuccess{RejectedItems: 6}))
b.Run("FullError", run(assert.AnError))
}
func BenchmarkSetPresetAttrs(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := range b.N {
getPresetAttrs(int64(i), "dns:///192.168.1.1:8080")
}
}

View File

@@ -0,0 +1,143 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/observ/target.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/observ"
import (
"errors"
"fmt"
"net"
"net/netip"
"strconv"
"strings"
)
const (
schemeUnix = "unix"
schemeUnixAbstract = "unix-abstract"
)
// ParseCanonicalTarget parses a target string and returns the extracted host
// (domain address or IP), the target port, or an error.
//
// If no port is specified, -1 is returned.
//
// If no host is specified, an empty string is returned.
//
// The target string is expected to always have the form
// "<scheme>://[authority]/<endpoint>". For example:
// - "dns:///example.com:42"
// - "dns://8.8.8.8/example.com:42"
// - "unix:///path/to/socket"
// - "unix-abstract:///socket-name"
// - "passthrough:///192.34.2.1:42"
//
// The target is expected to come from the CanonicalTarget method of a gRPC
// Client.
func ParseCanonicalTarget(target string) (string, int, error) {
const sep = "://"
// Find scheme. Do not allocate the string by using url.Parse.
idx := strings.Index(target, sep)
if idx == -1 {
return "", -1, fmt.Errorf("invalid target %q: missing scheme", target)
}
scheme, endpoint := target[:idx], target[idx+len(sep):]
// Check for unix schemes.
if scheme == schemeUnix || scheme == schemeUnixAbstract {
return parseUnix(endpoint)
}
// Strip leading slash and any authority.
if i := strings.Index(endpoint, "/"); i != -1 {
endpoint = endpoint[i+1:]
}
// DNS, passthrough, and custom resolvers.
return parseEndpoint(endpoint)
}
// parseUnix parses unix socket targets.
func parseUnix(endpoint string) (string, int, error) {
// Format: unix[-abstract]://path
//
// We should have "/path" (empty authority) if valid.
if len(endpoint) >= 1 && endpoint[0] == '/' {
// Return the full path including leading slash.
return endpoint, -1, nil
}
// If there's no leading slash, it means there might be an authority
// Check for authority case (should error): "authority/path"
if slashIdx := strings.Index(endpoint, "/"); slashIdx > 0 {
return "", -1, fmt.Errorf("invalid (non-empty) authority: %s", endpoint[:slashIdx])
}
return "", -1, errors.New("invalid unix target format")
}
// parseEndpoint parses an endpoint from a gRPC target.
//
// It supports the following formats:
// - "host"
// - "host%zone"
// - "host:port"
// - "host%zone:port"
// - "ipv4"
// - "ipv4%zone"
// - "ipv4:port"
// - "ipv4%zone:port"
// - "ipv6"
// - "ipv6%zone"
// - "[ipv6]"
// - "[ipv6%zone]"
// - "[ipv6]:port"
// - "[ipv6%zone]:port"
//
// It returns the host or host%zone (domain address or IP), the port (or -1 if
// not specified), or an error if the input is not a valid.
func parseEndpoint(endpoint string) (string, int, error) {
// First check if the endpoint is just an IP address.
if ip := parseIP(endpoint); ip != "" {
return ip, -1, nil
}
// If there's no colon, there is no port (IPv6 with no port checked above).
if !strings.Contains(endpoint, ":") {
return endpoint, -1, nil
}
host, portStr, err := net.SplitHostPort(endpoint)
if err != nil {
return "", -1, fmt.Errorf("invalid host:port %q: %w", endpoint, err)
}
const base, bitSize = 10, 16
port16, err := strconv.ParseUint(portStr, base, bitSize)
if err != nil {
return "", -1, fmt.Errorf("invalid port %q: %w", portStr, err)
}
port := int(port16) // port is guaranteed to be in the range [0, 65535].
return host, port, nil
}
// parseIP attempts to parse the entire endpoint as an IP address.
// It returns the normalized string form of the IP if successful,
// or an empty string if parsing fails.
func parseIP(ip string) string {
// Strip leading and trailing brackets for IPv6 addresses.
if len(ip) >= 2 && ip[0] == '[' && ip[len(ip)-1] == ']' {
ip = ip[1 : len(ip)-1]
}
addr, err := netip.ParseAddr(ip)
if err != nil {
return ""
}
// Return the normalized string form of the IP.
return addr.String()
}

View File

@@ -0,0 +1,162 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/observ/target_test.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ
import "testing"
func TestParseTarget(t *testing.T) {
// gRPC target naming is defined here:
// https://github.com/grpc/grpc/blob/74232c6bd3c0f4bc35bad035dbeecf5cbc834a11/doc/naming.md
//
// The Go gRPC client only supports the "dns", "unix", "unix-abstract", and
// "passthrough" schemes natively with "dns" being the default:
// https://pkg.go.dev/google.golang.org/grpc@v1.75.1/internal/resolver
//
// Other schemes (e.g., "consul", "zk") are supported via custom resolvers
// that can be registered with the gRPC resolver package. These custom
// resolvers are still expected to follow the general target string format
// when rendered with the CanonicalTarget method:
//
// <scheme>://<authority>/<endpoint>
//
// All target strings in these tests are rendered with the
// CanonicalTarget method. Therefore they all follow the above format.
tests := []struct {
target string
host string
port int
}{
// DNS scheme: hostname and port.
{target: "dns:///:8080", host: "", port: 8080},
{target: "dns:///example.com", host: "example.com", port: -1},
{target: "dns:///example.com%eth0", host: "example.com%eth0", port: -1},
{target: "dns:///example.com:42", host: "example.com", port: 42},
{target: "dns:///example.com%eth0:42", host: "example.com%eth0", port: 42},
// DNS scheme: hostname and port with authority.
{target: "dns://8.8.8.8/example.com", host: "example.com", port: -1},
{target: "dns://8.8.8.8/example.com%eth0", host: "example.com%eth0", port: -1},
{target: "dns://8.8.8.8/example.com:42", host: "example.com", port: 42},
{target: "dns://8.8.8.8/example.com%eth0:42", host: "example.com%eth0", port: 42},
// DNS scheme: IPv4 address and port.
{target: "dns:///192.168.1.1", host: "192.168.1.1", port: -1},
{target: "dns:///192.168.1.1%eth0", host: "192.168.1.1%eth0", port: -1},
{target: "dns:///192.168.1.1:8080", host: "192.168.1.1", port: 8080},
{target: "dns:///192.168.1.1%eth0:8080", host: "192.168.1.1%eth0", port: 8080},
// DNS scheme: IPv6 address and port.
{target: "dns:///2001:0db8:85a3:0000:0000:8a2e:0370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{target: "dns:///2001:db8:85a3:0:0:8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{target: "dns:///2001:db8:85a3::8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{target: "dns:///2001:db8:85a3::8a2e:370:7334%eth0", host: "2001:db8:85a3::8a2e:370:7334%eth0", port: -1},
{target: "dns:///[2001:db8:85a3::8a2e:370:7334]", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{target: "dns:///[2001:db8:85a3::8a2e:370:7334%eth0]", host: "2001:db8:85a3::8a2e:370:7334%eth0", port: -1},
{target: "dns:///[::1]:9090", host: "::1", port: 9090},
{target: "dns:///[::1%eth0]:9090", host: "::1%eth0", port: 9090},
// Unix domain sockets.
{target: "unix:///tmp/grpc.sock", host: "/tmp/grpc.sock", port: -1},
{target: "unix:///absolute_path", host: "/absolute_path", port: -1},
// Unix domain socket in abstract namespace.
{target: "unix-abstract:///abstract-socket-name", host: "/abstract-socket-name", port: -1},
// International domain names.
{target: "dns:///测试.example.com:8080", host: "测试.example.com", port: 8080},
// Port edge cases.
{target: "dns:///example.com:0", host: "example.com", port: 0},
{target: "dns:///example.com:65535", host: "example.com", port: 65535},
// Case sensitivity.
{target: "dns:///EXAMPLE.COM:8080", host: "EXAMPLE.COM", port: 8080},
{target: "dns:///Example.Com:8080", host: "Example.Com", port: 8080},
// Custom and passthrough resolvers scheme
{target: "passthrough:///localhost:50051", host: "localhost", port: 50051},
{target: "passthrough:///10.0.0.2:7777", host: "10.0.0.2", port: 7777},
{target: "consul:///my-service", host: "my-service", port: -1},
{target: "zk:///services/my-service", host: "services/my-service", port: -1},
}
for _, tt := range tests {
host, port, err := ParseCanonicalTarget(tt.target)
if err != nil {
t.Errorf("parseTarget(%q) unexpected error: %v", tt.target, err)
continue
}
if host != tt.host {
t.Errorf("parseTarget(%q) host = %q, want %q", tt.target, host, tt.host)
}
if port != tt.port {
t.Errorf("parseTarget(%q) port = %d, want %d", tt.target, port, tt.port)
}
}
}
func TestParseTargetErrors(t *testing.T) {
targets := []string{
"dns:///example.com:invalid", // Non-numeric port in URL.
"dns:///example.com:8080:9090", // Multiple colons in port.
"dns:///example.com:99999", // Port out of range.
"dns:///example.com:-1", // Port out of range.
"unix://localhost/sock", // Non-empty authority for unix scheme.
"unix:", // Empty unix scheme.
"unix-abstract://", // Empty unix-abstract scheme.
"unix-abstract://authority/sock", // Non-empty authority for unix-abstract scheme.
"contains-cont\roll-cha\rs", // Invalid URL.
}
for _, target := range targets {
host, port, err := ParseCanonicalTarget(target)
if err == nil {
t.Errorf("parseTarget(%q) expected error, got nil", target)
}
if host != "" {
t.Errorf("parseTarget(%q) host = %q, want empty", target, host)
}
if port != -1 {
t.Errorf("parseTarget(%q) port = %d, want -1", target, port)
}
}
}
func BenchmarkParseTarget(b *testing.B) {
benchmarks := []struct {
name string
target string
}{
{"HostName", "dns:///example.com"},
{"HostPort", "dns:///example.com:8080"},
{"IPv4WithoutPort", "dns:///192.168.1.1"},
{"IPv4WithPort", "dns:///192.168.1.1:8080"},
{"IPv6Bare", "dns:///2001:db8::1"},
{"IPv6Bracket", "dns:///[2001:db8::1]"},
{"IPv6WithPort", "dns:///[2001:db8::1]:8080"},
{"UnixSocket", "unix:///tmp/grpc.sock"},
{"UnixAbstractSocket", "unix-abstract:///abstract-socket-name"},
{"Passthrough", "passthrough:///localhost:50051"},
}
var (
host string
port int
err error
)
for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
b.ReportAllocs()
for b.Loop() {
host, port, err = ParseCanonicalTarget(bm.target)
}
})
}
_, _, _ = host, port, err
}

View File

@@ -0,0 +1,43 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
import "fmt"
// PartialSuccess represents the underlying error for all handling
// OTLP partial success messages. Use `errors.Is(err,
// PartialSuccess{})` to test whether an error passed to the OTel
// error handler belongs to this category.
type PartialSuccess struct {
ErrorMessage string
RejectedItems int64
RejectedKind string
}
var _ error = PartialSuccess{}
// Error implements the error interface.
func (ps PartialSuccess) Error() string {
msg := ps.ErrorMessage
if msg == "" {
msg = "empty message"
}
return fmt.Sprintf("OTLP partial success: %s (%d %s rejected)", msg, ps.RejectedItems, ps.RejectedKind)
}
// Is supports the errors.Is() interface.
func (PartialSuccess) Is(err error) bool {
_, ok := err.(PartialSuccess)
return ok
}
// LogPartialSuccessError returns an error describing a partial success
// response for the log signal.
func LogPartialSuccessError(itemsRejected int64, errorMessage string) error {
return PartialSuccess{
ErrorMessage: errorMessage,
RejectedItems: itemsRejected,
RejectedKind: "logs",
}
}

View File

@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal
import (
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func requireErrorString(t *testing.T, expect string, err error) {
t.Helper()
require.Error(t, err)
require.ErrorIs(t, err, PartialSuccess{})
const pfx = "OTLP partial success: "
msg := err.Error()
require.True(t, strings.HasPrefix(msg, pfx))
require.Equal(t, expect, msg[len(pfx):])
}
func TestPartialSuccessFormat(t *testing.T) {
requireErrorString(t, "empty message (0 logs rejected)", LogPartialSuccessError(0, ""))
requireErrorString(t, "help help (0 logs rejected)", LogPartialSuccessError(0, "help help"))
requireErrorString(
t,
"what happened (10 logs rejected)",
LogPartialSuccessError(10, "what happened"),
)
requireErrorString(t, "what happened (15 logs rejected)", LogPartialSuccessError(15, "what happened"))
}

View File

@@ -0,0 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal"
// Version is the current release version of the OpenTelemetry otlploggrpc
// exporter in use.
const Version = "0.14.0"

View File

@@ -0,0 +1,36 @@
# Experimental Features
The `otlploggrpc` exporter contains features that have not yet stabilized in the OpenTelemetry specification.
These features are added to the `otlploggrpc` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback.
These features may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.
## Features
- [Observability](#observability)
### Observability
The `otlploggrpc` exporter can be configured to provide observability about itself using OpenTelemetry metrics.
To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`.
When enabled, the exporter will create the following metrics using the global `MeterProvider`:
- `otel.sdk.exporter.log.inflight`
- `otel.sdk.exporter.log.exported`
- `otel.sdk.exporter.operation.duration`
Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics.
[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md
## Compatibility and Stability
Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.
When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version.
If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support.

View File

@@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc].
package x // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x"
import "strings"
// Observability is an experimental feature flag that determines if exporter
// observability metrics are enabled.
//
// To enable this feature set the OTEL_GO_X_OBSERVABILITY environment variable
// to the case-insensitive string value of "true" (i.e. "True" and "TRUE"
// will also enable this).
var Observability = newFeature(
[]string{"OBSERVABILITY"},
func(v string) (string, bool) {
if strings.EqualFold(v, "true") {
return v, true
}
return "", false
},
)

View File

@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestObservability(t *testing.T) {
const key = "OTEL_GO_X_OBSERVABILITY"
require.Contains(t, Observability.Keys(), key)
t.Run("100", run(setenv(key, "100"), assertDisabled(Observability)))
t.Run("true", run(setenv(key, "true"), assertEnabled(Observability, "true")))
t.Run("True", run(setenv(key, "True"), assertEnabled(Observability, "True")))
t.Run("false", run(setenv(key, "false"), assertDisabled(Observability)))
t.Run("empty", run(assertDisabled(Observability)))
}

View File

@@ -0,0 +1,58 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/x/x.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc].
package x // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/x"
import (
"os"
)
// Feature is an experimental feature control flag. It provides a uniform way
// to interact with these feature flags and parse their values.
type Feature[T any] struct {
keys []string
parse func(v string) (T, bool)
}
func newFeature[T any](suffix []string, parse func(string) (T, bool)) Feature[T] {
const envKeyRoot = "OTEL_GO_X_"
keys := make([]string, 0, len(suffix))
for _, s := range suffix {
keys = append(keys, envKeyRoot+s)
}
return Feature[T]{
keys: keys,
parse: parse,
}
}
// Keys returns the environment variable keys that can be set to enable the
// feature.
func (f Feature[T]) Keys() []string { return f.keys }
// Lookup returns the user configured value for the feature and true if the
// user has enabled the feature. Otherwise, if the feature is not enabled, a
// zero-value and false are returned.
func (f Feature[T]) Lookup() (v T, ok bool) {
// https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value
//
// > The SDK MUST interpret an empty value of an environment variable the
// > same way as when the variable is unset.
for _, key := range f.keys {
vRaw := os.Getenv(key)
if vRaw != "" {
return f.parse(vRaw)
}
}
return v, ok
}
// Enabled reports whether the feature is enabled.
func (f Feature[T]) Enabled() bool {
_, ok := f.Lookup()
return ok
}

View File

@@ -0,0 +1,75 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/x/x_text.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
mockKey = "OTEL_GO_X_MOCK_FEATURE"
mockKey2 = "OTEL_GO_X_MOCK_FEATURE2"
)
var mockFeature = newFeature([]string{"MOCK_FEATURE", "MOCK_FEATURE2"}, func(v string) (string, bool) {
if strings.EqualFold(v, "true") {
return v, true
}
return "", false
})
func TestFeature(t *testing.T) {
require.Contains(t, mockFeature.Keys(), mockKey)
require.Contains(t, mockFeature.Keys(), mockKey2)
t.Run("100", run(setenv(mockKey, "100"), assertDisabled(mockFeature)))
t.Run("true", run(setenv(mockKey, "true"), assertEnabled(mockFeature, "true")))
t.Run("True", run(setenv(mockKey, "True"), assertEnabled(mockFeature, "True")))
t.Run("false", run(setenv(mockKey, "false"), assertDisabled(mockFeature)))
t.Run("empty", run(assertDisabled(mockFeature)))
}
func run(steps ...func(*testing.T)) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
for _, step := range steps {
step(t)
}
}
}
func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function.
return func(t *testing.T) { t.Setenv(k, v) }
}
func assertEnabled[T any](f Feature[T], want T) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
assert.True(t, f.Enabled(), "not enabled")
v, ok := f.Lookup()
assert.True(t, ok, "Lookup state")
assert.Equal(t, want, v, "Lookup value")
}
}
func assertDisabled[T any](f Feature[T]) func(*testing.T) {
var zero T
return func(t *testing.T) {
t.Helper()
assert.False(t, f.Enabled(), "enabled")
v, ok := f.Lookup()
assert.False(t, ok, "Lookup state")
assert.Equal(t, zero, v, "Lookup value")
}
}

View File

@@ -49,3 +49,7 @@ modules:
go.opentelemetry.io/otel/exporters/prometheus: go.opentelemetry.io/otel/exporters/prometheus:
version-refs: version-refs:
- ./exporters/prometheus/internal/version.go - ./exporters/prometheus/internal/version.go
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc:
version-refs:
- ./exporters/otlp/otlplog/otlploggrpc/internal/version.go