You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-25 22:41:46 +02:00
Instrument the otlptracehttp exporter (#7486)
Resolve #7006 ### Benchmarks ```console > benchstat bmark.results goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz │ bmark.results │ │ sec/op │ ExporterExportSpans/Observability-8 111.6µ ± 12% ExporterExportSpans/NoObservability-8 108.5µ ± 7% geomean 110.0µ │ bmark.results │ │ B/op │ ExporterExportSpans/Observability-8 20.69Ki ± 0% ExporterExportSpans/NoObservability-8 19.93Ki ± 0% geomean 20.30Ki │ bmark.results │ │ allocs/op │ ExporterExportSpans/Observability-8 251.0 ± 0% ExporterExportSpans/NoObservability-8 247.0 ± 0% geomean 249.0 ```
This commit is contained in:
@@ -18,6 +18,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345)
|
||||
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353)
|
||||
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7459)
|
||||
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486)
|
||||
|
||||
### Fixed
|
||||
|
||||
|
||||
@@ -24,6 +24,8 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/retry"
|
||||
)
|
||||
@@ -62,6 +64,9 @@ type client struct {
|
||||
client *http.Client
|
||||
stopCh chan struct{}
|
||||
stopOnce sync.Once
|
||||
|
||||
instID int64
|
||||
inst *observ.Instrumentation
|
||||
}
|
||||
|
||||
var _ otlptrace.Client = (*client)(nil)
|
||||
@@ -99,18 +104,29 @@ func NewClient(opts ...Option) otlptrace.Client {
|
||||
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
|
||||
stopCh: stopCh,
|
||||
client: httpClient,
|
||||
instID: counter.NextExporterID(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start does nothing in a HTTP client.
|
||||
func (*client) Start(ctx context.Context) error {
|
||||
func (c *client) Start(ctx context.Context) error {
|
||||
// Initialize the instrumentation if not already done.
|
||||
//
|
||||
// Initialize here instead of NewClient to allow any errors to be passed
|
||||
// back to the caller and so that any setup of the environment variables to
|
||||
// enable instrumentation can be set via code.
|
||||
var err error
|
||||
if c.inst == nil {
|
||||
c.inst, err = observ.NewInstrumentation(c.instID, c.cfg.Endpoint)
|
||||
}
|
||||
|
||||
// nothing to do
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
err = errors.Join(err, ctx.Err())
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// Stop shuts down the client and interrupt any in-flight request.
|
||||
@@ -144,6 +160,12 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
||||
return err
|
||||
}
|
||||
|
||||
var statusCode int
|
||||
if d.inst != nil {
|
||||
op := d.inst.ExportSpans(ctx, len(protoSpans))
|
||||
defer func() { op.End(uploadErr, statusCode) }()
|
||||
}
|
||||
|
||||
return errors.Join(uploadErr, d.requestFunc(ctx, func(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -169,7 +191,8 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
||||
}()
|
||||
}
|
||||
|
||||
if sc := resp.StatusCode; sc >= 200 && sc <= 299 {
|
||||
statusCode = resp.StatusCode
|
||||
if statusCode >= 200 && statusCode <= 299 {
|
||||
// Success, do not retry.
|
||||
// Read the partial success message, if any.
|
||||
var respData bytes.Buffer
|
||||
@@ -213,7 +236,7 @@ func (d *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.Resourc
|
||||
}
|
||||
bodyErr := fmt.Errorf("body: %s", respStr)
|
||||
|
||||
switch resp.StatusCode {
|
||||
switch statusCode {
|
||||
case http.StatusTooManyRequests,
|
||||
http.StatusBadGateway,
|
||||
http.StatusServiceUnavailable,
|
||||
|
||||
@@ -17,10 +17,19 @@ import (
|
||||
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/observ"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/otlptracetest"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
"go.opentelemetry.io/otel/sdk/trace/tracetest"
|
||||
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -469,3 +478,155 @@ func TestCollectorRespondingNonProtobufContent(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, mc.GetSpans(), 1)
|
||||
}
|
||||
|
||||
func TestClientInstrumentation(t *testing.T) {
|
||||
// Enable instrumentation for this test.
|
||||
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
|
||||
// Reset client ID to be deterministic
|
||||
const id = 0
|
||||
counter.SetExporterID(id)
|
||||
|
||||
// Save original meter provider and restore at end of test.
|
||||
orig := otel.GetMeterProvider()
|
||||
t.Cleanup(func() { otel.SetMeterProvider(orig) })
|
||||
|
||||
// Create a new meter provider to capture metrics.
|
||||
reader := metric.NewManualReader()
|
||||
mp := metric.NewMeterProvider(metric.WithReader(reader))
|
||||
otel.SetMeterProvider(mp)
|
||||
|
||||
const n, msg = 2, "partially successful"
|
||||
mc := runMockCollector(t, mockCollectorConfig{
|
||||
InjectHTTPStatus: []int{400},
|
||||
Partial: &coltracepb.ExportTracePartialSuccess{
|
||||
RejectedSpans: n,
|
||||
ErrorMessage: msg,
|
||||
},
|
||||
})
|
||||
t.Cleanup(func() { require.NoError(t, mc.Stop()) })
|
||||
|
||||
driver := otlptracehttp.NewClient(
|
||||
otlptracehttp.WithEndpoint(mc.Endpoint()),
|
||||
otlptracehttp.WithInsecure(),
|
||||
)
|
||||
exporter, err := otlptrace.New(t.Context(), driver)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = exporter.ExportSpans(t.Context(), otlptracetest.SingleReadOnlySpan())
|
||||
assert.Error(t, err)
|
||||
|
||||
require.NoError(t, exporter.Shutdown(t.Context()))
|
||||
var got metricdata.ResourceMetrics
|
||||
require.NoError(t, reader.Collect(t.Context(), &got))
|
||||
|
||||
attrs := observ.BaseAttrs(id, mc.endpoint)
|
||||
|
||||
want := metricdata.ScopeMetrics{
|
||||
Scope: instrumentation.Scope{
|
||||
Name: observ.ScopeName,
|
||||
Version: observ.Version,
|
||||
SchemaURL: observ.SchemaURL,
|
||||
},
|
||||
Metrics: []metricdata.Metrics{
|
||||
{
|
||||
Name: otelconv.SDKExporterSpanInflight{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanInflight{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanInflight{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: attribute.NewSet(attrs...)},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: otelconv.SDKExporterSpanExported{}.Name(),
|
||||
Description: otelconv.SDKExporterSpanExported{}.Description(),
|
||||
Unit: otelconv.SDKExporterSpanExported{}.Unit(),
|
||||
Data: metricdata.Sum[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{Attributes: attribute.NewSet(attrs...)},
|
||||
{Attributes: attribute.NewSet(append(
|
||||
attrs,
|
||||
otelconv.SDKExporterSpanExported{}.AttrErrorType("*errors.joinError"),
|
||||
)...)},
|
||||
},
|
||||
Temporality: 0x1,
|
||||
IsMonotonic: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: otelconv.SDKExporterOperationDuration{}.Name(),
|
||||
Description: otelconv.SDKExporterOperationDuration{}.Description(),
|
||||
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
|
||||
Data: metricdata.Histogram[float64]{
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{Attributes: attribute.NewSet(append(
|
||||
attrs,
|
||||
otelconv.SDKExporterOperationDuration{}.AttrErrorType("*errors.joinError"),
|
||||
otelconv.SDKExporterOperationDuration{}.AttrHTTPResponseStatusCode(400),
|
||||
)...)},
|
||||
},
|
||||
Temporality: 0x1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
require.Len(t, got.ScopeMetrics, 1)
|
||||
opt := []metricdatatest.Option{
|
||||
metricdatatest.IgnoreTimestamp(),
|
||||
metricdatatest.IgnoreExemplars(),
|
||||
metricdatatest.IgnoreValue(),
|
||||
}
|
||||
metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], opt...)
|
||||
}
|
||||
|
||||
func BenchmarkExporterExportSpans(b *testing.B) {
|
||||
const n = 10
|
||||
|
||||
run := func(b *testing.B) {
|
||||
mc := runMockCollector(b, mockCollectorConfig{
|
||||
Partial: &coltracepb.ExportTracePartialSuccess{
|
||||
RejectedSpans: 5,
|
||||
ErrorMessage: "partially successful",
|
||||
},
|
||||
})
|
||||
b.Cleanup(func() { require.NoError(b, mc.Stop()) })
|
||||
|
||||
c := otlptracehttp.NewClient(
|
||||
otlptracehttp.WithEndpoint(mc.Endpoint()),
|
||||
otlptracehttp.WithInsecure(),
|
||||
)
|
||||
exp, err := otlptrace.New(b.Context(), c)
|
||||
require.NoError(b, err)
|
||||
b.Cleanup(func() {
|
||||
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
|
||||
assert.NoError(b, exp.Shutdown(context.Background()))
|
||||
})
|
||||
|
||||
stubs := make([]tracetest.SpanStub, n)
|
||||
for i := range stubs {
|
||||
stubs[i].Name = fmt.Sprintf("Span %d", i)
|
||||
}
|
||||
spans := tracetest.SpanStubs(stubs).Snapshots()
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
|
||||
for b.Loop() {
|
||||
err = exp.ExportSpans(b.Context(), spans)
|
||||
}
|
||||
_ = err
|
||||
}
|
||||
|
||||
b.Run("Observability", func(b *testing.B) {
|
||||
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
|
||||
run(b)
|
||||
})
|
||||
|
||||
b.Run("NoObservability", func(b *testing.B) {
|
||||
b.Setenv("OTEL_GO_X_OBSERVABILITY", "false")
|
||||
run(b)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
// Code generated by gotmpl. DO NOT MODIFY.
|
||||
// source: internal/shared/counter/counter.go.tmpl
|
||||
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package counter provides a simple counter for generating unique IDs.
|
||||
//
|
||||
// This package is used to generate unique IDs while allowing testing packages
|
||||
// to reset the counter.
|
||||
package counter // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter"
|
||||
|
||||
import "sync/atomic"
|
||||
|
||||
// exporterN is a global 0-based count of the number of exporters created.
|
||||
var exporterN atomic.Int64
|
||||
|
||||
// NextExporterID returns the next unique ID for an exporter.
|
||||
func NextExporterID() int64 {
|
||||
const inc = 1
|
||||
return exporterN.Add(inc) - inc
|
||||
}
|
||||
|
||||
// SetExporterID sets the exporter ID counter to v and returns the previous
|
||||
// value.
|
||||
//
|
||||
// This function is useful for testing purposes, allowing you to reset the
|
||||
// counter. It should not be used in production code.
|
||||
func SetExporterID(v int64) int64 {
|
||||
return exporterN.Swap(v)
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
// Code generated by gotmpl. DO NOT MODIFY.
|
||||
// source: internal/shared/counter/counter_test.go.tmpl
|
||||
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package counter
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestNextExporterID(t *testing.T) {
|
||||
SetExporterID(0)
|
||||
|
||||
var expected int64
|
||||
for range 10 {
|
||||
id := NextExporterID()
|
||||
if id != expected {
|
||||
t.Errorf("NextExporterID() = %d; want %d", id, expected)
|
||||
}
|
||||
expected++
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetExporterID(t *testing.T) {
|
||||
SetExporterID(0)
|
||||
|
||||
prev := SetExporterID(42)
|
||||
if prev != 0 {
|
||||
t.Errorf("SetExporterID(42) returned %d; want 0", prev)
|
||||
}
|
||||
|
||||
id := NextExporterID()
|
||||
if id != 42 {
|
||||
t.Errorf("NextExporterID() = %d; want 42", id)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNextExporterIDConcurrentSafe(t *testing.T) {
|
||||
SetExporterID(0)
|
||||
|
||||
const goroutines = 100
|
||||
const increments = 10
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(goroutines)
|
||||
|
||||
for range goroutines {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for range increments {
|
||||
NextExporterID()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
expected := int64(goroutines * increments)
|
||||
if id := NextExporterID(); id != expected {
|
||||
t.Errorf("NextExporterID() = %d; want %d", id, expected)
|
||||
}
|
||||
}
|
||||
@@ -25,4 +25,7 @@ package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/ot
|
||||
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlptrace/otlptracetest/otlptest.go.tmpl "--data={}" --out=otlptracetest/otlptest.go
|
||||
|
||||
//go:generate gotmpl --body=../../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp\" }" --out=x/x.go
|
||||
|
||||
//go:generate gotmpl --body=../../../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp/internal/counter\" }" --out=counter/counter.go
|
||||
//go:generate gotmpl --body=../../../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go
|
||||
//go:generate gotmpl --body=../../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go
|
||||
|
||||
@@ -213,12 +213,12 @@ func (c *mockCollectorConfig) fillInDefaults() {
|
||||
}
|
||||
}
|
||||
|
||||
func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector {
|
||||
func runMockCollector(tb testing.TB, cfg mockCollectorConfig) *mockCollector {
|
||||
cfg.fillInDefaults()
|
||||
ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", cfg.Port))
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
_, portStr, err := net.SplitHostPort(ln.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
m := &mockCollector{
|
||||
endpoint: fmt.Sprintf("localhost:%s", portStr),
|
||||
spansStorage: otlptracetest.NewSpansStorage(),
|
||||
@@ -238,9 +238,9 @@ func runMockCollector(t *testing.T, cfg mockCollectorConfig) *mockCollector {
|
||||
}
|
||||
if cfg.WithTLS {
|
||||
pem, err := generateWeakCertificate()
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
tlsCertificate, err := tls.X509KeyPair(pem.Certificate, pem.PrivateKey)
|
||||
require.NoError(t, err)
|
||||
require.NoError(tb, err)
|
||||
server.TLSConfig = &tls.Config{
|
||||
Certificates: []tls.Certificate{tlsCertificate},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user