1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-08 23:21:56 +02:00

Instrument the otlptracegrpc exporter (#7459)

Resolve #7007 

### Benchmarks

```console
> benchstat inst-otlptracegrpc.bmark.result
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
                                      │ inst-otlptracegrpc.bmark.result │
                                      │             sec/op              │
ExporterExportSpans/Observability-8                         144.3µ ± 4%
ExporterExportSpans/NoObservability-8                       147.3µ ± 4%
geomean                                                     145.8µ

                                      │ inst-otlptracegrpc.bmark.result │
                                      │              B/op               │
ExporterExportSpans/Observability-8                        23.07Ki ± 0%
ExporterExportSpans/NoObservability-8                      22.34Ki ± 0%
geomean                                                    22.70Ki

                                      │ inst-otlptracegrpc.bmark.result │
                                      │            allocs/op            │
ExporterExportSpans/Observability-8                          335.0 ± 0%
ExporterExportSpans/NoObservability-8                        331.0 ± 0%
geomean                                                      333.0
```
This commit is contained in:
Tyler Yahn
2025-10-07 14:16:39 -07:00
committed by GitHub
parent ce38247ae9
commit c692bc4b87
7 changed files with 307 additions and 16 deletions

View File

@@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability for the prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus`.
Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7459)
### Fixed

View File

@@ -19,6 +19,8 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/counter"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
)
@@ -44,6 +46,9 @@ type client struct {
conn *grpc.ClientConn
tscMu sync.RWMutex
tsc coltracepb.TraceServiceClient
instID int64
inst *observ.Instrumentation
}
// Compile time check *client implements otlptrace.Client.
@@ -67,6 +72,7 @@ func newClient(opts ...Option) *client {
stopCtx: ctx,
stopFunc: cancel,
conn: cfg.GRPCConn,
instID: counter.NextExporterID(),
}
if len(cfg.Traces.Headers) > 0 {
@@ -91,13 +97,24 @@ func (c *client) Start(context.Context) error {
c.conn = conn
}
// Initialize the instrumentation if not already done.
//
// Initialize here instead of NewClient to allow any errors to be passed
// back to the caller and so that any setup of the environment variables to
// enable instrumentation can be set via code.
var err error
if c.inst == nil {
target := c.conn.CanonicalTarget()
c.inst, err = observ.NewInstrumentation(c.instID, target)
}
// The otlptrace.Client interface states this method is called just once,
// so no need to check if already started.
c.tscMu.Lock()
c.tsc = coltracepb.NewTraceServiceClient(c.conn)
c.tscMu.Unlock()
return nil
return err
}
var errAlreadyStopped = errors.New("the client is already stopped")
@@ -188,6 +205,12 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
ctx, cancel := c.exportContext(ctx)
defer cancel()
var code codes.Code
if c.inst != nil {
op := c.inst.ExportSpans(ctx, len(protoSpans))
defer func() { op.End(uploadErr, code) }()
}
return c.requestFunc(ctx, func(iCtx context.Context) error {
resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
@@ -201,7 +224,8 @@ func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
}
}
// nil is converted to OK.
if status.Code(err) == codes.OK {
code = status.Code(err)
if code == codes.OK {
// Success.
return uploadErr
}

View File

@@ -20,17 +20,26 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/counter"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
func TestMain(m *testing.M) {
@@ -115,7 +124,7 @@ func TestWithEndpointURL(t *testing.T) {
}
func newGRPCExporter(
t *testing.T,
tb testing.TB,
ctx context.Context,
endpoint string,
additionalOpts ...otlptracegrpc.Option,
@@ -130,7 +139,7 @@ func newGRPCExporter(
client := otlptracegrpc.NewClient(opts...)
exp, err := otlptrace.New(ctx, client)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
tb.Fatalf("failed to create a new collector exporter: %v", err)
}
return exp
}
@@ -430,3 +439,161 @@ func TestCustomUserAgent(t *testing.T) {
headers := mc.getHeaders()
require.Contains(t, headers.Get("user-agent")[0], customUserAgent)
}
func TestClientInstrumentation(t *testing.T) {
// Enable instrumentation for this test.
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
// Reset client ID to be deterministic
const id = 0
counter.SetExporterID(id)
// Save original meter provider and restore at end of test.
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
// Create a new meter provider to capture metrics.
reader := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(reader))
otel.SetMeterProvider(mp)
const n, msg = 2, "partially successful"
mc := runMockCollectorWithConfig(t, &mockConfig{
endpoint: "localhost:0", // Determine canonical endpoint.
partial: &coltracepb.ExportTracePartialSuccess{
RejectedSpans: n,
ErrorMessage: msg,
},
})
t.Cleanup(func() { require.NoError(t, mc.stop()) })
exp := newGRPCExporter(t, t.Context(), mc.endpoint)
err := exp.ExportSpans(t.Context(), roSpans)
assert.ErrorIs(t, err, internal.TracePartialSuccessError(n, msg))
require.NoError(t, exp.Shutdown(t.Context()))
var got metricdata.ResourceMetrics
require.NoError(t, reader.Collect(t.Context(), &got))
attrs := observ.BaseAttrs(id, canonical(t, mc.endpoint))
want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: observ.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKExporterSpanInflight{}.Name(),
Description: otelconv.SDKExporterSpanInflight{}.Description(),
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attrs...)},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: otelconv.SDKExporterSpanExported{}.Name(),
Description: otelconv.SDKExporterSpanExported{}.Description(),
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attrs...)},
{Attributes: attribute.NewSet(append(
attrs,
otelconv.SDKExporterSpanExported{}.AttrErrorType("*errors.joinError"),
)...)},
},
Temporality: 0x1,
IsMonotonic: true,
},
},
{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: attribute.NewSet(append(
attrs,
otelconv.SDKExporterOperationDuration{}.AttrErrorType("*errors.joinError"),
otelconv.SDKExporterOperationDuration{}.AttrRPCGRPCStatusCode(
otelconv.RPCGRPCStatusCodeOk,
),
)...)},
},
Temporality: 0x1,
},
},
},
}
require.Len(t, got.ScopeMetrics, 1)
opt := []metricdatatest.Option{
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreValue(),
}
metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], opt...)
}
func canonical(t *testing.T, endpoint string) string {
t.Helper()
opt := grpc.WithTransportCredentials(insecure.NewCredentials())
c, err := grpc.NewClient(endpoint, opt) // Used to normaliz endpoint.
if err != nil {
t.Fatalf("failed to create grpc client: %v", err)
}
out := c.CanonicalTarget()
_ = c.Close()
return out
}
func BenchmarkExporterExportSpans(b *testing.B) {
const n = 10
run := func(b *testing.B) {
mc := runMockCollectorWithConfig(b, &mockConfig{
endpoint: "localhost:0",
partial: &coltracepb.ExportTracePartialSuccess{
RejectedSpans: 5,
ErrorMessage: "partially successful",
},
})
b.Cleanup(func() { require.NoError(b, mc.stop()) })
exp := newGRPCExporter(b, b.Context(), mc.endpoint)
b.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
assert.NoError(b, exp.Shutdown(context.Background()))
})
stubs := make([]tracetest.SpanStub, n)
for i := range stubs {
stubs[i].Name = fmt.Sprintf("Span %d", i)
}
spans := tracetest.SpanStubs(stubs).Snapshots()
b.ReportAllocs()
b.ResetTimer()
var err error
for b.Loop() {
err = exp.ExportSpans(b.Context(), spans)
}
_ = err
}
b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})
b.Run("NoObservability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "false")
run(b)
})
}

View File

@@ -0,0 +1,31 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/counter/counter.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package counter provides a simple counter for generating unique IDs.
//
// This package is used to generate unique IDs while allowing testing packages
// to reset the counter.
package counter // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/counter"
import "sync/atomic"
// exporterN is a global 0-based count of the number of exporters created.
var exporterN atomic.Int64
// NextExporterID returns the next unique ID for an exporter.
func NextExporterID() int64 {
const inc = 1
return exporterN.Add(inc) - inc
}
// SetExporterID sets the exporter ID counter to v and returns the previous
// value.
//
// This function is useful for testing purposes, allowing you to reset the
// counter. It should not be used in production code.
func SetExporterID(v int64) int64 {
return exporterN.Swap(v)
}

View File

@@ -0,0 +1,65 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/counter/counter_test.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package counter
import (
"sync"
"testing"
)
func TestNextExporterID(t *testing.T) {
SetExporterID(0)
var expected int64
for range 10 {
id := NextExporterID()
if id != expected {
t.Errorf("NextExporterID() = %d; want %d", id, expected)
}
expected++
}
}
func TestSetExporterID(t *testing.T) {
SetExporterID(0)
prev := SetExporterID(42)
if prev != 0 {
t.Errorf("SetExporterID(42) returned %d; want 0", prev)
}
id := NextExporterID()
if id != 42 {
t.Errorf("NextExporterID() = %d; want 42", id)
}
}
func TestNextExporterIDConcurrentSafe(t *testing.T) {
SetExporterID(0)
const goroutines = 100
const increments = 10
var wg sync.WaitGroup
wg.Add(goroutines)
for range goroutines {
go func() {
defer wg.Done()
for range increments {
NextExporterID()
}
}()
}
wg.Wait()
expected := int64(goroutines * increments)
if id := NextExporterID(); id != expected {
t.Errorf("NextExporterID() = %d; want %d", id, expected)
}
}

View File

@@ -29,3 +29,6 @@ package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/ot
//go:generate gotmpl --body=../../../../../internal/shared/otlp/observ/target.go.tmpl "--data={ \"pkg\": \"observ\", \"pkg_path\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/observ\" }" --out=observ/target.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/observ/target_test.go.tmpl "--data={ \"pkg\": \"observ\" }" --out=observ/target_test.go
//go:generate gotmpl --body=../../../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/counter\" }" --out=counter/counter.go
//go:generate gotmpl --body=../../../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go

View File

@@ -19,7 +19,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlptracetest"
)
func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector {
func makeMockCollector(t testing.TB, mockConfig *mockConfig) *mockCollector {
return &mockCollector{
t: t,
traceSvc: &mockTraceService{
@@ -91,7 +91,7 @@ func (mts *mockTraceService) Export(
}
type mockCollector struct {
t *testing.T
t testing.TB
traceSvc *mockTraceService
@@ -150,23 +150,23 @@ func (mc *mockCollector) getHeaders() metadata.MD {
}
// runMockCollector is a helper function to create a mock Collector.
func runMockCollector(t *testing.T) *mockCollector {
t.Helper()
return runMockCollectorAtEndpoint(t, "localhost:0")
func runMockCollector(tb testing.TB) *mockCollector {
tb.Helper()
return runMockCollectorAtEndpoint(tb, "localhost:0")
}
func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
t.Helper()
return runMockCollectorWithConfig(t, &mockConfig{endpoint: endpoint})
func runMockCollectorAtEndpoint(tb testing.TB, endpoint string) *mockCollector {
tb.Helper()
return runMockCollectorWithConfig(tb, &mockConfig{endpoint: endpoint})
}
func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockCollector {
t.Helper()
func runMockCollectorWithConfig(tb testing.TB, mockConfig *mockConfig) *mockCollector {
tb.Helper()
ln, err := net.Listen("tcp", mockConfig.endpoint)
require.NoError(t, err, "net.Listen")
require.NoError(tb, err, "net.Listen")
srv := grpc.NewServer()
mc := makeMockCollector(t, mockConfig)
mc := makeMockCollector(tb, mockConfig)
collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
go func() {
_ = srv.Serve(ln)