1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-08 23:21:56 +02:00

feat(prometheus): Add observability for prometheus exporter (#7345)

fix #7013 

References: 

- [Follow
guidelines](a5dcd68ebb/CONTRIBUTING.md (encapsulation)).
- PR: #7307 

Implement following self-observability metrics from
https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md
for https://pkg.go.dev/go.opentelemetry.io/otel/exporters/prometheus:

- otel.sdk.exporter.metric_data_point.inflight
- otel.sdk.exporter.metric_data_point.exported
- otel.sdk.exporter.operation.duration
- otel.sdk.metric_reader.collection.duration

### Benchmarks

```console
➜ benchstat /tmp/bench_disabled.txt /tmp/bench_enabled.txt
goos: darwin
goarch: arm64
pkg: go.opentelemetry.io/otel/exporters/prometheus/internal/observ
cpu: Apple M1 Max
                                           │ /tmp/bench_disabled.txt │       /tmp/bench_enabled.txt       │
                                           │         sec/op          │   sec/op     vs base               │
InstrumentationExportMetrics-10                          177.5n ± 0%   177.8n ± 0%  +0.14% (p=0.039 n=20)
InstrumentationRecordOperationDuration-10                246.6n ± 0%   246.7n ± 0%       ~ (p=0.606 n=20)
InstrumentationRecordCollectionDuration-10               246.8n ± 1%   247.2n ± 0%       ~ (p=0.456 n=20)
geomean                                                  221.1n        221.3n       +0.09%

                                           │ /tmp/bench_disabled.txt │       /tmp/bench_enabled.txt        │
                                           │          B/op           │    B/op     vs base                 │
InstrumentationExportMetrics-10                           256.0 ± 0%   256.0 ± 0%       ~ (p=1.000 n=20) ¹
InstrumentationRecordOperationDuration-10                 272.0 ± 0%   272.0 ± 0%       ~ (p=1.000 n=20) ¹
InstrumentationRecordCollectionDuration-10                272.0 ± 0%   272.0 ± 0%       ~ (p=1.000 n=20) ¹
geomean                                                   266.6        266.6       +0.00%
¹ all samples are equal

                                           │ /tmp/bench_disabled.txt │       /tmp/bench_enabled.txt        │
                                           │        allocs/op        │ allocs/op   vs base                 │
InstrumentationExportMetrics-10                           3.000 ± 0%   3.000 ± 0%       ~ (p=1.000 n=20) ¹
InstrumentationRecordOperationDuration-10                 3.000 ± 0%   3.000 ± 0%       ~ (p=1.000 n=20) ¹
InstrumentationRecordCollectionDuration-10                3.000 ± 0%   3.000 ± 0%       ~ (p=1.000 n=20) ¹
geomean                                                   3.000        3.000       +0.00%
¹ all samples are equal
```

---------

Co-authored-by: Flc゛ <four_leaf_clover@foxmail.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
Robert Wu
2025-09-18 10:52:54 -04:00
committed by GitHub
parent d1ddddeec0
commit 60f9f39d78
13 changed files with 1544 additions and 56 deletions

View File

@@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `WithInstrumentationAttributeSet` option to `go.opentelemetry.io/otel/log`, `go.opentelemetry.io/otel/metric`, and `go.opentelemetry.io/otel/trace` packages.
This provides a concurrent-safe and performant alternative to `WithInstrumentationAttributes` by accepting a pre-constructed `attribute.Set`. (#7287)
- Greatly reduce the cost of recording metrics in `go.opentelemetry.io/otel/sdk/metric` using hashing for map keys. (#7175)
- Add experimental observability for the prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus`.
Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345)
### Fixed

View File

@@ -20,6 +20,8 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus/internal/counter"
"go.opentelemetry.io/otel/exporters/prometheus/internal/observ"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
@@ -92,6 +94,8 @@ type collector struct {
metricNamer otlptranslator.MetricNamer
labelNamer otlptranslator.LabelNamer
unitNamer otlptranslator.UnitNamer
inst *observ.Instrumentation
}
// New returns a Prometheus Exporter.
@@ -137,7 +141,10 @@ func New(opts ...Option) (*Exporter, error) {
Reader: reader,
}
return e, nil
var err error
collector.inst, err = observ.NewInstrumentation(counter.NextExporterID())
return e, err
}
// Describe implements prometheus.Collector.
@@ -153,9 +160,26 @@ func (*collector) Describe(chan<- *prometheus.Desc) {
//
// This method is safe to call concurrently.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
var err error
// Blocked by this issue: Propagate context.Context through Gather and Collect (#1538)
// https://github.com/prometheus/client_golang/issues/1538.
ctx := context.TODO()
if c.inst != nil {
endOp := c.inst.RecordOperationDuration(ctx)
defer func() { endOp(err) }()
}
metrics := metricsPool.Get().(*metricdata.ResourceMetrics)
defer metricsPool.Put(metrics)
err := c.reader.Collect(context.TODO(), metrics)
endCollection := func(error) {}
if c.inst != nil {
endCollection = c.inst.RecordCollectionDuration(ctx)
}
err = c.reader.Collect(ctx, metrics)
endCollection(err)
if err != nil {
if errors.Is(err, metric.ErrReaderShutdown) {
return
@@ -174,15 +198,16 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
defer c.mu.Unlock()
if c.targetInfo == nil && !c.disableTargetInfo {
targetInfo, err := c.createInfoMetric(
targetInfo, e := c.createInfoMetric(
otlptranslator.TargetInfoMetricName,
targetInfoDescription,
metrics.Resource,
)
if err != nil {
if e != nil {
// If the target info metric is invalid, disable sending it.
c.disableTargetInfo = true
otel.Handle(err)
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to createInfoMetric: %w", e))
return
}
@@ -195,14 +220,15 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}
if c.resourceAttributesFilter != nil && len(c.resourceKeyVals.keys) == 0 {
err := c.createResourceAttributes(metrics.Resource)
if err != nil {
otel.Handle(err)
e := c.createResourceAttributes(metrics.Resource)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to createResourceAttributes: %w", e))
return
}
}
for _, scopeMetrics := range metrics.ScopeMetrics {
for j, scopeMetrics := range metrics.ScopeMetrics {
n := len(c.resourceKeyVals.keys) + 2 // resource attrs + scope name + scope version
kv := keyVals{
keys: make([]string, 0, n),
@@ -213,9 +239,10 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
kv.keys = append(kv.keys, scopeNameLabel, scopeVersionLabel, scopeSchemaLabel)
kv.vals = append(kv.vals, scopeMetrics.Scope.Name, scopeMetrics.Scope.Version, scopeMetrics.Scope.SchemaURL)
attrKeys, attrVals, err := getAttrs(scopeMetrics.Scope.Attributes, c.labelNamer)
if err != nil {
otel.Handle(err)
attrKeys, attrVals, e := getAttrs(scopeMetrics.Scope.Attributes, c.labelNamer)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to getAttrs for ScopeMetrics %d: %w", j, e))
continue
}
for i := range attrKeys {
@@ -228,16 +255,17 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
kv.keys = append(kv.keys, c.resourceKeyVals.keys...)
kv.vals = append(kv.vals, c.resourceKeyVals.vals...)
for _, m := range scopeMetrics.Metrics {
for k, m := range scopeMetrics.Metrics {
typ := c.metricType(m)
if typ == nil {
continue
}
name, err := c.getName(m)
if err != nil {
name, e := c.getName(m)
if e != nil {
// TODO(#7066): Handle this error better. It's not clear this can be
// reached, bad metric names should / will be caught at creation time.
otel.Handle(err)
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to getAttrs for ScopeMetrics %d, Metrics %d: %w", j, k, e))
continue
}
@@ -252,21 +280,21 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
switch v := m.Data.(type) {
case metricdata.Histogram[int64]:
addHistogramMetric(ch, v, m, name, kv, c.labelNamer)
addHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
case metricdata.Histogram[float64]:
addHistogramMetric(ch, v, m, name, kv, c.labelNamer)
addHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
case metricdata.ExponentialHistogram[int64]:
addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer)
addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
case metricdata.ExponentialHistogram[float64]:
addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer)
addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
case metricdata.Sum[int64]:
addSumMetric(ch, v, m, name, kv, c.labelNamer)
addSumMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
case metricdata.Sum[float64]:
addSumMetric(ch, v, m, name, kv, c.labelNamer)
addSumMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
case metricdata.Gauge[int64]:
addGaugeMetric(ch, v, m, name, kv, c.labelNamer)
addGaugeMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
case metricdata.Gauge[float64]:
addGaugeMetric(ch, v, m, name, kv, c.labelNamer)
addGaugeMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx)
}
}
}
@@ -332,11 +360,21 @@ func addExponentialHistogramMetric[N int64 | float64](
name string,
kv keyVals,
labelNamer otlptranslator.LabelNamer,
inst *observ.Instrumentation,
ctx context.Context,
) {
for _, dp := range histogram.DataPoints {
keys, values, err := getAttrs(dp.Attributes, labelNamer)
if err != nil {
otel.Handle(err)
var err error
var success int64
if inst != nil {
end := inst.ExportMetrics(ctx, int64(len(histogram.DataPoints)))
defer func() { end(success, err) }()
}
for j, dp := range histogram.DataPoints {
keys, values, e := getAttrs(dp.Attributes, labelNamer)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to getAttrs for histogram.DataPoints %d: %w", j, e))
continue
}
keys = append(keys, kv.keys...)
@@ -348,9 +386,12 @@ func addExponentialHistogramMetric[N int64 | float64](
scale := dp.Scale
if scale < -4 {
// Reject scales below -4 as they cannot be represented in Prometheus
otel.Handle(fmt.Errorf(
e := fmt.Errorf(
"exponential histogram scale %d is below minimum supported scale -4, skipping data point",
scale))
scale,
)
otel.Handle(e)
err = errors.Join(err, e)
continue
}
@@ -368,7 +409,9 @@ func addExponentialHistogramMetric[N int64 | float64](
positiveBuckets := make(map[int]int64)
for i, c := range positiveBucket.Counts {
if c > math.MaxInt64 {
otel.Handle(fmt.Errorf("positive count %d is too large to be represented as int64", c))
e := fmt.Errorf("positive count %d is too large to be represented as int64", c)
otel.Handle(e)
err = errors.Join(err, e)
continue
}
positiveBuckets[int(positiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
@@ -377,13 +420,15 @@ func addExponentialHistogramMetric[N int64 | float64](
negativeBuckets := make(map[int]int64)
for i, c := range negativeBucket.Counts {
if c > math.MaxInt64 {
otel.Handle(fmt.Errorf("negative count %d is too large to be represented as int64", c))
e := fmt.Errorf("negative count %d is too large to be represented as int64", c)
otel.Handle(e)
err = errors.Join(err, e)
continue
}
negativeBuckets[int(negativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
}
m, err := prometheus.NewConstNativeHistogram(
m, e := prometheus.NewConstNativeHistogram(
desc,
dp.Count,
float64(dp.Sum),
@@ -394,12 +439,18 @@ func addExponentialHistogramMetric[N int64 | float64](
dp.ZeroThreshold,
dp.StartTime,
values...)
if err != nil {
otel.Handle(err)
if e != nil {
otel.Handle(e)
err = errors.Join(
err,
fmt.Errorf("failed to NewConstNativeHistogram for histogram.DataPoints %d: %w", j, e),
)
continue
}
m = addExemplars(m, dp.Exemplars, labelNamer)
ch <- m
success++
}
}
@@ -410,11 +461,21 @@ func addHistogramMetric[N int64 | float64](
name string,
kv keyVals,
labelNamer otlptranslator.LabelNamer,
inst *observ.Instrumentation,
ctx context.Context,
) {
for _, dp := range histogram.DataPoints {
keys, values, err := getAttrs(dp.Attributes, labelNamer)
if err != nil {
otel.Handle(err)
var err error
var success int64
if inst != nil {
end := inst.ExportMetrics(ctx, int64(len(histogram.DataPoints)))
defer func() { end(success, err) }()
}
for j, dp := range histogram.DataPoints {
keys, values, e := getAttrs(dp.Attributes, labelNamer)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to getAttrs for histogram.DataPoints %d: %w", j, e))
continue
}
keys = append(keys, kv.keys...)
@@ -428,13 +489,16 @@ func addHistogramMetric[N int64 | float64](
cumulativeCount += dp.BucketCounts[i]
buckets[bound] = cumulativeCount
}
m, err := prometheus.NewConstHistogram(desc, dp.Count, float64(dp.Sum), buckets, values...)
if err != nil {
otel.Handle(err)
m, e := prometheus.NewConstHistogram(desc, dp.Count, float64(dp.Sum), buckets, values...)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to NewConstMetric for histogram.DataPoints %d: %w", j, e))
continue
}
m = addExemplars(m, dp.Exemplars, labelNamer)
ch <- m
success++
}
}
@@ -445,25 +509,36 @@ func addSumMetric[N int64 | float64](
name string,
kv keyVals,
labelNamer otlptranslator.LabelNamer,
inst *observ.Instrumentation,
ctx context.Context,
) {
var err error
var success int64
if inst != nil {
end := inst.ExportMetrics(ctx, int64(len(sum.DataPoints)))
defer func() { end(success, err) }()
}
valueType := prometheus.CounterValue
if !sum.IsMonotonic {
valueType = prometheus.GaugeValue
}
for _, dp := range sum.DataPoints {
keys, values, err := getAttrs(dp.Attributes, labelNamer)
if err != nil {
otel.Handle(err)
for i, dp := range sum.DataPoints {
keys, values, e := getAttrs(dp.Attributes, labelNamer)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to getAttrs for sum.DataPoints %d: %w", i, e))
continue
}
keys = append(keys, kv.keys...)
values = append(values, kv.vals...)
desc := prometheus.NewDesc(name, m.Description, keys, nil)
m, err := prometheus.NewConstMetric(desc, valueType, float64(dp.Value), values...)
if err != nil {
otel.Handle(err)
m, e := prometheus.NewConstMetric(desc, valueType, float64(dp.Value), values...)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to NewConstMetric for sum.DataPoints %d: %w", i, e))
continue
}
// GaugeValues don't support Exemplars at this time
@@ -472,6 +547,8 @@ func addSumMetric[N int64 | float64](
m = addExemplars(m, dp.Exemplars, labelNamer)
}
ch <- m
success++
}
}
@@ -482,23 +559,36 @@ func addGaugeMetric[N int64 | float64](
name string,
kv keyVals,
labelNamer otlptranslator.LabelNamer,
inst *observ.Instrumentation,
ctx context.Context,
) {
for _, dp := range gauge.DataPoints {
keys, values, err := getAttrs(dp.Attributes, labelNamer)
if err != nil {
otel.Handle(err)
var err error
var success int64
if inst != nil {
end := inst.ExportMetrics(ctx, int64(len(gauge.DataPoints)))
defer func() { end(success, err) }()
}
for i, dp := range gauge.DataPoints {
keys, values, e := getAttrs(dp.Attributes, labelNamer)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to getAttrs for gauge.DataPoints %d: %w", i, e))
continue
}
keys = append(keys, kv.keys...)
values = append(values, kv.vals...)
desc := prometheus.NewDesc(name, m.Description, keys, nil)
m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(dp.Value), values...)
if err != nil {
otel.Handle(err)
m, e := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(dp.Value), values...)
if e != nil {
otel.Handle(e)
err = errors.Join(err, fmt.Errorf("failed to NewConstMetric for gauge.DataPoints %d: %w", i, e))
continue
}
ch <- m
success++
}
}

View File

@@ -22,7 +22,9 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus/internal/observ"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
@@ -1349,6 +1351,8 @@ func TestExponentialHistogramScaleValidation(t *testing.T) {
"test_histogram",
keyVals{},
otlptranslator.LabelNamer{},
nil,
context.Background(),
)
assert.Error(t, capturedError)
assert.Contains(t, capturedError.Error(), "scale -5 is below minimum")
@@ -1513,6 +1517,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) {
"test_high_scale_histogram",
keyVals{},
otlptranslator.LabelNamer{},
nil,
context.Background(),
)
// Verify a metric was produced
@@ -1575,6 +1581,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) {
"test_very_high_scale_histogram",
keyVals{},
otlptranslator.LabelNamer{},
nil,
context.Background(),
)
// Verify a metric was produced
@@ -1637,6 +1645,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) {
"test_histogram_with_negative_buckets",
keyVals{},
otlptranslator.LabelNamer{},
nil,
context.Background(),
)
// Verify a metric was produced
@@ -1693,6 +1703,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) {
"test_int64_exponential_histogram",
keyVals{},
otlptranslator.LabelNamer{},
nil,
context.Background(),
)
// Verify a metric was produced
@@ -1895,3 +1907,544 @@ func TestEscapingErrorHandling(t *testing.T) {
})
}
}
func TestExporterSelfInstrumentation(t *testing.T) {
testCases := []struct {
name string
enableObservability bool
recordMetrics func(ctx context.Context, meter otelmetric.Meter)
expectedObservMetrics []string
expectedMainMetrics int
checkMetrics func(t *testing.T, mainMetrics []*dto.MetricFamily, observMetrics metricdata.ScopeMetrics)
}{
{
name: "self instrumentation disabled",
enableObservability: false,
recordMetrics: func(ctx context.Context, meter otelmetric.Meter) {
counter, err := meter.Int64Counter("test_counter")
require.NoError(t, err)
counter.Add(ctx, 1)
},
expectedObservMetrics: []string{}, // No observability metrics expected
expectedMainMetrics: 2, // test counter + target_info
},
{
name: "self instrumentation enabled with counter",
enableObservability: true,
recordMetrics: func(ctx context.Context, meter otelmetric.Meter) {
counter, err := meter.Int64Counter("test_counter", otelmetric.WithDescription("test counter"))
require.NoError(t, err)
counter.Add(ctx, 1, otelmetric.WithAttributes(attribute.String("key", "value")))
},
expectedObservMetrics: []string{
"otel.sdk.exporter.metric_data_point.inflight",
"otel.sdk.exporter.metric_data_point.exported",
"otel.sdk.exporter.operation.duration",
"otel.sdk.metric_reader.collection.duration",
},
expectedMainMetrics: 2, // test counter + target_info
checkMetrics: func(t *testing.T, _ []*dto.MetricFamily, observMetrics metricdata.ScopeMetrics) {
// Check that exported metrics include success count
for _, m := range observMetrics.Metrics {
if m.Name == "otel.sdk.exporter.metric_data_point.exported" {
sum, ok := m.Data.(metricdata.Sum[int64])
require.True(t, ok)
require.Len(t, sum.DataPoints, 1)
require.Equal(t, int64(1), sum.DataPoints[0].Value)
}
}
},
},
{
name: "self instrumentation enabled with gauge",
enableObservability: true,
recordMetrics: func(ctx context.Context, meter otelmetric.Meter) {
gauge, err := meter.Float64Gauge("test_gauge", otelmetric.WithDescription("test gauge"))
require.NoError(t, err)
gauge.Record(ctx, 42.5, otelmetric.WithAttributes(attribute.String("key", "value")))
},
expectedObservMetrics: []string{
"otel.sdk.exporter.metric_data_point.inflight",
"otel.sdk.exporter.metric_data_point.exported",
"otel.sdk.exporter.operation.duration",
"otel.sdk.metric_reader.collection.duration",
},
expectedMainMetrics: 2,
},
{
name: "self instrumentation enabled with histogram",
enableObservability: true,
recordMetrics: func(ctx context.Context, meter otelmetric.Meter) {
histogram, err := meter.Float64Histogram("test_histogram", otelmetric.WithDescription("test histogram"))
require.NoError(t, err)
histogram.Record(ctx, 1.5, otelmetric.WithAttributes(attribute.String("key", "value")))
histogram.Record(ctx, 2.5, otelmetric.WithAttributes(attribute.String("key", "value")))
},
expectedObservMetrics: []string{
"otel.sdk.exporter.metric_data_point.inflight",
"otel.sdk.exporter.metric_data_point.exported",
"otel.sdk.exporter.operation.duration",
"otel.sdk.metric_reader.collection.duration",
},
expectedMainMetrics: 2,
},
{
name: "self instrumentation with multiple metrics",
enableObservability: true,
recordMetrics: func(ctx context.Context, meter otelmetric.Meter) {
counter, err := meter.Int64Counter("test_counter")
require.NoError(t, err)
counter.Add(ctx, 5, otelmetric.WithAttributes(attribute.String("type", "requests")))
counter.Add(ctx, 3, otelmetric.WithAttributes(attribute.String("type", "errors")))
gauge, err := meter.Float64Gauge("test_gauge")
require.NoError(t, err)
gauge.Record(ctx, 100.0, otelmetric.WithAttributes(attribute.String("status", "active")))
histogram, err := meter.Float64Histogram("test_histogram")
require.NoError(t, err)
histogram.Record(ctx, 0.1)
histogram.Record(ctx, 0.2)
histogram.Record(ctx, 0.3)
},
expectedObservMetrics: []string{
"otel.sdk.exporter.metric_data_point.inflight",
"otel.sdk.exporter.metric_data_point.exported",
"otel.sdk.exporter.operation.duration",
"otel.sdk.metric_reader.collection.duration",
},
expectedMainMetrics: 4, // 3 test metrics + target_info
checkMetrics: func(t *testing.T, _ []*dto.MetricFamily, observMetrics metricdata.ScopeMetrics) {
// Check that exported metrics track multiple data points
for _, m := range observMetrics.Metrics {
if m.Name == "otel.sdk.exporter.metric_data_point.exported" {
sum, ok := m.Data.(metricdata.Sum[int64])
require.True(t, ok)
require.Len(t, sum.DataPoints, 1)
// Counter: 2 data points, Gauge: 1 data point, Histogram: 1 data point = 4 total
require.Equal(t, int64(4), sum.DataPoints[0].Value)
}
}
},
},
{
name: "self instrumentation enabled with up-down counter",
enableObservability: true,
recordMetrics: func(ctx context.Context, meter otelmetric.Meter) {
upDownCounter, err := meter.Int64UpDownCounter(
"test_updown_counter",
otelmetric.WithDescription("test up-down counter"),
)
require.NoError(t, err)
upDownCounter.Add(ctx, 10, otelmetric.WithAttributes(attribute.String("direction", "up")))
upDownCounter.Add(ctx, -5, otelmetric.WithAttributes(attribute.String("direction", "down")))
},
expectedObservMetrics: []string{
"otel.sdk.exporter.metric_data_point.inflight",
"otel.sdk.exporter.metric_data_point.exported",
"otel.sdk.exporter.operation.duration",
"otel.sdk.metric_reader.collection.duration",
},
expectedMainMetrics: 2,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.enableObservability {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
} else {
t.Setenv("OTEL_GO_X_OBSERVABILITY", "")
}
// Setup observability metric collection
var observReader *metric.ManualReader
var observMetricsFunc func() metricdata.ScopeMetrics
if tc.enableObservability {
originalMP := otel.GetMeterProvider()
defer otel.SetMeterProvider(originalMP)
observReader = metric.NewManualReader()
observMP := metric.NewMeterProvider(metric.WithReader(observReader))
otel.SetMeterProvider(observMP)
observMetricsFunc = func() metricdata.ScopeMetrics {
var rm metricdata.ResourceMetrics
err := observReader.Collect(context.Background(), &rm)
require.NoError(t, err)
if len(rm.ScopeMetrics) == 0 {
return metricdata.ScopeMetrics{}
}
return rm.ScopeMetrics[0]
}
}
ctx := context.Background()
registry := prometheus.NewRegistry()
exporter, err := New(WithRegisterer(registry))
require.NoError(t, err)
provider := metric.NewMeterProvider(metric.WithReader(exporter))
meter := provider.Meter("test", otelmetric.WithInstrumentationVersion("v1.0.0"))
// Record the test metrics
tc.recordMetrics(ctx, meter)
// Collect main metrics
mainMetrics, err := registry.Gather()
require.NoError(t, err)
// Verify the number of main metric families
assert.Len(t, mainMetrics, tc.expectedMainMetrics)
// Collect and check observability metrics if enabled
if tc.enableObservability {
observMetrics := observMetricsFunc()
// Check that expected observability metrics are present
observedMetrics := make(map[string]bool)
for _, m := range observMetrics.Metrics {
observedMetrics[m.Name] = true
}
for _, expectedMetric := range tc.expectedObservMetrics {
assert.True(
t,
observedMetrics[expectedMetric],
"Expected observability metric %s not found",
expectedMetric,
)
}
// Verify observability metrics have expected structure
expectedScope := instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: observ.SchemaURL,
}
assert.Equal(t, expectedScope, observMetrics.Scope, "Expected observability scope")
assert.Len(
t,
observMetrics.Metrics,
len(tc.expectedObservMetrics),
"Expected number of observability metrics",
)
// Run custom metric checks if provided
if tc.checkMetrics != nil {
tc.checkMetrics(t, mainMetrics, observMetrics)
}
}
})
}
}
func TestExporterSelfInstrumentationErrors(t *testing.T) {
testCases := []struct {
name string
setupError func() (metric.Reader, func())
expectedMinMetrics int // Minimum expected metrics in error scenarios
checkErrorAttributes func(t *testing.T, observMetrics metricdata.ScopeMetrics)
}{
{
name: "reader shutdown error",
setupError: func() (metric.Reader, func()) {
reader := metric.NewManualReader()
return reader, func() { _ = reader.Shutdown(context.Background()) }
},
expectedMinMetrics: 1, // At least some metrics should be present
},
{
name: "reader not registered error",
setupError: func() (metric.Reader, func()) {
reader := metric.NewManualReader()
// Don't register the reader with a provider
return reader, func() {}
},
expectedMinMetrics: 1, // At least some metrics should be present
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Enable observability
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
// Setup observability metric collection
originalMP := otel.GetMeterProvider()
defer otel.SetMeterProvider(originalMP)
observReader := metric.NewManualReader()
observMP := metric.NewMeterProvider(metric.WithReader(observReader))
otel.SetMeterProvider(observMP)
registry := prometheus.NewRegistry()
reader, cleanup := tc.setupError()
defer cleanup()
// Create exporter with the error-prone reader
cfg := newConfig()
cfg.registerer = registry
collector := &collector{
reader: reader,
disableTargetInfo: cfg.disableTargetInfo,
withoutUnits: cfg.withoutUnits,
withoutCounterSuffixes: cfg.withoutCounterSuffixes,
disableScopeInfo: cfg.disableScopeInfo,
metricFamilies: make(map[string]*dto.MetricFamily),
namespace: cfg.namespace,
resourceAttributesFilter: cfg.resourceAttributesFilter,
metricNamer: otlptranslator.NewMetricNamer(cfg.namespace, cfg.translationStrategy),
}
var err error
collector.inst, err = observ.NewInstrumentation(0)
require.NoError(t, err)
err = registry.Register(collector)
require.NoError(t, err)
// Trigger collection which should encounter the error
_, err = registry.Gather()
require.NoError(t, err)
// Collect observability metrics
var observMetrics metricdata.ResourceMetrics
err = observReader.Collect(context.Background(), &observMetrics)
require.NoError(t, err)
if len(observMetrics.ScopeMetrics) > 0 {
// Verify observability metrics are still present (at least some)
scopeMetrics := observMetrics.ScopeMetrics[0]
foundObservMetrics := 0
for _, m := range scopeMetrics.Metrics {
switch m.Name {
case "otel.sdk.exporter.metric_data_point.inflight",
"otel.sdk.exporter.metric_data_point.exported",
"otel.sdk.exporter.operation.duration",
"otel.sdk.metric_reader.collection.duration":
foundObservMetrics++
}
}
assert.GreaterOrEqual(
t,
foundObservMetrics,
tc.expectedMinMetrics,
"Should have at least some observability metrics even with errors",
)
if tc.checkErrorAttributes != nil {
tc.checkErrorAttributes(t, scopeMetrics)
}
}
})
}
}
func TestExporterSelfInstrumentationConcurrency(t *testing.T) {
// Enable observability
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
// Setup observability metric collection
originalMP := otel.GetMeterProvider()
defer otel.SetMeterProvider(originalMP)
observReader := metric.NewManualReader()
observMP := metric.NewMeterProvider(metric.WithReader(observReader))
otel.SetMeterProvider(observMP)
ctx := context.Background()
registry := prometheus.NewRegistry()
exporter, err := New(WithRegisterer(registry))
require.NoError(t, err)
provider := metric.NewMeterProvider(metric.WithReader(exporter))
meter := provider.Meter("concurrent_test", otelmetric.WithInstrumentationVersion("v1.0.0"))
counter, err := meter.Int64Counter("concurrent_counter")
require.NoError(t, err)
// Run concurrent operations
const numGoroutines = 10
const numOperations = 100
var wg sync.WaitGroup
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 0; j < numOperations; j++ {
counter.Add(ctx, 1, otelmetric.WithAttributes(attribute.Int("goroutine", id)))
// Occasionally trigger collection
if j%10 == 0 {
_, _ = registry.Gather()
}
}
}(i)
}
wg.Wait()
// Final collection
_, err = registry.Gather()
require.NoError(t, err)
// Collect observability metrics
var observMetrics metricdata.ResourceMetrics
err = observReader.Collect(context.Background(), &observMetrics)
require.NoError(t, err)
if len(observMetrics.ScopeMetrics) > 0 {
scopeMetrics := observMetrics.ScopeMetrics[0]
// Verify observability metrics are present and have reasonable values
for _, m := range scopeMetrics.Metrics {
switch m.Name {
case "otel.sdk.exporter.metric_data_point.exported":
sum, ok := m.Data.(metricdata.Sum[int64])
require.True(t, ok)
require.NotEmpty(t, sum.DataPoints)
// Should have exported many data points
assert.Positive(t, sum.DataPoints[0].Value)
case "otel.sdk.exporter.operation.duration":
hist, ok := m.Data.(metricdata.Histogram[float64])
require.True(t, ok)
require.NotEmpty(t, hist.DataPoints)
// Should have recorded operation durations
assert.Positive(t, hist.DataPoints[0].Count)
case "otel.sdk.metric_reader.collection.duration":
hist, ok := m.Data.(metricdata.Histogram[float64])
require.True(t, ok)
require.NotEmpty(t, hist.DataPoints)
// Should have recorded collection durations
assert.Positive(t, hist.DataPoints[0].Count)
}
}
}
}
func TestExporterSelfInstrumentationExemplarHandling(t *testing.T) {
// Enable observability
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
// Setup observability metric collection
originalMP := otel.GetMeterProvider()
defer otel.SetMeterProvider(originalMP)
observReader := metric.NewManualReader()
observMP := metric.NewMeterProvider(metric.WithReader(observReader))
otel.SetMeterProvider(observMP)
ctx := context.Background()
registry := prometheus.NewRegistry()
exporter, err := New(WithRegisterer(registry))
require.NoError(t, err)
provider := metric.NewMeterProvider(metric.WithReader(exporter))
meter := provider.Meter("exemplar_test", otelmetric.WithInstrumentationVersion("v1.0.0"))
// Create trace context for exemplars
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
SpanID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8},
TraceFlags: trace.FlagsSampled,
})
ctx = trace.ContextWithSpanContext(ctx, sc)
histogram, err := meter.Float64Histogram("test_histogram_with_exemplars")
require.NoError(t, err)
// Record values that should generate exemplars
histogram.Record(ctx, 1.0, otelmetric.WithAttributes(attribute.String("key", "value1")))
histogram.Record(ctx, 2.0, otelmetric.WithAttributes(attribute.String("key", "value2")))
// Collect metrics
got, err := registry.Gather()
require.NoError(t, err)
// Verify that metrics are collected without errors even when exemplars are present
foundTestHistogram := false
for _, mf := range got {
if *mf.Name == "test_histogram_with_exemplars" {
foundTestHistogram = true
assert.Equal(t, dto.MetricType_HISTOGRAM, *mf.Type)
}
}
assert.True(t, foundTestHistogram, "Test histogram should be present")
// Collect observability metrics
var observMetrics metricdata.ResourceMetrics
err = observReader.Collect(context.Background(), &observMetrics)
require.NoError(t, err)
if len(observMetrics.ScopeMetrics) > 0 {
scopeMetrics := observMetrics.ScopeMetrics[0]
foundObservabilityMetrics := 0
for _, m := range scopeMetrics.Metrics {
switch m.Name {
case "otel.sdk.exporter.metric_data_point.inflight",
"otel.sdk.exporter.metric_data_point.exported",
"otel.sdk.exporter.operation.duration",
"otel.sdk.metric_reader.collection.duration":
foundObservabilityMetrics++
}
}
assert.Equal(t, 4, foundObservabilityMetrics, "All observability metrics should be present")
}
}
func TestExporterSelfInstrumentationInitErrors(t *testing.T) {
// Test when NewInstrumentation returns an error
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
// Set up a meter provider that will cause NewInstrumentation to fail
original := otel.GetMeterProvider()
defer otel.SetMeterProvider(original)
errMP := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(errMP)
registry := prometheus.NewRegistry()
// This should fail during exporter creation due to instrumentation init error
_, err := New(WithRegisterer(registry))
require.ErrorIs(t, err, assert.AnError, "Expected NewInstrumentation error to be propagated")
}
// Helper types for testing NewInstrumentation errors.
type errMeterProvider struct {
otelmetric.MeterProvider
err error
}
func (m *errMeterProvider) Meter(string, ...otelmetric.MeterOption) otelmetric.Meter {
return &errMeter{err: m.err}
}
type errMeter struct {
otelmetric.Meter
err error
}
func (m *errMeter) Int64UpDownCounter(
string,
...otelmetric.Int64UpDownCounterOption,
) (otelmetric.Int64UpDownCounter, error) {
return nil, m.err
}
func (m *errMeter) Int64Counter(string, ...otelmetric.Int64CounterOption) (otelmetric.Int64Counter, error) {
return nil, m.err
}
func (m *errMeter) Float64Histogram(string, ...otelmetric.Float64HistogramOption) (otelmetric.Float64Histogram, error) {
return nil, m.err
}

View File

@@ -0,0 +1,31 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/counter/counter.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package counter provides a simple counter for generating unique IDs.
//
// This package is used to generate unique IDs while allowing testing packages
// to reset the counter.
package counter // import "go.opentelemetry.io/otel/exporters/prometheus/internal/counter"
import "sync/atomic"
// exporterN is a global 0-based count of the number of exporters created.
var exporterN atomic.Int64
// NextExporterID returns the next unique ID for an exporter.
func NextExporterID() int64 {
const inc = 1
return exporterN.Add(inc) - inc
}
// SetExporterID sets the exporter ID counter to v and returns the previous
// value.
//
// This function is useful for testing purposes, allowing you to reset the
// counter. It should not be used in production code.
func SetExporterID(v int64) int64 {
return exporterN.Swap(v)
}

View File

@@ -0,0 +1,65 @@
// Code generated by gotmpl. DO NOT MODIFY.
// source: internal/shared/counter/counter_test.go.tmpl
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package counter
import (
"sync"
"testing"
)
func TestNextExporterID(t *testing.T) {
SetExporterID(0)
var expected int64
for range 10 {
id := NextExporterID()
if id != expected {
t.Errorf("NextExporterID() = %d; want %d", id, expected)
}
expected++
}
}
func TestSetExporterID(t *testing.T) {
SetExporterID(0)
prev := SetExporterID(42)
if prev != 0 {
t.Errorf("SetExporterID(42) returned %d; want 0", prev)
}
id := NextExporterID()
if id != 42 {
t.Errorf("NextExporterID() = %d; want 42", id)
}
}
func TestNextExporterIDConcurrentSafe(t *testing.T) {
SetExporterID(0)
const goroutines = 100
const increments = 10
var wg sync.WaitGroup
wg.Add(goroutines)
for range goroutines {
go func() {
defer wg.Done()
for range increments {
NextExporterID()
}
}()
}
wg.Wait()
expected := int64(goroutines * increments)
if id := NextExporterID(); id != expected {
t.Errorf("NextExporterID() = %d; want %d", id, expected)
}
}

View File

@@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package internal provides internal functionality for the prometheus
// package.
package internal // import "go.opentelemetry.io/otel/exporters/prometheus/internal"
//go:generate gotmpl --body=../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/prometheus/internal/counter\" }" --out=counter/counter.go
//go:generate gotmpl --body=../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go

View File

@@ -0,0 +1,224 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package observ provides experimental observability instrumentation
// for the prometheus exporter.
package observ // import "go.opentelemetry.io/otel/exporters/prometheus/internal/observ"
import (
"context"
"errors"
"fmt"
"sync"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus/internal"
"go.opentelemetry.io/otel/exporters/prometheus/internal/x"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
const (
// ComponentType uniquely identifies the OpenTelemetry Exporter component
// being instrumented.
ComponentType = "go.opentelemetry.io/otel/exporters/prometheus/prometheus.Exporter"
// ScopeName is the unique name of the meter used for instrumentation.
ScopeName = "go.opentelemetry.io/otel/exporters/prometheus/internal/observ"
// SchemaURL is the schema URL of the metrics produced by this
// instrumentation.
SchemaURL = semconv.SchemaURL
// Version is the current version of this instrumentation.
//
// This matches the version of the exporter.
Version = internal.Version
)
var (
measureAttrsPool = &sync.Pool{
New: func() any {
// "component.name" + "component.type" + "error.type"
const n = 1 + 1 + 1
s := make([]attribute.KeyValue, 0, n)
// Return a pointer to a slice instead of a slice itself
// to avoid allocations on every call.
return &s
},
}
addOptPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
o := make([]metric.AddOption, 0, n)
return &o
},
}
recordOptPool = &sync.Pool{
New: func() any {
const n = 1 // WithAttributeSet
o := make([]metric.RecordOption, 0, n)
return &o
},
}
)
func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) }
func put[T any](p *sync.Pool, s *[]T) {
*s = (*s)[:0] // Reset.
p.Put(s)
}
func ComponentName(id int64) string {
return fmt.Sprintf("%s/%d", ComponentType, id)
}
type Instrumentation struct {
inflightMetric metric.Int64UpDownCounter
exportedMetric metric.Int64Counter
operationDuration metric.Float64Histogram
collectionDuration metric.Float64Histogram
attrs []attribute.KeyValue
setOpt metric.MeasurementOption
}
func NewInstrumentation(id int64) (*Instrumentation, error) {
if !x.Observability.Enabled() {
return nil, nil
}
i := &Instrumentation{
attrs: []attribute.KeyValue{
semconv.OTelComponentName(ComponentName(id)),
semconv.OTelComponentTypeKey.String(ComponentType),
},
}
s := attribute.NewSet(i.attrs...)
i.setOpt = metric.WithAttributeSet(s)
mp := otel.GetMeterProvider()
m := mp.Meter(
ScopeName,
metric.WithInstrumentationVersion(Version),
metric.WithSchemaURL(SchemaURL),
)
var err, e error
inflightMetric, e := otelconv.NewSDKExporterMetricDataPointInflight(m)
if e != nil {
e = fmt.Errorf("failed to create inflight metric: %w", e)
err = errors.Join(err, e)
}
i.inflightMetric = inflightMetric.Inst()
exportedMetric, e := otelconv.NewSDKExporterMetricDataPointExported(m)
if e != nil {
e = fmt.Errorf("failed to create exported metric: %w", e)
err = errors.Join(err, e)
}
i.exportedMetric = exportedMetric.Inst()
operationDuration, e := otelconv.NewSDKExporterOperationDuration(m)
if e != nil {
e = fmt.Errorf("failed to create operation duration metric: %w", e)
err = errors.Join(err, e)
}
i.operationDuration = operationDuration.Inst()
collectionDuration, e := otelconv.NewSDKMetricReaderCollectionDuration(m)
if e != nil {
e = fmt.Errorf("failed to create collection duration metric: %w", e)
err = errors.Join(err, e)
}
i.collectionDuration = collectionDuration.Inst()
return i, err
}
// RecordDurationDone is a function that is called when a call to an Exporters'
// RecordOperationDuration or RecordCollectionDuration completes.
//
// Any error that is encountered is provided as err.
type RecordDurationDone func(error)
func (i *Instrumentation) RecordOperationDuration(ctx context.Context) RecordDurationDone {
return i.recordDuration(ctx, i.operationDuration)
}
func (i *Instrumentation) RecordCollectionDuration(ctx context.Context) RecordDurationDone {
return i.recordDuration(ctx, i.collectionDuration)
}
func (i *Instrumentation) recordDuration(
ctx context.Context,
h metric.Float64Histogram,
) RecordDurationDone {
start := time.Now()
return func(err error) {
recordOpt := get[metric.RecordOption](recordOptPool)
defer put(recordOptPool, recordOpt)
*recordOpt = append(*recordOpt, i.setOpt)
if err != nil {
attrs := get[attribute.KeyValue](measureAttrsPool)
defer put(measureAttrsPool, attrs)
*attrs = append(*attrs, i.attrs...)
*attrs = append(*attrs, semconv.ErrorType(err))
set := attribute.NewSet(*attrs...)
*recordOpt = append((*recordOpt)[:0], metric.WithAttributeSet(set))
}
h.Record(ctx, time.Since(start).Seconds(), *recordOpt...)
}
}
// ExportMetricsDone is a function that is called when a call to an Exporter's
// export methods completes.
//
// The number of successful exports is provided as success. Any error that is
// encountered is provided as err.
type ExportMetricsDone func(success int64, err error)
func (i *Instrumentation) ExportMetrics(ctx context.Context, n int64) ExportMetricsDone {
addOpt := get[metric.AddOption](addOptPool)
defer put(addOptPool, addOpt)
*addOpt = append(*addOpt, i.setOpt)
i.inflightMetric.Add(ctx, n, *addOpt...)
return i.end(ctx, n)
}
func (i *Instrumentation) end(ctx context.Context, n int64) ExportMetricsDone {
return func(success int64, err error) {
addOpt := get[metric.AddOption](addOptPool)
defer put(addOptPool, addOpt)
*addOpt = append(*addOpt, i.setOpt)
i.inflightMetric.Add(ctx, -n, *addOpt...)
i.exportedMetric.Add(ctx, success, *addOpt...)
if err != nil {
attrs := get[attribute.KeyValue](measureAttrsPool)
defer put(measureAttrsPool, attrs)
*attrs = append(*attrs, i.attrs...)
*attrs = append(*attrs, semconv.ErrorType(err))
set := attribute.NewSet(*attrs...)
*addOpt = append((*addOpt)[:0], metric.WithAttributeSet(set))
i.exportedMetric.Add(ctx, n-success, *addOpt...)
}
}
}

View File

@@ -0,0 +1,334 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package observ_test
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
observ "go.opentelemetry.io/otel/exporters/prometheus/internal/observ"
mapi "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)
const ID = 0
var Scope = instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: observ.SchemaURL,
}
type errMeterProvider struct {
mapi.MeterProvider
err error
}
func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter {
return &errMeter{err: m.err}
}
type errMeter struct {
mapi.Meter
err error
}
func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) {
return nil, m.err
}
func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) {
return nil, m.err
}
func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) {
return nil, m.err
}
func TestNewInstrumentationObservabilityErrors(t *testing.T) {
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })
mp := &errMeterProvider{err: assert.AnError}
otel.SetMeterProvider(mp)
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
_, err := observ.NewInstrumentation(ID)
require.ErrorIs(t, err, assert.AnError, "new instrument errors should be joined")
assert.ErrorContains(t, err, "inflight metric")
assert.ErrorContains(t, err, "exported metric")
assert.ErrorContains(t, err, "operation duration metric")
assert.ErrorContains(t, err, "collection duration metric")
}
func TestNewInstrumentationObservabilityDisabled(t *testing.T) {
// Do not set OTEL_GO_X_OBSERVABILITY.
got, err := observ.NewInstrumentation(ID)
assert.NoError(t, err)
assert.Nil(t, got)
}
// setup installs a ManualReader MeterProvider and returns an instantiated
// Instrumentation plus a collector that returns the single ScopeMetrics group.
func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) {
t.Helper()
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
original := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(original) })
r := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(r))
otel.SetMeterProvider(mp)
inst, err := observ.NewInstrumentation(ID)
require.NoError(t, err)
require.NotNil(t, inst)
return inst, func() metricdata.ScopeMetrics {
var rm metricdata.ResourceMetrics
require.NoError(t, r.Collect(context.Background(), &rm))
require.Len(t, rm.ScopeMetrics, 1)
return rm.ScopeMetrics[0]
}
}
func set(err error) attribute.Set {
attrs := []attribute.KeyValue{
semconv.OTelComponentName(observ.ComponentName(ID)),
semconv.OTelComponentTypeKey.String(observ.ComponentType),
}
if err != nil {
attrs = append(attrs, semconv.ErrorType(err))
}
return attribute.NewSet(attrs...)
}
func scrapeInflight() metricdata.Metrics {
return metricdata.Metrics{
Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(),
Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: set(nil), Value: 0},
},
},
}
}
func scrapeExported(success, total int64, err error) metricdata.Metrics {
dps := []metricdata.DataPoint[int64]{
{Attributes: set(nil), Value: success},
}
if err != nil {
dps = append(dps, metricdata.DataPoint[int64]{
Attributes: set(err),
Value: total - success,
})
}
return metricdata.Metrics{
Name: otelconv.SDKExporterMetricDataPointExported{}.Name(),
Description: otelconv.SDKExporterMetricDataPointExported{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(),
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: dps,
},
}
}
func operationDuration(err error) metricdata.Metrics {
return metricdata.Metrics{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: set(err)},
},
},
}
}
func collectionDuration(err error) metricdata.Metrics {
return metricdata.Metrics{
Name: otelconv.SDKMetricReaderCollectionDuration{}.Name(),
Description: otelconv.SDKMetricReaderCollectionDuration{}.Description(),
Unit: otelconv.SDKMetricReaderCollectionDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: set(err)},
},
},
}
}
func assertExportMetricsMetrics(t *testing.T, got metricdata.ScopeMetrics, total, success int64, err error) {
t.Helper()
assert.Equal(t, Scope, got.Scope, "unexpected scope")
m := got.Metrics
require.Len(t, m, 3, "expected 3 metrics (inflight, exported, operationDuration)")
o := metricdatatest.IgnoreTimestamp()
want := scrapeInflight()
metricdatatest.AssertEqual(t, want, m[0], o)
want = scrapeExported(success, total, err)
metricdatatest.AssertEqual(t, want, m[1], o)
want = operationDuration(err)
metricdatatest.AssertEqual(t, want, m[2], o, metricdatatest.IgnoreValue())
}
func assertCollectionOnly(t *testing.T, got metricdata.ScopeMetrics, err error) {
t.Helper()
assert.Equal(t, Scope, got.Scope, "unexpected scope")
m := got.Metrics
require.Len(t, m, 1, "expected only collectionDuration metric")
o := metricdatatest.IgnoreTimestamp()
want := collectionDuration(err)
metricdatatest.AssertEqual(t, want, m[0], o, metricdatatest.IgnoreValue())
}
func TestInstrumentationExportMetricsSuccess(t *testing.T) {
inst, collect := setup(t)
const n = 10
endOp := inst.RecordOperationDuration(context.Background())
end := inst.ExportMetrics(context.Background(), n)
end(n, nil)
endOp(nil)
assertExportMetricsMetrics(t, collect(), n, n, nil)
}
func TestInstrumentationExportMetricsAllErrored(t *testing.T) {
inst, collect := setup(t)
const n = 10
err := assert.AnError
endOp := inst.RecordOperationDuration(context.Background())
end := inst.ExportMetrics(context.Background(), n)
const success = 0
end(success, err)
endOp(err)
assertExportMetricsMetrics(t, collect(), n, success, err)
}
func TestInstrumentationExportMetricsPartialErrored(t *testing.T) {
inst, collect := setup(t)
const n = 10
err := assert.AnError
endOp := inst.RecordOperationDuration(context.Background())
end := inst.ExportMetrics(context.Background(), n)
const success = 5
end(success, err)
endOp(err)
assertExportMetricsMetrics(t, collect(), n, success, err)
}
func TestRecordCollectionDurationSuccess(t *testing.T) {
inst, collect := setup(t)
endCollection := inst.RecordCollectionDuration(context.Background())
endCollection(nil)
assertCollectionOnly(t, collect(), nil)
}
func TestRecordCollectionDurationError(t *testing.T) {
inst, collect := setup(t)
wantErr := assert.AnError
endCollection := inst.RecordCollectionDuration(context.Background())
endCollection(wantErr)
assertCollectionOnly(t, collect(), wantErr)
}
func BenchmarkInstrumentationExportMetrics(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
inst, err := observ.NewInstrumentation(ID)
if err != nil {
b.Fatalf("failed to create instrumentation: %v", err)
}
benchErr := errors.New("benchmark error")
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
end := inst.ExportMetrics(ctx, 10)
end(4, benchErr)
}
}
func BenchmarkInstrumentationRecordOperationDuration(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
inst, err := observ.NewInstrumentation(ID)
if err != nil {
b.Fatalf("failed to create instrumentation: %v", err)
}
benchErr := errors.New("benchmark error")
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
done := inst.RecordOperationDuration(ctx)
done(benchErr)
}
}
func BenchmarkInstrumentationRecordCollectionDuration(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
inst, err := observ.NewInstrumentation(ID)
if err != nil {
b.Fatalf("failed to create instrumentation: %v", err)
}
benchErr := errors.New("benchmark error")
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
endCollection := inst.RecordCollectionDuration(ctx)
endCollection(benchErr)
}
}

View File

@@ -0,0 +1,9 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package internal provides internal utilities for the OpenTelemetry prometheus exporter.
package internal // import "go.opentelemetry.io/otel/exporters/prometheus/internal"
// Version is the current release version of the OpenTelemetry prometheus
// exporter in use.
const Version = "0.60.0"

View File

@@ -0,0 +1,37 @@
# Experimental Features
The `prometheus` exporter contains features that have not yet stabilized in the OpenTelemetry specification.
These features are added to the `prometheus` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback.
These features may change in backwards incompatible ways as feedback is applied.
See the [Compatibility and Stability](#compatibility-and-stability) section for more information.
## Features
- [Observability](#observability)
### Observability
The `prometheus` exporter can be configured to provide observability about itself using OpenTelemetry metrics.
To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`.
When enabled, the SDK will create the following metrics using the global `MeterProvider`:
- `otel.sdk.exporter.metric_data_point.inflight`
- `otel.sdk.exporter.metric_data_point.exported`
- `otel.sdk.metric_reader.collection.duration`
- `otel.sdk.exporter.operation.duration`
Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics.
[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md
## Compatibility and Stability
Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md).
These features may be removed or modified in successive version releases, including patch versions.
When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.
There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version.
If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support.

View File

@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/prometheus].
package x // import "go.opentelemetry.io/otel/exporters/prometheus/internal/x"
import (
"os"
"strings"
)
// Observability is an experimental feature flag that determines if exporter
// observability metrics are enabled.
//
// To enable this feature set the OTEL_GO_X_OBSERVABILITY environment variable
// to the case-insensitive string value of "true" (i.e. "True" and "TRUE"
// will also enable this).
var Observability = newFeature(
[]string{"OBSERVABILITY"},
func(v string) (string, bool) {
if strings.EqualFold(v, "true") {
return v, true
}
return "", false
},
)
// Feature is an experimental feature control flag. It provides a uniform way
// to interact with these feature flags and parse their values.
type Feature[T any] struct {
keys []string
parse func(v string) (T, bool)
}
func newFeature[T any](suffix []string, parse func(string) (T, bool)) Feature[T] {
const envKeyRoot = "OTEL_GO_X_"
keys := make([]string, 0, len(suffix))
for _, s := range suffix {
keys = append(keys, envKeyRoot+s)
}
return Feature[T]{
keys: keys,
parse: parse,
}
}
// Keys returns the environment variable keys that can be set to enable the
// feature.
func (f Feature[T]) Keys() []string { return f.keys }
// Lookup returns the user configured value for the feature and true if the
// user has enabled the feature. Otherwise, if the feature is not enabled, a
// zero-value and false are returned.
func (f Feature[T]) Lookup() (v T, ok bool) {
// https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value
//
// > The SDK MUST interpret an empty value of an environment variable the
// > same way as when the variable is unset.
for _, key := range f.keys {
vRaw := os.Getenv(key)
if vRaw != "" {
return f.parse(vRaw)
}
}
return v, ok
}
// Enabled reports whether the feature is enabled.
func (f Feature[T]) Enabled() bool {
_, ok := f.Lookup()
return ok
}

View File

@@ -0,0 +1,59 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package x
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestObservability(t *testing.T) {
const key = "OTEL_GO_X_OBSERVABILITY"
require.Contains(t, Observability.Keys(), key)
t.Run("100", run(setenv(key, "100"), assertDisabled(Observability)))
t.Run("true", run(setenv(key, "true"), assertEnabled(Observability, "true")))
t.Run("True", run(setenv(key, "True"), assertEnabled(Observability, "True")))
t.Run("false", run(setenv(key, "false"), assertDisabled(Observability)))
t.Run("empty", run(assertDisabled(Observability)))
}
func run(steps ...func(*testing.T)) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
for _, step := range steps {
step(t)
}
}
}
func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function.
return func(t *testing.T) { t.Setenv(k, v) }
}
func assertEnabled[T any](f Feature[T], want T) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
assert.True(t, f.Enabled(), "not enabled")
v, ok := f.Lookup()
assert.True(t, ok, "Lookup state")
assert.Equal(t, want, v, "Lookup value")
}
}
func assertDisabled[T any](f Feature[T]) func(*testing.T) {
var zero T
return func(t *testing.T) {
t.Helper()
assert.False(t, f.Enabled(), "enabled")
v, ok := f.Lookup()
assert.False(t, ok, "Lookup state")
assert.Equal(t, zero, v, "Lookup value")
}
}

View File

@@ -46,3 +46,6 @@ modules:
go.opentelemetry.io/otel/exporters/stdout/stdouttrace:
version-refs:
- ./exporters/stdout/stdouttrace/internal/version.go
go.opentelemetry.io/otel/exporters/prometheus:
version-refs:
- ./exporters/prometheus/internal/version.go