1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-05-22 09:35:21 +02:00

feat: add self-observability metrics to otlpmetrichttp metric exporters (#8194)

Fixes https://github.com/open-telemetry/opentelemetry-go/issues/7011

This PR updates
https://github.com/open-telemetry/opentelemetry-go/pull/7493, and
addresses outstanding comments. See commit descriptions for the changes
made on top of that PR. Credit to @tongoss for most of the work.

Benchmark results:

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ
cpu: Intel(R) Xeon(R) CPU @ 2.20GHz
                                             │ sec/op      │
InstrumentationExportMetrics/NoError-24        787.4n ± 5%
InstrumentationExportMetrics/PartialError-24   4.137µ ± 9%
InstrumentationExportMetrics/FullError-24      3.938µ ± 7%
geomean                                        2.341µ

                                             │  B/op       │
InstrumentationExportMetrics/NoError-24        0.000 ± 0%
InstrumentationExportMetrics/PartialError-24   787.0 ± 0%
InstrumentationExportMetrics/FullError-24      787.0 ± 0%
geomean                                                   ¹
¹ summaries must be >0 to compute geomean

                                             │ allocs/op   │
InstrumentationExportMetrics/NoError-24        0.000 ± 0%
InstrumentationExportMetrics/PartialError-24   6.000 ± 0%
InstrumentationExportMetrics/FullError-24      6.000 ± 0%
geomean                                                   ¹
¹ summaries must be >0 to compute geomean
```
```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp
cpu: Intel(R) Xeon(R) CPU @ 2.20GHz
                                         │    sec/op     │
ExporterExportMetrics/Observability-24     433.5µ ± 4%
ExporterExportMetrics/NoObservability-24   439.3µ ± 4%
geomean                                    436.4µ

                                         │     B/op      │
ExporterExportMetrics/Observability-24     28.72Ki ± 2%
ExporterExportMetrics/NoObservability-24   28.71Ki ± 2%
geomean                                    28.71Ki

                                         │   allocs/op   │
ExporterExportMetrics/Observability-24     446.0 ± 0%
ExporterExportMetrics/NoObservability-24   446.0 ± 0%
geomean                                    446.0

```

Written with assistance from Gemini.

---------

Co-authored-by: Robert Wu <robertxtw@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Flc゛ <i@flc.io>
This commit is contained in:
David Ashpole
2026-04-21 14:58:55 -04:00
committed by GitHub
parent 63ee0c59c6
commit 5ecaa5b54f
19 changed files with 1703 additions and 11 deletions
+1
View File
@@ -29,6 +29,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
See `go.opentelemetry.io/otel/sdk/metric/internal/x` for feature documentation. (#8071)
- Add `WithDefaultAttributes` to `go.opentelemetry.io/otel/metric/x` to support setting default attributes on instruments. (#8135)
- Add `Settable` to `go.opentelemetry.io/otel/metric/x` to allow reusing attribute options. (#8178)
- Add experimental self-observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#8194)
### Changed
@@ -23,6 +23,8 @@ import (
"google.golang.org/protobuf/proto"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/retry"
)
@@ -33,6 +35,8 @@ type client struct {
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client
inst *observ.Instrumentation
}
// Keep it in sync with golang's DefaultTransport from net/http! We
@@ -111,12 +115,16 @@ func newClient(cfg oconf.Config) (*client, error) {
}
req.Header.Set("Content-Type", "application/x-protobuf")
// Initialize the instrumentation.
inst, err := observ.NewInstrumentation(counter.NextExporterID(), cfg.Metrics.Endpoint)
return &client{
compression: Compression(cfg.Metrics.Compression),
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
}, nil
inst: inst,
}, err
}
// Shutdown shuts down the client, freeing all resources.
@@ -151,6 +159,12 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
return err
}
var statusCode int
if c.inst != nil {
op := c.inst.ExportMetrics(ctx, protoMetrics)
defer func() { op.End(uploadErr, statusCode) }()
}
return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error {
select {
case <-iCtx.Done():
@@ -158,6 +172,7 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
default:
}
statusCode = 0
request.reset(iCtx)
// nolint:gosec // URL is constructed from validated OTLP endpoint configuration
resp, err := c.httpClient.Do(request.Request)
@@ -168,15 +183,18 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
if err != nil {
return err
}
if resp != nil && resp.Body != nil {
defer func() {
if err := resp.Body.Close(); err != nil {
uploadErr = errors.Join(uploadErr, err)
}
}()
if resp != nil {
statusCode = resp.StatusCode
if resp.Body != nil {
defer func() {
if err := resp.Body.Close(); err != nil {
uploadErr = errors.Join(uploadErr, err)
}
}()
}
}
if sc := resp.StatusCode; sc >= 200 && sc <= 299 {
if statusCode >= 200 && statusCode <= 299 {
// Success, do not retry.
// Read the partial success message, if any.
@@ -21,12 +21,23 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
"go.opentelemetry.io/otel/semconv/v1.40.0/otelconv"
)
type clientShim struct {
@@ -503,3 +514,288 @@ func TestResponseBodySizeLimit(t *testing.T) {
})
}
}
func TestClientInstrumentation(t *testing.T) {
// Enable instrumentation for this test.
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
// Reset client ID to be deterministic.
const id = 0
origID := counter.SetExporterID(id)
t.Cleanup(func() { counter.SetExporterID(origID) })
// Save original meter provider and restore at end of test.
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
// Create a new meter provider to capture metrics.
reader := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(reader))
otel.SetMeterProvider(mp)
const n, msg = 2, "partially successful"
rCh := make(chan otest.ExportResult, 1)
// Test partial success - return HTTP 200 with partial success info
rCh <- otest.ExportResult{
Response: &colmetricpb.ExportMetricsServiceResponse{
PartialSuccess: &colmetricpb.ExportMetricsPartialSuccess{
RejectedDataPoints: n,
ErrorMessage: msg,
},
},
}
coll, err := otest.NewHTTPCollector("", rCh)
require.NoError(t, err)
t.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
require.NoError(t, coll.Shutdown(context.Background()))
})
t.Cleanup(func() { close(rCh) })
addr := coll.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
ctx := t.Context()
exp, err := New(ctx, opts...)
require.NoError(t, err)
// Export some test data
err = exp.Export(ctx, &metricdata.ResourceMetrics{
Resource: resource.NewWithAttributes(semconv.SchemaURL, attribute.String("service.name", "test")),
ScopeMetrics: []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{Name: "test"},
Metrics: []metricdata.Metrics{
{
Name: "test-metric",
Data: metricdata.Gauge[int64]{DataPoints: []metricdata.DataPoint[int64]{{Value: 42}}},
},
},
},
},
})
// Should get partial success error
wantErr := internal.MetricPartialSuccessError(n, msg)
require.ErrorIs(t, err, wantErr, "Expected partial success error")
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
require.NoError(t, exp.Shutdown(context.Background()))
var got metricdata.ResourceMetrics
require.NoError(t, reader.Collect(ctx, &got))
attrs := observ.BaseAttrs(id, addr)
want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: observ.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(),
Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attrs...)},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: otelconv.SDKExporterMetricDataPointExported{}.Name(),
Description: otelconv.SDKExporterMetricDataPointExported{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attrs...)},
{Attributes: attribute.NewSet(append(
attrs,
semconv.ErrorType(err),
)...)},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: attribute.NewSet(append(
attrs,
semconv.ErrorType(err),
otelconv.SDKExporterOperationDuration{}.AttrHTTPResponseStatusCode(200),
)...)},
},
Temporality: metricdata.CumulativeTemporality,
},
},
},
}
require.Len(t, got.ScopeMetrics, 1)
opt := []metricdatatest.Option{
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreValue(),
}
metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], opt...)
}
func BenchmarkExporterExportMetrics(b *testing.B) {
const n = 10
run := func(b *testing.B) {
coll, err := otest.NewHTTPCollector("", nil)
require.NoError(b, err)
b.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
require.NoError(b, coll.Shutdown(context.Background()))
})
opts := []Option{WithEndpoint(coll.Addr().String()), WithInsecure()}
ctx := b.Context()
exp, err := New(ctx, opts...)
require.NoError(b, err)
b.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
assert.NoError(b, exp.Shutdown(context.Background()))
})
// Generate realistic test metric data with multiple metrics.
now := time.Now()
rm := &metricdata.ResourceMetrics{
ScopeMetrics: []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{
Name: "test",
Version: "v1.0.0",
},
Metrics: make([]metricdata.Metrics, n),
},
},
}
for i := range rm.ScopeMetrics[0].Metrics {
rm.ScopeMetrics[0].Metrics[i] = metricdata.Metrics{
Name: fmt.Sprintf("test_counter_%d", i),
Description: fmt.Sprintf("A test counter %d", i),
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("test", "value"),
attribute.Int("counter", i),
),
StartTime: now,
Time: now,
Value: int64(i * 10),
},
},
},
}
}
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
err = exp.Export(b.Context(), rm)
}
_ = err
}
b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})
b.Run("NoObservability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "false")
run(b)
})
}
func TestClientInstrumentationStaleStatusCode(t *testing.T) {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
const id = 0
origID := counter.SetExporterID(id)
t.Cleanup(func() { counter.SetExporterID(origID) })
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
reader := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(reader))
otel.SetMeterProvider(mp)
// Use a client that returns a 503 error once and then a network error.
var calls int
client := &http.Client{
Transport: roundTripperFunc(func(_ *http.Request) (*http.Response, error) {
calls++
if calls == 1 {
return &http.Response{
StatusCode: http.StatusServiceUnavailable,
Status: fmt.Sprintf("%d %s",
http.StatusServiceUnavailable,
http.StatusText(http.StatusServiceUnavailable)),
Header: make(http.Header),
Body: io.NopCloser(strings.NewReader("")),
}, nil
}
return nil, errors.New("network error")
}),
}
ctx := t.Context()
exp, err := New(ctx,
WithHTTPClient(client),
WithInsecure(),
WithRetry(RetryConfig{
Enabled: true,
InitialInterval: time.Nanosecond,
MaxInterval: time.Nanosecond,
MaxElapsedTime: time.Second,
}),
)
require.NoError(t, err)
err = exp.Export(ctx, &metricdata.ResourceMetrics{})
assert.Error(t, err)
require.NoError(t, exp.Shutdown(ctx))
// Validate that the status code is 0 and not the stale 503 on self-observability metrics.
var got metricdata.ResourceMetrics
require.NoError(t, reader.Collect(ctx, &got))
require.Len(t, got.ScopeMetrics, 1)
metrics := got.ScopeMetrics[0].Metrics
var found bool
for _, m := range metrics {
if m.Name != (otelconv.SDKExporterOperationDuration{}).Name() {
continue
}
found = true
data := m.Data.(metricdata.Histogram[float64])
require.NotEmpty(t, data.DataPoints)
dp := data.DataPoints[0]
_, ok := dp.Attributes.Value(otelconv.SDKExporterOperationDuration{}.AttrHTTPResponseStatusCode(0).Key)
assert.False(t, ok, "should not report status code when the request fails before getting a response.")
}
assert.True(t, found, "expected to find operation duration metric")
}
type roundTripperFunc func(*http.Request) (*http.Response, error)
func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}
@@ -6,9 +6,11 @@ retract v0.32.2 // Contains unresolvable dependencies.
require (
github.com/cenkalti/backoff/v5 v5.0.3
github.com/go-logr/logr v1.4.3
github.com/google/go-cmp v0.7.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/proto/otlp v1.10.0
@@ -19,13 +21,11 @@ require (
require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.29.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/otel/trace v1.43.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/sys v0.43.0 // indirect
@@ -0,0 +1,31 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/counter/counter.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package counter provides a simple counter for generating unique IDs.
//
// This package is used to generate unique IDs while allowing testing packages
// to reset the counter.
package counter // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter"
import "sync/atomic"
// exporterN is a global 0-based count of the number of exporters created.
var exporterN atomic.Int64
// NextExporterID returns the next unique ID for an exporter.
func NextExporterID() int64 {
const inc = 1
return exporterN.Add(inc) - inc
}
// SetExporterID sets the exporter ID counter to v and returns the previous
// value.
//
// This function is useful for testing purposes, allowing you to reset the
// counter. It should not be used in production code.
func SetExporterID(v int64) int64 {
return exporterN.Swap(v)
}
@@ -0,0 +1,65 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/counter/counter_test.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package counter
import (
"sync"
"testing"
)
func TestNextExporterID(t *testing.T) {
SetExporterID(0)
var expected int64
for range 10 {
id := NextExporterID()
if id != expected {
t.Errorf("NextExporterID() = %d; want %d", id, expected)
}
expected++
}
}
func TestSetExporterID(t *testing.T) {
SetExporterID(0)
prev := SetExporterID(42)
if prev != 0 {
t.Errorf("SetExporterID(42) returned %d; want 0", prev)
}
id := NextExporterID()
if id != 42 {
t.Errorf("NextExporterID() = %d; want 42", id)
}
}
func TestNextExporterIDConcurrentSafe(t *testing.T) {
SetExporterID(0)
const goroutines = 100
const increments = 10
var wg sync.WaitGroup
wg.Add(goroutines)
for range goroutines {
go func() {
defer wg.Done()
for range increments {
NextExporterID()
}
}()
}
wg.Wait()
expected := int64(goroutines * increments)
if id := NextExporterID(); id != expected {
t.Errorf("NextExporterID() = %d; want %d", id, expected)
}
}
@@ -30,3 +30,9 @@ package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/o
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlpmetric/transform/error_test.go.tmpl "--data={}" --out=transform/error_test.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlpmetric/transform/metricdata.go.tmpl "--data={}" --out=transform/metricdata.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/otlpmetric/transform/metricdata_test.go.tmpl "--data={}" --out=transform/metricdata_test.go
//go:generate gotmpl --body=../../../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter\" }" --out=counter/counter.go
//go:generate gotmpl --body=../../../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go
//go:generate gotmpl --body=../../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x\" }" --out=x/x.go
//go:generate gotmpl --body=../../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go
@@ -0,0 +1,42 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
import metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
// countDataPoints counts the total number of data points in a ResourceMetrics.
func countDataPoints(rm *metricpb.ResourceMetrics) int64 {
if rm == nil {
return 0
}
var total int64
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
switch data := m.Data.(type) {
case *metricpb.Metric_Gauge:
if data.Gauge != nil {
total += int64(len(data.Gauge.DataPoints))
}
case *metricpb.Metric_Sum:
if data.Sum != nil {
total += int64(len(data.Sum.DataPoints))
}
case *metricpb.Metric_Histogram:
if data.Histogram != nil {
total += int64(len(data.Histogram.DataPoints))
}
case *metricpb.Metric_ExponentialHistogram:
if data.ExponentialHistogram != nil {
total += int64(len(data.ExponentialHistogram.DataPoints))
}
case *metricpb.Metric_Summary:
if data.Summary != nil {
total += int64(len(data.Summary.DataPoints))
}
}
}
}
return total
}
@@ -0,0 +1,169 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ
import (
"testing"
"github.com/stretchr/testify/assert"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
func TestCountDataPoints(t *testing.T) {
tests := []struct {
name string
rm *metricpb.ResourceMetrics
want int64
}{
{
name: "nil",
rm: nil,
want: 0,
},
{
name: "empty",
rm: &metricpb.ResourceMetrics{},
want: 0,
},
{
name: "gauge",
rm: &metricpb.ResourceMetrics{
ScopeMetrics: []*metricpb.ScopeMetrics{
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_Gauge{
Gauge: &metricpb.Gauge{
DataPoints: []*metricpb.NumberDataPoint{{}, {}},
},
},
},
},
},
},
},
want: 2,
},
{
name: "sum",
rm: &metricpb.ResourceMetrics{
ScopeMetrics: []*metricpb.ScopeMetrics{
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_Sum{
Sum: &metricpb.Sum{
DataPoints: []*metricpb.NumberDataPoint{{}, {}, {}},
},
},
},
},
},
},
},
want: 3,
},
{
name: "histogram",
rm: &metricpb.ResourceMetrics{
ScopeMetrics: []*metricpb.ScopeMetrics{
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_Histogram{
Histogram: &metricpb.Histogram{
DataPoints: []*metricpb.HistogramDataPoint{{}},
},
},
},
},
},
},
},
want: 1,
},
{
name: "exponential_histogram",
rm: &metricpb.ResourceMetrics{
ScopeMetrics: []*metricpb.ScopeMetrics{
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_ExponentialHistogram{
ExponentialHistogram: &metricpb.ExponentialHistogram{
DataPoints: []*metricpb.ExponentialHistogramDataPoint{{}, {}},
},
},
},
},
},
},
},
want: 2,
},
{
name: "summary",
rm: &metricpb.ResourceMetrics{
ScopeMetrics: []*metricpb.ScopeMetrics{
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_Summary{
Summary: &metricpb.Summary{
DataPoints: []*metricpb.SummaryDataPoint{{}, {}, {}, {}},
},
},
},
},
},
},
},
want: 4,
},
{
name: "multiple",
rm: &metricpb.ResourceMetrics{
ScopeMetrics: []*metricpb.ScopeMetrics{
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_Gauge{
Gauge: &metricpb.Gauge{
DataPoints: []*metricpb.NumberDataPoint{{}},
},
},
},
{
Data: &metricpb.Metric_Sum{
Sum: &metricpb.Sum{
DataPoints: []*metricpb.NumberDataPoint{{}, {}},
},
},
},
},
},
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_Histogram{
Histogram: &metricpb.Histogram{
DataPoints: []*metricpb.HistogramDataPoint{{}, {}, {}},
},
},
},
},
},
},
},
want: 6,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := countDataPoints(tt.rm)
assert.Equal(t, tt.want, got)
})
}
}
@@ -0,0 +1,411 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package observ provides experimental observability instrumentation for the
// otlpmetrichttp exporter.
package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"net/netip"
"strconv"
"strings"
"sync"
"time"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
"go.opentelemetry.io/otel/semconv/v1.40.0/otelconv"
)
const (
// ScopeName is the unique name of the meter used for instrumentation.
ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
// SchemaURL is the schema URL of the metrics produced by this
// instrumentation.
SchemaURL = semconv.SchemaURL
// Version is the current version of this instrumentation.
//
// This matches the version of the exporter.
Version = internal.Version
)
var (
measureAttrsPool = &sync.Pool{
New: func() any {
const n = 1 + // component.name
1 + // component.type
1 + // server.addr
1 + // server.port
1 + // error.type
1 // http.response.status_code
s := make([]attribute.KeyValue, 0, n)
// Return a pointer to a slice instead of a slice itself
// to avoid allocations on every call.
return &s
},
}
addOptPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
o := make([]metric.AddOption, 0, n)
return &o
},
}
recordOptPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
o := make([]metric.RecordOption, 0, n)
return &o
},
}
)
func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) }
func put[T any](p *sync.Pool, s *[]T) {
*s = (*s)[:0] // Reset.
p.Put(s)
}
// ComponentName returns the component name for the exporter with the
// provided ID.
func ComponentName(id int64) string {
t := semconv.OTelComponentTypeOtlpHTTPMetricExporter.Value.AsString()
return fmt.Sprintf("%s/%d", t, id)
}
// Instrumentation is experimental instrumentation for the exporter.
type Instrumentation struct {
inflightMetric metric.Int64UpDownCounter
exportedMetric metric.Int64Counter
opDuration metric.Float64Histogram
attrs []attribute.KeyValue
addOpt metric.AddOption
recOpt metric.RecordOption
}
// NewInstrumentation returns instrumentation for an OTLP over HTTP metric
// exporter with the provided ID and endpoint. It uses the global
// MeterProvider to create the instrumentation.
//
// The id should be the unique exporter instance ID. It is used
// to set the "component.name" attribute.
//
// The endpoint is the HTTP endpoint the exporter is exporting to.
//
// If the experimental observability is disabled, nil is returned.
func NewInstrumentation(id int64, endpoint string) (*Instrumentation, error) {
if !x.Observability.Enabled() {
return nil, nil
}
attrs := BaseAttrs(id, endpoint)
i := &Instrumentation{
attrs: attrs,
addOpt: metric.WithAttributeSet(attribute.NewSet(attrs...)),
// Do not modify attrs (NewSet sorts in-place), make a new slice.
recOpt: metric.WithAttributeSet(attribute.NewSet(append(
// Default to OK status code (200).
[]attribute.KeyValue{semconv.HTTPResponseStatusCode(http.StatusOK)},
attrs...,
)...)),
}
mp := otel.GetMeterProvider()
m := mp.Meter(
ScopeName,
metric.WithInstrumentationVersion(Version),
metric.WithSchemaURL(SchemaURL),
)
var err error
inflightMetric, e := otelconv.NewSDKExporterMetricDataPointInflight(m)
if e != nil {
e = fmt.Errorf("failed to create inflight metric: %w", e)
err = errors.Join(err, e)
}
i.inflightMetric = inflightMetric.Inst()
exportedMetric, e := otelconv.NewSDKExporterMetricDataPointExported(m)
if e != nil {
e = fmt.Errorf("failed to create exported metric: %w", e)
err = errors.Join(err, e)
}
i.exportedMetric = exportedMetric.Inst()
opDuration, e := otelconv.NewSDKExporterOperationDuration(m)
if e != nil {
e = fmt.Errorf("failed to create operation duration metric: %w", e)
err = errors.Join(err, e)
}
i.opDuration = opDuration.Inst()
return i, err
}
// BaseAttrs returns the base attributes for the exporter with the provided ID
// and endpoint.
//
// The id should be the unique exporter instance ID. It is used
// to set the "component.name" attribute.
//
// The endpoint is the HTTP endpoint the exporter is exporting to. It should be
// in the format "host[:port]".
func BaseAttrs(id int64, endpoint string) []attribute.KeyValue {
host, port, err := parseEndpoint(endpoint)
if err != nil || (host == "" && port < 0) {
if err != nil {
global.Debug("failed to parse endpoint", "endpoint", endpoint, "error", err)
}
return []attribute.KeyValue{
semconv.OTelComponentName(ComponentName(id)),
semconv.OTelComponentTypeOtlpHTTPMetricExporter,
}
}
// Do not use append so the slice is exactly allocated.
if port < 0 {
return []attribute.KeyValue{
semconv.OTelComponentName(ComponentName(id)),
semconv.OTelComponentTypeOtlpHTTPMetricExporter,
semconv.ServerAddress(host),
}
}
if host == "" {
return []attribute.KeyValue{
semconv.OTelComponentName(ComponentName(id)),
semconv.OTelComponentTypeOtlpHTTPMetricExporter,
semconv.ServerPort(port),
}
}
return []attribute.KeyValue{
semconv.OTelComponentName(ComponentName(id)),
semconv.OTelComponentTypeOtlpHTTPMetricExporter,
semconv.ServerAddress(host),
semconv.ServerPort(port),
}
}
// parseEndpoint parses the host and port from endpoint that has the form
// "host[:port]", or it returns an error if the endpoint is not parsable.
//
// If no port is specified, -1 is returned.
//
// If no host is specified, an empty string is returned.
func parseEndpoint(endpoint string) (string, int, error) {
// First check if the endpoint is just an IP address.
if ip := parseIP(endpoint); ip != "" {
return ip, -1, nil
}
// If there's no colon, there is no port (IPv6 with no port checked above).
if !strings.Contains(endpoint, ":") {
return endpoint, -1, nil
}
// Otherwise, parse as host:port.
host, portStr, err := net.SplitHostPort(endpoint)
if err != nil {
return "", -1, fmt.Errorf("invalid host:port %q: %w", endpoint, err)
}
const base, bitSize = 10, 16
port16, err := strconv.ParseUint(portStr, base, bitSize)
if err != nil {
return "", -1, fmt.Errorf("invalid port %q: %w", portStr, err)
}
port := int(port16) // port is guaranteed to be in the range [0, 65535].
return host, port, nil
}
// parseIP attempts to parse the entire endpoint as an IP address.
// It returns the normalized string form of the IP if successful,
// or an empty string if parsing fails.
func parseIP(ip string) string {
// Strip leading and trailing brackets for IPv6 addresses.
if len(ip) >= 2 && ip[0] == '[' && ip[len(ip)-1] == ']' {
ip = ip[1 : len(ip)-1]
}
addr, err := netip.ParseAddr(ip)
if err != nil {
return ""
}
// Return the normalized string form of the IP.
return addr.String()
}
// ExportMetrics instruments the UploadMetrics method of the client. It returns an
// [ExportOp] that must have its [ExportOp.End] method called when the
// operation ends.
func (i *Instrumentation) ExportMetrics(ctx context.Context, rm *metricpb.ResourceMetrics) ExportOp {
start := time.Now()
nMetrics := countDataPoints(rm)
if i.inflightMetric.Enabled(ctx) {
addOpt := get[metric.AddOption](addOptPool)
defer put(addOptPool, addOpt)
*addOpt = append(*addOpt, i.addOpt)
i.inflightMetric.Add(ctx, nMetrics, *addOpt...)
}
return ExportOp{
ctx: ctx,
start: start,
nMetrics: nMetrics,
inst: i,
}
}
// ExportOp tracks the export operation being observed by
// [Instrumentation.ExportMetrics].
type ExportOp struct {
ctx context.Context
start time.Time
nMetrics int64
inst *Instrumentation
}
// End completes the observation of the operation being observed by a call to
// [Instrumentation.ExportMetrics].
//
// Any error that is encountered is provided as err.
// The HTTP status code from the response is provided as status.
//
// If err is not nil, all metrics will be recorded as failures unless error is of
// type [internal.PartialSuccess]. In the case of a PartialSuccess, the number
// of successfully exported metrics will be determined by inspecting the
// RejectedItems field of the PartialSuccess.
func (e ExportOp) End(err error, status int) {
addOpt := get[metric.AddOption](addOptPool)
defer put(addOptPool, addOpt)
*addOpt = append(*addOpt, e.inst.addOpt)
if e.inst.inflightMetric.Enabled(e.ctx) {
e.inst.inflightMetric.Add(e.ctx, -e.nMetrics, *addOpt...)
}
success := successful(e.nMetrics, err)
// Record successfully exported metrics, even if the value is 0 which are
// meaningful to distribution aggregations.
if e.inst.exportedMetric.Enabled(e.ctx) {
e.inst.exportedMetric.Add(e.ctx, success, *addOpt...)
}
if err != nil && e.inst.exportedMetric.Enabled(e.ctx) {
attrs := get[attribute.KeyValue](measureAttrsPool)
defer put(measureAttrsPool, attrs)
*attrs = append(*attrs, e.inst.attrs...)
*attrs = append(*attrs, semconv.ErrorType(err))
// Do not inefficiently make a copy of attrs by using
// WithAttributes instead of WithAttributeSet.
o := metric.WithAttributeSet(attribute.NewSet(*attrs...))
// Reset addOpt with new attribute set.
*addOpt = append((*addOpt)[:0], o)
e.inst.exportedMetric.Add(e.ctx, e.nMetrics-success, *addOpt...)
}
if e.inst.opDuration.Enabled(e.ctx) {
recOpt := get[metric.RecordOption](recordOptPool)
defer put(recordOptPool, recOpt)
*recOpt = append(*recOpt, e.inst.recordOption(err, status))
d := time.Since(e.start).Seconds()
e.inst.opDuration.Record(e.ctx, d, *recOpt...)
}
}
// recordOption returns a RecordOption with attributes representing the
// outcome of the operation being recorded.
//
// If err is nil and status is 200, the default recOpt of the
// Instrumentation is returned.
//
// Otherwise, a new RecordOption is returned with the base attributes of the
// Instrumentation plus the http.response.status_code attribute set to the
// provided status (if non-zero), and if err is not nil, the error.type attribute set
// to the type of the error.
func (i *Instrumentation) recordOption(err error, status int) metric.RecordOption {
if err == nil && status == http.StatusOK {
return i.recOpt
}
attrs := get[attribute.KeyValue](measureAttrsPool)
defer put(measureAttrsPool, attrs)
*attrs = append(*attrs, i.attrs...)
if status != 0 {
*attrs = append(*attrs, semconv.HTTPResponseStatusCode(status))
}
if err != nil {
*attrs = append(*attrs, semconv.ErrorType(err))
}
// Do not inefficiently make a copy of attrs by using WithAttributes
// instead of WithAttributeSet.
return metric.WithAttributeSet(attribute.NewSet(*attrs...))
}
// successful returns the number of successfully exported metrics out of the n
// that were exported based on the provided error.
//
// If err is nil, n is returned. All metrics were successfully exported.
//
// If err is not nil and not an [internal.PartialSuccess] error, 0 is returned.
// It is assumed all metrics failed to be exported.
//
// If err is an [internal.PartialSuccess] error, the number of successfully
// exported metrics is computed by subtracting the RejectedItems field from n. If
// RejectedItems is negative, n is returned. If RejectedItems is greater than
// n, 0 is returned.
func successful(n int64, err error) int64 {
if err == nil {
return n // All metrics successfully exported.
}
// Split rejection calculation so successful is inlinable.
return n - rejected(n, err)
}
var errPartialPool = &sync.Pool{
New: func() any { return new(internal.PartialSuccess) },
}
// rejected returns how many out of the n metrics were rejected based on the
// provided non-nil err.
func rejected(n int64, err error) int64 {
ps := errPartialPool.Get().(*internal.PartialSuccess)
defer errPartialPool.Put(ps)
// Check for partial success.
if errors.As(err, ps) {
// Bound RejectedItems to [0, n]. This should not be needed,
// but be defensive as this is from an external source.
return min(max(ps.RejectedItems, 0), n)
}
return n // All metrics rejected.
}
@@ -0,0 +1,430 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ_test
import (
"errors"
"net/http"
"strconv"
"testing"
"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
"go.opentelemetry.io/otel/internal/global"
mapi "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.40.0"
"go.opentelemetry.io/otel/semconv/v1.40.0/otelconv"
)
const (
ID = 0
ServerAddr = "localhost"
ServerPort = 4318
)
var Endpoint = ServerAddr + ":" + strconv.Itoa(ServerPort)
var Scope = instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: observ.SchemaURL,
}
type errMeterProvider struct {
mapi.MeterProvider
err error
}
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
return &errMeter{err: m.err}
}
type errMeter struct {
mapi.Meter
err error
}
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
return nil, m.err
}
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
return nil, m.err
}
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
return nil, m.err
}
func TestNewInstrumentationObservabilityErrors(t *testing.T) {
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
mp := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(mp)
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
_, err := observ.NewInstrumentation(ID, Endpoint)
require.ErrorIs(t, err, assert.AnError, "new instrument errors")
assert.ErrorContains(t, err, "inflight metric")
assert.ErrorContains(t, err, "exported metric")
assert.ErrorContains(t, err, "operation duration metric")
}
func TestNewInstrumentationObservabilityDisabled(t *testing.T) {
// Do not set OTEL_GO_X_OBSERVABILITY.
got, err := observ.NewInstrumentation(ID, Endpoint)
assert.NoError(t, err)
assert.Nil(t, got)
}
func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) {
t.Helper()
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
original := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(original) })
r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)
inst, err := observ.NewInstrumentation(ID, Endpoint)
require.NoError(t, err)
require.NotNil(t, inst)
return inst, func() metricdata.ScopeMetrics {
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(t.Context(), &rm))
require.Len(t, rm.ScopeMetrics, 1)
return rm.ScopeMetrics[0]
}
}
func baseAttrs(err error) []attribute.KeyValue {
attrs := []attribute.KeyValue{
semconv.OTelComponentName(observ.ComponentName(ID)),
semconv.OTelComponentTypeOtlpHTTPMetricExporter,
semconv.ServerAddress(ServerAddr),
semconv.ServerPort(ServerPort),
}
if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}
return attrs
}
func set(err error) attribute.Set {
return attribute.NewSet(baseAttrs(err)...)
}
func metricInflight() metricdata.Metrics {
return metricdata.Metrics{
Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(),
Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: set(nil), Value: 0},
},
},
}
}
func metricExported(success, total int64, err error) metricdata.Metrics {
dp := []metricdata.DataPoint[int64]{
{Attributes: set(nil), Value: success},
}
if err != nil {
dp = append(dp, metricdata.DataPoint[int64]{
Attributes: set(err),
Value: total - success,
})
}
return metricdata.Metrics{
Name: otelconv.SDKExporterMetricDataPointExported{}.Name(),
Description: otelconv.SDKExporterMetricDataPointExported{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: dp,
},
}
}
func operationDuration(err error, statusCode int) metricdata.Metrics {
httpSet := func(err error, statusCode int) attribute.Set {
attrs := baseAttrs(err)
attrs = append(attrs, semconv.HTTPResponseStatusCode(statusCode))
return attribute.NewSet(attrs...)
}
return metricdata.Metrics{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: httpSet(err, statusCode)},
},
},
}
}
func assertMetrics(t *testing.T, got metricdata.ScopeMetrics, metrics, success int64, err error, statusCode int) {
t.Helper()
assert.Equal(t, Scope, got.Scope, "unexpected scope")
m := got.Metrics
require.Len(t, m, 3, "expected 3 metrics")
o := metricdatatest.IgnoreTimestamp()
want := metricInflight()
metricdatatest.AssertEqual(t, want, m[0], o)
want = metricExported(success, metrics, err)
metricdatatest.AssertEqual(t, want, m[1], o)
want = operationDuration(err, statusCode)
metricdatatest.AssertEqual(t, want, m[2], o, metricdatatest.IgnoreValue())
}
func makeResourceMetrics(n int) *metricpb.ResourceMetrics {
dpts := make([]*metricpb.NumberDataPoint, n)
for i := range n {
dpts[i] = &metricpb.NumberDataPoint{
Value: &metricpb.NumberDataPoint_AsInt{AsInt: 1},
}
}
return &metricpb.ResourceMetrics{
ScopeMetrics: []*metricpb.ScopeMetrics{
{
Metrics: []*metricpb.Metric{
{
Data: &metricpb.Metric_Gauge{
Gauge: &metricpb.Gauge{
DataPoints: dpts,
},
},
},
},
},
},
}
}
func TestInstrumentationExportMetrics(t *testing.T) {
tests := []struct {
name string
err error
statusCode int
numMetrics int
wantSuccess int64
}{
{
name: "Success",
err: nil,
statusCode: http.StatusOK,
numMetrics: 10,
wantSuccess: 10,
},
{
name: "AllErrored",
err: errors.New("http error"),
statusCode: http.StatusInternalServerError,
numMetrics: 10,
wantSuccess: 0,
},
{
name: "PartialErrored",
err: errors.Join(errors.New("partial failure"), &internal.PartialSuccess{RejectedItems: 5}),
statusCode: http.StatusOK,
numMetrics: 10,
wantSuccess: 5,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
inst, collect := setup(t)
inst.ExportMetrics(t.Context(), makeResourceMetrics(tt.numMetrics)).End(tt.err, tt.statusCode)
assertMetrics(t, collect(), int64(tt.numMetrics), tt.wantSuccess, tt.err, tt.statusCode)
})
}
}
func TestInstrumentationExportMetricsInvalidPartialErrored(t *testing.T) {
inst, collect := setup(t)
const n = 10
pErr := &internal.PartialSuccess{RejectedItems: -5}
err := errors.Join(errors.New("temporary"), pErr)
inst.ExportMetrics(t.Context(), makeResourceMetrics(n)).End(err, http.StatusServiceUnavailable)
// Round -5 to 0.
success := int64(n) // (n - 0)
assertMetrics(t, collect(), n, success, err, http.StatusServiceUnavailable)
// Note: the metrics are cumulative, so account for the previous
// ExportMetrics call.
pErr.RejectedItems = n + 5
inst.ExportMetrics(t.Context(), makeResourceMetrics(n)).End(err, http.StatusServiceUnavailable)
// Round n+5 to n.
success += 0 // success + (n - n)
assertMetrics(t, collect(), n+n, success, err, http.StatusServiceUnavailable)
}
func TestBaseAttrs(t *testing.T) {
tests := []struct {
endpoint string
host string
port int
}{
// Empty.
{endpoint: "", host: "", port: -1},
// Only a port.
{endpoint: ":4318", host: "", port: 4318},
// Hostname.
{endpoint: "localhost:4318", host: "localhost", port: 4318},
{endpoint: "localhost", host: "localhost", port: -1},
// IPv4 address.
{endpoint: "127.0.0.1:4318", host: "127.0.0.1", port: 4318},
{endpoint: "127.0.0.1", host: "127.0.0.1", port: -1},
// IPv6 address.
{endpoint: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "2001:db8:85a3:0:0:8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "2001:db8:85a3::8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "[2001:db8:85a3::8a2e:370:7334]", host: "2001:db8:85a3::8a2e:370:7334", port: -1},
{endpoint: "[::1]:9090", host: "::1", port: 9090},
// Port edge cases.
{endpoint: "example.com:0", host: "example.com", port: 0},
{endpoint: "example.com:65535", host: "example.com", port: 65535},
// Case insensitive.
{endpoint: "ExAmPlE.COM:8080", host: "ExAmPlE.COM", port: 8080},
}
for _, tt := range tests {
got := observ.BaseAttrs(ID, tt.endpoint)
want := []attribute.KeyValue{
semconv.OTelComponentName(observ.ComponentName(ID)),
semconv.OTelComponentTypeOtlpHTTPMetricExporter,
}
if tt.host != "" {
want = append(want, semconv.ServerAddress(tt.host))
}
if tt.port != -1 {
want = append(want, semconv.ServerPort(tt.port))
}
assert.Equal(t, want, got)
}
}
type logSink struct {
logr.LogSink
level int
msg string
keysAndValues []any
}
func (*logSink) Enabled(int) bool { return true }
func (l *logSink) Info(level int, msg string, keysAndValues ...any) {
l.level, l.msg, l.keysAndValues = level, msg, keysAndValues
l.LogSink.Info(level, msg, keysAndValues...)
}
func TestBaseAttrsError(t *testing.T) {
endpoints := []string{
"example.com:invalid", // Non-numeric port.
"example.com:8080:9090", // Multiple colons in port.
"example.com:99999", // Port out of range.
"example.com:-1", // Port out of range.
}
for _, endpoint := range endpoints {
l := &logSink{LogSink: testr.New(t).GetSink()}
t.Cleanup(func(orig logr.Logger) func() {
global.SetLogger(logr.New(l))
return func() { global.SetLogger(orig) }
}(global.GetLogger()))
// Set the logger as global so BaseAttrs can log the error.
got := observ.BaseAttrs(ID, endpoint)
want := []attribute.KeyValue{
semconv.OTelComponentName(observ.ComponentName(ID)),
semconv.OTelComponentTypeOtlpHTTPMetricExporter,
}
assert.Equal(t, want, got)
assert.Equal(t, 8, l.level, "expected Debug log level")
assert.Equal(t, "failed to parse endpoint", l.msg)
}
}
func BenchmarkInstrumentationExportMetrics(b *testing.B) {
setup := func(b *testing.B) *observ.Instrumentation {
b.Helper()
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
original := otel.GetMeterProvider()
b.Cleanup(func() { otel.SetMeterProvider(original) })
r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)
inst, err := observ.NewInstrumentation(ID, Endpoint)
if err != nil {
b.Fatalf("failed to create instrumentation: %v", err)
}
return inst
}
run := func(err error, statusCode int) func(*testing.B) {
return func(b *testing.B) {
inst := setup(b)
rm := makeResourceMetrics(10)
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
inst.ExportMetrics(b.Context(), rm).End(err, statusCode)
}
}
}
b.Run("NoError", run(nil, http.StatusOK))
err := &internal.PartialSuccess{RejectedItems: 6}
b.Run("PartialError", run(err, http.StatusOK))
b.Run("FullError", run(assert.AnError, http.StatusInternalServerError))
}
@@ -0,0 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal"
// Version is the current release version of the OpenTelemetry OTLP HTTP metric
// exporter in use.
const Version = "1.43.0"
@@ -0,0 +1,36 @@
# Experimental Features
The `otlpmetrichttp` exporter contains features that have not yet stabilized in the OpenTelemetry specification.
These features are added to the `otlpmetrichttp` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback.
These features may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.
## Features
- [Observability](#observability)
### Observability
The `otlpmetrichttp` exporter can be configured to provide observability about itself using OpenTelemetry metrics.
To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`.
When enabled, the exporter will create the following metrics using the global `MeterProvider`:
- `otel.sdk.exporter.metric_data_point.inflight`
- `otel.sdk.exporter.metric_data_point.exported`
- `otel.sdk.exporter.operation.duration`
Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics.
[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.40.0/docs/otel/sdk-metrics.md
## Compatibility and Stability
Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.
When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version.
If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support.
@@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x"
import "strings"
// Observability is an experimental feature flag that determines if exporter
// observability metrics are enabled.
//
// To enable this feature set the OTEL_GO_X_OBSERVABILITY environment variable
// to the case-insensitive string value of "true" (i.e. "True" and "TRUE"
// will also enable this).
var Observability = newFeature(
[]string{"OBSERVABILITY"},
func(v string) (string, bool) {
if strings.EqualFold(v, "true") {
return v, true
}
return "", false
},
)
@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestObservability(t *testing.T) {
const key = "OTEL_GO_X_OBSERVABILITY"
require.Contains(t, Observability.Keys(), key)
t.Run("100", run(setenv(key, "100"), assertDisabled(Observability)))
t.Run("true", run(setenv(key, "true"), assertEnabled(Observability, "true")))
t.Run("True", run(setenv(key, "True"), assertEnabled(Observability, "True")))
t.Run("false", run(setenv(key, "false"), assertDisabled(Observability)))
t.Run("empty", run(assertDisabled(Observability)))
}
@@ -0,0 +1,58 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/x/x.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x].
package x // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x"
import (
"os"
)
// Feature is an experimental feature control flag. It provides a uniform way
// to interact with these feature flags and parse their values.
type Feature[T any] struct {
keys []string
parse func(v string) (T, bool)
}
func newFeature[T any](suffix []string, parse func(string) (T, bool)) Feature[T] {
const envKeyRoot = "OTEL_GO_X_"
keys := make([]string, 0, len(suffix))
for _, s := range suffix {
keys = append(keys, envKeyRoot+s)
}
return Feature[T]{
keys: keys,
parse: parse,
}
}
// Keys returns the environment variable keys that can be set to enable the
// feature.
func (f Feature[T]) Keys() []string { return f.keys }
// Lookup returns the user configured value for the feature and true if the
// user has enabled the feature. Otherwise, if the feature is not enabled, a
// zero-value and false are returned.
func (f Feature[T]) Lookup() (v T, ok bool) {
// https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value
//
// > The SDK MUST interpret an empty value of an environment variable the
// > same way as when the variable is unset.
for _, key := range f.keys {
vRaw := os.Getenv(key)
if vRaw != "" {
return f.parse(vRaw)
}
}
return v, ok
}
// Enabled reports whether the feature is enabled.
func (f Feature[T]) Enabled() bool {
_, ok := f.Lookup()
return ok
}
@@ -0,0 +1,75 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/x/x_test.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
mockKey = "OTEL_GO_X_MOCK_FEATURE"
mockKey2 = "OTEL_GO_X_MOCK_FEATURE2"
)
var mockFeature = newFeature([]string{"MOCK_FEATURE", "MOCK_FEATURE2"}, func(v string) (string, bool) {
if strings.EqualFold(v, "true") {
return v, true
}
return "", false
})
func TestFeature(t *testing.T) {
require.Contains(t, mockFeature.Keys(), mockKey)
require.Contains(t, mockFeature.Keys(), mockKey2)
t.Run("100", run(setenv(mockKey, "100"), assertDisabled(mockFeature)))
t.Run("true", run(setenv(mockKey, "true"), assertEnabled(mockFeature, "true")))
t.Run("True", run(setenv(mockKey, "True"), assertEnabled(mockFeature, "True")))
t.Run("false", run(setenv(mockKey, "false"), assertDisabled(mockFeature)))
t.Run("empty", run(assertDisabled(mockFeature)))
}
func run(steps ...func(*testing.T)) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
for _, step := range steps {
step(t)
}
}
}
func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function.
return func(t *testing.T) { t.Setenv(k, v) }
}
func assertEnabled[T any](f Feature[T], want T) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
assert.True(t, f.Enabled(), "not enabled")
v, ok := f.Lookup()
assert.True(t, ok, "Lookup state")
assert.Equal(t, want, v, "Lookup value")
}
}
func assertDisabled[T any](f Feature[T]) func(*testing.T) {
var zero T
return func(t *testing.T) {
t.Helper()
assert.False(t, f.Enabled(), "enabled")
v, ok := f.Lookup()
assert.False(t, ok, "Lookup state")
assert.Equal(t, zero, v, "Lookup value")
}
}
@@ -167,7 +167,7 @@ func NewInstrumentation(id int64, endpoint string) (*Instrumentation, error) {
// to set the "component.name" attribute.
//
// The endpoint is the HTTP endpoint the exporter is exporting to. It should be
// in the format "host:port" or a full URL.
// in the format "host[:port]".
func BaseAttrs(id int64, endpoint string) []attribute.KeyValue {
host, port, err := parseEndpoint(endpoint)
if err != nil || (host == "" && port < 0) {
+3
View File
@@ -56,6 +56,9 @@ modules:
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc:
version-refs:
- ./internal/version.go
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp:
version-refs:
- ./internal/version.go
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc:
version-refs:
- ./internal/version.go