From f58f79bacb885ef3e1fe0edccf72f95d49186cb3 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Fri, 10 Oct 2025 15:24:13 -0700 Subject: [PATCH] Instrument the `otlptracehttp` exporter (#7486) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Resolve #7006 ### Benchmarks ```console > benchstat bmark.results goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz │ bmark.results │ │ sec/op │ ExporterExportSpans/Observability-8 111.6µ ± 12% ExporterExportSpans/NoObservability-8 108.5µ ± 7% geomean 110.0µ │ bmark.results │ │ B/op │ ExporterExportSpans/Observability-8 20.69Ki ± 0% ExporterExportSpans/NoObservability-8 19.93Ki ± 0% geomean 20.30Ki │ bmark.results │ │ allocs/op │ ExporterExportSpans/Observability-8 251.0 ± 0% ExporterExportSpans/NoObservability-8 247.0 ± 0% geomean 249.0 ``` --- CHANGELOG.md | 1 + .../otlp/otlptrace/otlptracehttp/client.go | 33 +++- .../otlptrace/otlptracehttp/client_test.go | 161 ++++++++++++++++++ .../otlptracehttp/internal/counter/counter.go | 31 ++++ .../internal/counter/counter_test.go | 65 +++++++ .../otlptrace/otlptracehttp/internal/gen.go | 3 + .../otlptracehttp/mock_collector_test.go | 10 +- 7 files changed, 294 insertions(+), 10 deletions(-) create mode 100644 exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter.go create mode 100644 exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f3547687b..9903d131a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345) - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353) - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7459) +- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486) ### Fixed diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index 7c3273025..d0688c201 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -24,6 +24,8 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/retry" ) @@ -62,6 +64,9 @@ type client struct { client *http.Client stopCh chan struct{} stopOnce sync.Once + + instID int64 + inst *observ.Instrumentation } var _ otlptrace.Client = (*client)(nil) @@ -99,18 +104,29 @@ func NewClient(opts ...Option) otlptrace.Client { requestFunc: cfg.RetryConfig.RequestFunc(evaluate), stopCh: stopCh, client: httpClient, + instID: counter.NextExporterID(), } } // Start does nothing in a HTTP client. -func (*client) Start(ctx context.Context) error { +func (c *client) Start(ctx context.Context) error { + // Initialize the instrumentation if not already done. + // + // Initialize here instead of NewClient to allow any errors to be passed + // back to the caller and so that any setup of the environment variables to + // enable instrumentation can be set via code. + var err error + if c.inst == nil { + c.inst, err = observ.NewInstrumentation(c.instID, c.cfg.Endpoint) + } + // nothing to do select { case <-ctx.Done(): - return ctx.Err() + err = errors.Join(err, ctx.Err()) default: } - return nil + return err } // Stop shuts down the client and interrupt any in-flight request. @@ -144,6 +160,12 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc return err } + var statusCode int + if d.inst != nil { + op := d.inst.ExportSpans(ctx, len(protoSpans)) + defer func() { op.End(uploadErr, statusCode) }() + } + return errors.Join(uploadErr, d.requestFunc(ctx, func(ctx context.Context) error { select { case <-ctx.Done(): @@ -169,7 +191,8 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc }() } - if sc := resp.StatusCode; sc >= 200 && sc <= 299 { + statusCode = resp.StatusCode + if statusCode >= 200 && statusCode <= 299 { // Success, do not retry. // Read the partial success message, if any. var respData bytes.Buffer @@ -213,7 +236,7 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc } bodyErr := fmt.Errorf("body: %s", respStr) - switch resp.StatusCode { + switch statusCode { case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index 88907de54..fce39f921 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -17,10 +17,19 @@ import ( coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/otlptracetest" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) const ( @@ -469,3 +478,155 @@ func TestCollectorRespondingNonProtobufContent(t *testing.T) { assert.NoError(t, err) assert.Len(t, mc.GetSpans(), 1) } + +func TestClientInstrumentation(t *testing.T) { + // Enable instrumentation for this test. + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Reset client ID to be deterministic + const id = 0 + counter.SetExporterID(id) + + // Save original meter provider and restore at end of test. + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + // Create a new meter provider to capture metrics. + reader := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(mp) + + const n, msg = 2, "partially successful" + mc := runMockCollector(t, mockCollectorConfig{ + InjectHTTPStatus: []int{400}, + Partial: &coltracepb.ExportTracePartialSuccess{ + RejectedSpans: n, + ErrorMessage: msg, + }, + }) + t.Cleanup(func() { require.NoError(t, mc.Stop()) }) + + driver := otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(mc.Endpoint()), + otlptracehttp.WithInsecure(), + ) + exporter, err := otlptrace.New(t.Context(), driver) + require.NoError(t, err) + + err = exporter.ExportSpans(t.Context(), otlptracetest.SingleReadOnlySpan()) + assert.Error(t, err) + + require.NoError(t, exporter.Shutdown(t.Context())) + var got metricdata.ResourceMetrics + require.NoError(t, reader.Collect(t.Context(), &got)) + + attrs := observ.BaseAttrs(id, mc.endpoint) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: observ.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterSpanInflight{}.Name(), + Description: otelconv.SDKExporterSpanInflight{}.Description(), + Unit: otelconv.SDKExporterSpanInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attribute.NewSet(attrs...)}, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: otelconv.SDKExporterSpanExported{}.Name(), + Description: otelconv.SDKExporterSpanExported{}.Description(), + Unit: otelconv.SDKExporterSpanExported{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attribute.NewSet(attrs...)}, + {Attributes: attribute.NewSet(append( + attrs, + otelconv.SDKExporterSpanExported{}.AttrErrorType("*errors.joinError"), + )...)}, + }, + Temporality: 0x1, + IsMonotonic: true, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: attribute.NewSet(append( + attrs, + otelconv.SDKExporterOperationDuration{}.AttrErrorType("*errors.joinError"), + otelconv.SDKExporterOperationDuration{}.AttrHTTPResponseStatusCode(400), + )...)}, + }, + Temporality: 0x1, + }, + }, + }, + } + require.Len(t, got.ScopeMetrics, 1) + opt := []metricdatatest.Option{ + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreExemplars(), + metricdatatest.IgnoreValue(), + } + metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], opt...) +} + +func BenchmarkExporterExportSpans(b *testing.B) { + const n = 10 + + run := func(b *testing.B) { + mc := runMockCollector(b, mockCollectorConfig{ + Partial: &coltracepb.ExportTracePartialSuccess{ + RejectedSpans: 5, + ErrorMessage: "partially successful", + }, + }) + b.Cleanup(func() { require.NoError(b, mc.Stop()) }) + + c := otlptracehttp.NewClient( + otlptracehttp.WithEndpoint(mc.Endpoint()), + otlptracehttp.WithInsecure(), + ) + exp, err := otlptrace.New(b.Context(), c) + require.NoError(b, err) + b.Cleanup(func() { + //nolint:usetesting // required to avoid getting a canceled context at cleanup. + assert.NoError(b, exp.Shutdown(context.Background())) + }) + + stubs := make([]tracetest.SpanStub, n) + for i := range stubs { + stubs[i].Name = fmt.Sprintf("Span %d", i) + } + spans := tracetest.SpanStubs(stubs).Snapshots() + + b.ReportAllocs() + b.ResetTimer() + + for b.Loop() { + err = exp.ExportSpans(b.Context(), spans) + } + _ = err + } + + b.Run("Observability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + run(b) + }) + + b.Run("NoObservability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "false") + run(b) + }) +} diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter.go b/exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter.go new file mode 100644 index 000000000..75e5c0c7e --- /dev/null +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter.go @@ -0,0 +1,31 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/counter/counter.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package counter provides a simple counter for generating unique IDs. +// +// This package is used to generate unique IDs while allowing testing packages +// to reset the counter. +package counter // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter" + +import "sync/atomic" + +// exporterN is a global 0-based count of the number of exporters created. +var exporterN atomic.Int64 + +// NextExporterID returns the next unique ID for an exporter. +func NextExporterID() int64 { + const inc = 1 + return exporterN.Add(inc) - inc +} + +// SetExporterID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetExporterID(v int64) int64 { + return exporterN.Swap(v) +} diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter_test.go b/exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter_test.go new file mode 100644 index 000000000..f3e380d33 --- /dev/null +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/counter/counter_test.go @@ -0,0 +1,65 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/counter/counter_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package counter + +import ( + "sync" + "testing" +) + +func TestNextExporterID(t *testing.T) { + SetExporterID(0) + + var expected int64 + for range 10 { + id := NextExporterID() + if id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetExporterID(0) + + prev := SetExporterID(42) + if prev != 0 { + t.Errorf("SetExporterID(42) returned %d; want 0", prev) + } + + id := NextExporterID() + if id != 42 { + t.Errorf("NextExporterID() = %d; want 42", id) + } +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetExporterID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + NextExporterID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + if id := NextExporterID(); id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } +} \ No newline at end of file diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/gen.go b/exporters/otlp/otlptrace/otlptracehttp/internal/gen.go index a96964636..ca387432f 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/internal/gen.go +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/gen.go @@ -25,4 +25,7 @@ package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/ot //go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlptracetest/otlptest.go.tmpl "--data={}" --out=otlptracetest/otlptest.go //go:generate gotmpl --body=../../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp\" }" --out=x/x.go + +//go:generate gotmpl --body=../../../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter\" }" --out=counter/counter.go +//go:generate gotmpl --body=../../../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go //go:generate gotmpl --body=../../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go diff --git a/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go b/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go index 8d131a295..af7f6ef6b 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/mock_collector_test.go @@ -213,12 +213,12 @@ func (c *mockCollectorConfig) fillInDefaults() { } } -func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector { +func runMockCollector(tb testing.TB, cfg mockCollectorConfig) *mockCollector { cfg.fillInDefaults() ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port)) - require.NoError(t, err) + require.NoError(tb, err) _, portStr, err := net.SplitHostPort(ln.Addr().String()) - require.NoError(t, err) + require.NoError(tb, err) m := &mockCollector{ endpoint: fmt.Sprintf("localhost:%s", portStr), spansStorage: otlptracetest.NewSpansStorage(), @@ -238,9 +238,9 @@ func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector { } if cfg.WithTLS { pem, err := generateWeakCertificate() - require.NoError(t, err) + require.NoError(tb, err) tlsCertificate, err := tls.X509KeyPair(pem.Certificate, pem.PrivateKey) - require.NoError(t, err) + require.NoError(tb, err) server.TLSConfig = &tls.Config{ Certificates: []tls.Certificate{tlsCertificate}, }