diff --git a/CHANGELOG.md b/CHANGELOG.md index 3a46d2fa4..ee35cf426 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add `WithInstrumentationAttributeSet` option to `go.opentelemetry.io/otel/log`, `go.opentelemetry.io/otel/metric`, and `go.opentelemetry.io/otel/trace` packages. This provides a concurrent-safe and performant alternative to `WithInstrumentationAttributes` by accepting a pre-constructed `attribute.Set`. (#7287) - Greatly reduce the cost of recording metrics in `go.opentelemetry.io/otel/sdk/metric` using hashing for map keys. (#7175) +- Add experimental observability for the prometheus exporter in `go.opentelemetry.io/otel/exporters/prometheus`. + Check the `go.opentelemetry.io/otel/exporters/prometheus/internal/x` package documentation for more information. (#7345) ### Fixed diff --git a/exporters/prometheus/exporter.go b/exporters/prometheus/exporter.go index 0f29c0abb..c4c1c8789 100644 --- a/exporters/prometheus/exporter.go +++ b/exporters/prometheus/exporter.go @@ -20,6 +20,8 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/prometheus/internal/counter" + "go.opentelemetry.io/otel/exporters/prometheus/internal/observ" "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -92,6 +94,8 @@ type collector struct { metricNamer otlptranslator.MetricNamer labelNamer otlptranslator.LabelNamer unitNamer otlptranslator.UnitNamer + + inst *observ.Instrumentation } // New returns a Prometheus Exporter. @@ -137,7 +141,10 @@ func New(opts ...Option) (*Exporter, error) { Reader: reader, } - return e, nil + var err error + collector.inst, err = observ.NewInstrumentation(counter.NextExporterID()) + + return e, err } // Describe implements prometheus.Collector. @@ -153,9 +160,26 @@ func (*collector) Describe(chan<- *prometheus.Desc) { // // This method is safe to call concurrently. func (c *collector) Collect(ch chan<- prometheus.Metric) { + var err error + // Blocked by this issue: Propagate context.Context through Gather and Collect (#1538) + // https://github.com/prometheus/client_golang/issues/1538. + ctx := context.TODO() + + if c.inst != nil { + endOp := c.inst.RecordOperationDuration(ctx) + defer func() { endOp(err) }() + } + metrics := metricsPool.Get().(*metricdata.ResourceMetrics) defer metricsPool.Put(metrics) - err := c.reader.Collect(context.TODO(), metrics) + + endCollection := func(error) {} + if c.inst != nil { + endCollection = c.inst.RecordCollectionDuration(ctx) + } + err = c.reader.Collect(ctx, metrics) + endCollection(err) + if err != nil { if errors.Is(err, metric.ErrReaderShutdown) { return @@ -174,15 +198,16 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { defer c.mu.Unlock() if c.targetInfo == nil && !c.disableTargetInfo { - targetInfo, err := c.createInfoMetric( + targetInfo, e := c.createInfoMetric( otlptranslator.TargetInfoMetricName, targetInfoDescription, metrics.Resource, ) - if err != nil { + if e != nil { // If the target info metric is invalid, disable sending it. 
c.disableTargetInfo = true - otel.Handle(err) + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to createInfoMetric: %w", e)) return } @@ -195,14 +220,15 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { } if c.resourceAttributesFilter != nil && len(c.resourceKeyVals.keys) == 0 { - err := c.createResourceAttributes(metrics.Resource) - if err != nil { - otel.Handle(err) + e := c.createResourceAttributes(metrics.Resource) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to createResourceAttributes: %w", e)) return } } - for _, scopeMetrics := range metrics.ScopeMetrics { + for j, scopeMetrics := range metrics.ScopeMetrics { n := len(c.resourceKeyVals.keys) + 2 // resource attrs + scope name + scope version kv := keyVals{ keys: make([]string, 0, n), @@ -213,9 +239,10 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { kv.keys = append(kv.keys, scopeNameLabel, scopeVersionLabel, scopeSchemaLabel) kv.vals = append(kv.vals, scopeMetrics.Scope.Name, scopeMetrics.Scope.Version, scopeMetrics.Scope.SchemaURL) - attrKeys, attrVals, err := getAttrs(scopeMetrics.Scope.Attributes, c.labelNamer) - if err != nil { - otel.Handle(err) + attrKeys, attrVals, e := getAttrs(scopeMetrics.Scope.Attributes, c.labelNamer) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to getAttrs for ScopeMetrics %d: %w", j, e)) continue } for i := range attrKeys { @@ -228,16 +255,17 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { kv.keys = append(kv.keys, c.resourceKeyVals.keys...) kv.vals = append(kv.vals, c.resourceKeyVals.vals...) - for _, m := range scopeMetrics.Metrics { + for k, m := range scopeMetrics.Metrics { typ := c.metricType(m) if typ == nil { continue } - name, err := c.getName(m) - if err != nil { + name, e := c.getName(m) + if e != nil { // TODO(#7066): Handle this error better. It's not clear this can be // reached, bad metric names should / will be caught at creation time. 
- otel.Handle(err) + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to getAttrs for ScopeMetrics %d, Metrics %d: %w", j, k, e)) continue } @@ -252,21 +280,21 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { switch v := m.Data.(type) { case metricdata.Histogram[int64]: - addHistogramMetric(ch, v, m, name, kv, c.labelNamer) + addHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) case metricdata.Histogram[float64]: - addHistogramMetric(ch, v, m, name, kv, c.labelNamer) + addHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) case metricdata.ExponentialHistogram[int64]: - addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer) + addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) case metricdata.ExponentialHistogram[float64]: - addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer) + addExponentialHistogramMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) case metricdata.Sum[int64]: - addSumMetric(ch, v, m, name, kv, c.labelNamer) + addSumMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) case metricdata.Sum[float64]: - addSumMetric(ch, v, m, name, kv, c.labelNamer) + addSumMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) case metricdata.Gauge[int64]: - addGaugeMetric(ch, v, m, name, kv, c.labelNamer) + addGaugeMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) case metricdata.Gauge[float64]: - addGaugeMetric(ch, v, m, name, kv, c.labelNamer) + addGaugeMetric(ch, v, m, name, kv, c.labelNamer, c.inst, ctx) } } } @@ -332,11 +360,21 @@ func addExponentialHistogramMetric[N int64 | float64]( name string, kv keyVals, labelNamer otlptranslator.LabelNamer, + inst *observ.Instrumentation, + ctx context.Context, ) { - for _, dp := range histogram.DataPoints { - keys, values, err := getAttrs(dp.Attributes, labelNamer) - if err != nil { - otel.Handle(err) + var err error + var success int64 + if inst != nil { + end := inst.ExportMetrics(ctx, int64(len(histogram.DataPoints))) + defer func() { end(success, err) }() + } + + for j, dp := range histogram.DataPoints { + keys, values, e := getAttrs(dp.Attributes, labelNamer) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to getAttrs for histogram.DataPoints %d: %w", j, e)) continue } keys = append(keys, kv.keys...) @@ -348,9 +386,12 @@ func addExponentialHistogramMetric[N int64 | float64]( scale := dp.Scale if scale < -4 { // Reject scales below -4 as they cannot be represented in Prometheus - otel.Handle(fmt.Errorf( + e := fmt.Errorf( "exponential histogram scale %d is below minimum supported scale -4, skipping data point", - scale)) + scale, + ) + otel.Handle(e) + err = errors.Join(err, e) continue } @@ -368,7 +409,9 @@ func addExponentialHistogramMetric[N int64 | float64]( positiveBuckets := make(map[int]int64) for i, c := range positiveBucket.Counts { if c > math.MaxInt64 { - otel.Handle(fmt.Errorf("positive count %d is too large to be represented as int64", c)) + e := fmt.Errorf("positive count %d is too large to be represented as int64", c) + otel.Handle(e) + err = errors.Join(err, e) continue } positiveBuckets[int(positiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above. 
@@ -377,13 +420,15 @@ func addExponentialHistogramMetric[N int64 | float64]( negativeBuckets := make(map[int]int64) for i, c := range negativeBucket.Counts { if c > math.MaxInt64 { - otel.Handle(fmt.Errorf("negative count %d is too large to be represented as int64", c)) + e := fmt.Errorf("negative count %d is too large to be represented as int64", c) + otel.Handle(e) + err = errors.Join(err, e) continue } negativeBuckets[int(negativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above. } - m, err := prometheus.NewConstNativeHistogram( + m, e := prometheus.NewConstNativeHistogram( desc, dp.Count, float64(dp.Sum), @@ -394,12 +439,18 @@ func addExponentialHistogramMetric[N int64 | float64]( dp.ZeroThreshold, dp.StartTime, values...) - if err != nil { - otel.Handle(err) + if e != nil { + otel.Handle(e) + err = errors.Join( + err, + fmt.Errorf("failed to NewConstNativeHistogram for histogram.DataPoints %d: %w", j, e), + ) continue } m = addExemplars(m, dp.Exemplars, labelNamer) ch <- m + + success++ } } @@ -410,11 +461,21 @@ func addHistogramMetric[N int64 | float64]( name string, kv keyVals, labelNamer otlptranslator.LabelNamer, + inst *observ.Instrumentation, + ctx context.Context, ) { - for _, dp := range histogram.DataPoints { - keys, values, err := getAttrs(dp.Attributes, labelNamer) - if err != nil { - otel.Handle(err) + var err error + var success int64 + if inst != nil { + end := inst.ExportMetrics(ctx, int64(len(histogram.DataPoints))) + defer func() { end(success, err) }() + } + + for j, dp := range histogram.DataPoints { + keys, values, e := getAttrs(dp.Attributes, labelNamer) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to getAttrs for histogram.DataPoints %d: %w", j, e)) continue } keys = append(keys, kv.keys...) @@ -428,13 +489,16 @@ func addHistogramMetric[N int64 | float64]( cumulativeCount += dp.BucketCounts[i] buckets[bound] = cumulativeCount } - m, err := prometheus.NewConstHistogram(desc, dp.Count, float64(dp.Sum), buckets, values...) - if err != nil { - otel.Handle(err) + m, e := prometheus.NewConstHistogram(desc, dp.Count, float64(dp.Sum), buckets, values...) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to NewConstMetric for histogram.DataPoints %d: %w", j, e)) continue } m = addExemplars(m, dp.Exemplars, labelNamer) ch <- m + + success++ } } @@ -445,25 +509,36 @@ func addSumMetric[N int64 | float64]( name string, kv keyVals, labelNamer otlptranslator.LabelNamer, + inst *observ.Instrumentation, + ctx context.Context, ) { + var err error + var success int64 + if inst != nil { + end := inst.ExportMetrics(ctx, int64(len(sum.DataPoints))) + defer func() { end(success, err) }() + } + valueType := prometheus.CounterValue if !sum.IsMonotonic { valueType = prometheus.GaugeValue } - for _, dp := range sum.DataPoints { - keys, values, err := getAttrs(dp.Attributes, labelNamer) - if err != nil { - otel.Handle(err) + for i, dp := range sum.DataPoints { + keys, values, e := getAttrs(dp.Attributes, labelNamer) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to getAttrs for sum.DataPoints %d: %w", i, e)) continue } keys = append(keys, kv.keys...) values = append(values, kv.vals...) desc := prometheus.NewDesc(name, m.Description, keys, nil) - m, err := prometheus.NewConstMetric(desc, valueType, float64(dp.Value), values...) - if err != nil { - otel.Handle(err) + m, e := prometheus.NewConstMetric(desc, valueType, float64(dp.Value), values...) 
+ if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to NewConstMetric for sum.DataPoints %d: %w", i, e)) continue } // GaugeValues don't support Exemplars at this time @@ -472,6 +547,8 @@ func addSumMetric[N int64 | float64]( m = addExemplars(m, dp.Exemplars, labelNamer) } ch <- m + + success++ } } @@ -482,23 +559,36 @@ func addGaugeMetric[N int64 | float64]( name string, kv keyVals, labelNamer otlptranslator.LabelNamer, + inst *observ.Instrumentation, + ctx context.Context, ) { - for _, dp := range gauge.DataPoints { - keys, values, err := getAttrs(dp.Attributes, labelNamer) - if err != nil { - otel.Handle(err) + var err error + var success int64 + if inst != nil { + end := inst.ExportMetrics(ctx, int64(len(gauge.DataPoints))) + defer func() { end(success, err) }() + } + + for i, dp := range gauge.DataPoints { + keys, values, e := getAttrs(dp.Attributes, labelNamer) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to getAttrs for gauge.DataPoints %d: %w", i, e)) continue } keys = append(keys, kv.keys...) values = append(values, kv.vals...) desc := prometheus.NewDesc(name, m.Description, keys, nil) - m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(dp.Value), values...) - if err != nil { - otel.Handle(err) + m, e := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(dp.Value), values...) + if e != nil { + otel.Handle(e) + err = errors.Join(err, fmt.Errorf("failed to NewConstMetric for gauge.DataPoints %d: %w", i, e)) continue } ch <- m + + success++ } } diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go index 139431d00..e745ead01 100644 --- a/exporters/prometheus/exporter_test.go +++ b/exporters/prometheus/exporter_test.go @@ -22,7 +22,9 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/prometheus/internal/observ" otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" @@ -1349,6 +1351,8 @@ func TestExponentialHistogramScaleValidation(t *testing.T) { "test_histogram", keyVals{}, otlptranslator.LabelNamer{}, + nil, + context.Background(), ) assert.Error(t, capturedError) assert.Contains(t, capturedError.Error(), "scale -5 is below minimum") @@ -1513,6 +1517,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) { "test_high_scale_histogram", keyVals{}, otlptranslator.LabelNamer{}, + nil, + context.Background(), ) // Verify a metric was produced @@ -1575,6 +1581,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) { "test_very_high_scale_histogram", keyVals{}, otlptranslator.LabelNamer{}, + nil, + context.Background(), ) // Verify a metric was produced @@ -1637,6 +1645,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) { "test_histogram_with_negative_buckets", keyVals{}, otlptranslator.LabelNamer{}, + nil, + context.Background(), ) // Verify a metric was produced @@ -1693,6 +1703,8 @@ func TestExponentialHistogramHighScaleDownscaling(t *testing.T) { "test_int64_exponential_histogram", keyVals{}, otlptranslator.LabelNamer{}, + nil, + context.Background(), ) // Verify a metric was produced @@ -1895,3 +1907,544 @@ func TestEscapingErrorHandling(t *testing.T) { }) } } + +func TestExporterSelfInstrumentation(t *testing.T) { + testCases := []struct { + name string + enableObservability bool + 
recordMetrics func(ctx context.Context, meter otelmetric.Meter) + expectedObservMetrics []string + expectedMainMetrics int + checkMetrics func(t *testing.T, mainMetrics []*dto.MetricFamily, observMetrics metricdata.ScopeMetrics) + }{ + { + name: "self instrumentation disabled", + enableObservability: false, + recordMetrics: func(ctx context.Context, meter otelmetric.Meter) { + counter, err := meter.Int64Counter("test_counter") + require.NoError(t, err) + counter.Add(ctx, 1) + }, + expectedObservMetrics: []string{}, // No observability metrics expected + expectedMainMetrics: 2, // test counter + target_info + }, + { + name: "self instrumentation enabled with counter", + enableObservability: true, + recordMetrics: func(ctx context.Context, meter otelmetric.Meter) { + counter, err := meter.Int64Counter("test_counter", otelmetric.WithDescription("test counter")) + require.NoError(t, err) + counter.Add(ctx, 1, otelmetric.WithAttributes(attribute.String("key", "value"))) + }, + expectedObservMetrics: []string{ + "otel.sdk.exporter.metric_data_point.inflight", + "otel.sdk.exporter.metric_data_point.exported", + "otel.sdk.exporter.operation.duration", + "otel.sdk.metric_reader.collection.duration", + }, + expectedMainMetrics: 2, // test counter + target_info + checkMetrics: func(t *testing.T, _ []*dto.MetricFamily, observMetrics metricdata.ScopeMetrics) { + // Check that exported metrics include success count + for _, m := range observMetrics.Metrics { + if m.Name == "otel.sdk.exporter.metric_data_point.exported" { + sum, ok := m.Data.(metricdata.Sum[int64]) + require.True(t, ok) + require.Len(t, sum.DataPoints, 1) + require.Equal(t, int64(1), sum.DataPoints[0].Value) + } + } + }, + }, + { + name: "self instrumentation enabled with gauge", + enableObservability: true, + recordMetrics: func(ctx context.Context, meter otelmetric.Meter) { + gauge, err := meter.Float64Gauge("test_gauge", otelmetric.WithDescription("test gauge")) + require.NoError(t, err) + gauge.Record(ctx, 42.5, otelmetric.WithAttributes(attribute.String("key", "value"))) + }, + expectedObservMetrics: []string{ + "otel.sdk.exporter.metric_data_point.inflight", + "otel.sdk.exporter.metric_data_point.exported", + "otel.sdk.exporter.operation.duration", + "otel.sdk.metric_reader.collection.duration", + }, + expectedMainMetrics: 2, + }, + { + name: "self instrumentation enabled with histogram", + enableObservability: true, + recordMetrics: func(ctx context.Context, meter otelmetric.Meter) { + histogram, err := meter.Float64Histogram("test_histogram", otelmetric.WithDescription("test histogram")) + require.NoError(t, err) + histogram.Record(ctx, 1.5, otelmetric.WithAttributes(attribute.String("key", "value"))) + histogram.Record(ctx, 2.5, otelmetric.WithAttributes(attribute.String("key", "value"))) + }, + expectedObservMetrics: []string{ + "otel.sdk.exporter.metric_data_point.inflight", + "otel.sdk.exporter.metric_data_point.exported", + "otel.sdk.exporter.operation.duration", + "otel.sdk.metric_reader.collection.duration", + }, + expectedMainMetrics: 2, + }, + { + name: "self instrumentation with multiple metrics", + enableObservability: true, + recordMetrics: func(ctx context.Context, meter otelmetric.Meter) { + counter, err := meter.Int64Counter("test_counter") + require.NoError(t, err) + counter.Add(ctx, 5, otelmetric.WithAttributes(attribute.String("type", "requests"))) + counter.Add(ctx, 3, otelmetric.WithAttributes(attribute.String("type", "errors"))) + + gauge, err := meter.Float64Gauge("test_gauge") + require.NoError(t, err) + 
gauge.Record(ctx, 100.0, otelmetric.WithAttributes(attribute.String("status", "active"))) + + histogram, err := meter.Float64Histogram("test_histogram") + require.NoError(t, err) + histogram.Record(ctx, 0.1) + histogram.Record(ctx, 0.2) + histogram.Record(ctx, 0.3) + }, + expectedObservMetrics: []string{ + "otel.sdk.exporter.metric_data_point.inflight", + "otel.sdk.exporter.metric_data_point.exported", + "otel.sdk.exporter.operation.duration", + "otel.sdk.metric_reader.collection.duration", + }, + expectedMainMetrics: 4, // 3 test metrics + target_info + checkMetrics: func(t *testing.T, _ []*dto.MetricFamily, observMetrics metricdata.ScopeMetrics) { + // Check that exported metrics track multiple data points + for _, m := range observMetrics.Metrics { + if m.Name == "otel.sdk.exporter.metric_data_point.exported" { + sum, ok := m.Data.(metricdata.Sum[int64]) + require.True(t, ok) + require.Len(t, sum.DataPoints, 1) + // Counter: 2 data points, Gauge: 1 data point, Histogram: 1 data point = 4 total + require.Equal(t, int64(4), sum.DataPoints[0].Value) + } + } + }, + }, + { + name: "self instrumentation enabled with up-down counter", + enableObservability: true, + recordMetrics: func(ctx context.Context, meter otelmetric.Meter) { + upDownCounter, err := meter.Int64UpDownCounter( + "test_updown_counter", + otelmetric.WithDescription("test up-down counter"), + ) + require.NoError(t, err) + upDownCounter.Add(ctx, 10, otelmetric.WithAttributes(attribute.String("direction", "up"))) + upDownCounter.Add(ctx, -5, otelmetric.WithAttributes(attribute.String("direction", "down"))) + }, + expectedObservMetrics: []string{ + "otel.sdk.exporter.metric_data_point.inflight", + "otel.sdk.exporter.metric_data_point.exported", + "otel.sdk.exporter.operation.duration", + "otel.sdk.metric_reader.collection.duration", + }, + expectedMainMetrics: 2, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + if tc.enableObservability { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + } else { + t.Setenv("OTEL_GO_X_OBSERVABILITY", "") + } + + // Setup observability metric collection + var observReader *metric.ManualReader + var observMetricsFunc func() metricdata.ScopeMetrics + + if tc.enableObservability { + originalMP := otel.GetMeterProvider() + defer otel.SetMeterProvider(originalMP) + + observReader = metric.NewManualReader() + observMP := metric.NewMeterProvider(metric.WithReader(observReader)) + otel.SetMeterProvider(observMP) + + observMetricsFunc = func() metricdata.ScopeMetrics { + var rm metricdata.ResourceMetrics + err := observReader.Collect(context.Background(), &rm) + require.NoError(t, err) + if len(rm.ScopeMetrics) == 0 { + return metricdata.ScopeMetrics{} + } + return rm.ScopeMetrics[0] + } + } + + ctx := context.Background() + registry := prometheus.NewRegistry() + + exporter, err := New(WithRegisterer(registry)) + require.NoError(t, err) + + provider := metric.NewMeterProvider(metric.WithReader(exporter)) + meter := provider.Meter("test", otelmetric.WithInstrumentationVersion("v1.0.0")) + + // Record the test metrics + tc.recordMetrics(ctx, meter) + + // Collect main metrics + mainMetrics, err := registry.Gather() + require.NoError(t, err) + + // Verify the number of main metric families + assert.Len(t, mainMetrics, tc.expectedMainMetrics) + + // Collect and check observability metrics if enabled + if tc.enableObservability { + observMetrics := observMetricsFunc() + + // Check that expected observability metrics are present + observedMetrics := make(map[string]bool) + for _, 
m := range observMetrics.Metrics { + observedMetrics[m.Name] = true + } + + for _, expectedMetric := range tc.expectedObservMetrics { + assert.True( + t, + observedMetrics[expectedMetric], + "Expected observability metric %s not found", + expectedMetric, + ) + } + + // Verify observability metrics have expected structure + expectedScope := instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: observ.SchemaURL, + } + assert.Equal(t, expectedScope, observMetrics.Scope, "Expected observability scope") + assert.Len( + t, + observMetrics.Metrics, + len(tc.expectedObservMetrics), + "Expected number of observability metrics", + ) + + // Run custom metric checks if provided + if tc.checkMetrics != nil { + tc.checkMetrics(t, mainMetrics, observMetrics) + } + } + }) + } +} + +func TestExporterSelfInstrumentationErrors(t *testing.T) { + testCases := []struct { + name string + setupError func() (metric.Reader, func()) + expectedMinMetrics int // Minimum expected metrics in error scenarios + checkErrorAttributes func(t *testing.T, observMetrics metricdata.ScopeMetrics) + }{ + { + name: "reader shutdown error", + setupError: func() (metric.Reader, func()) { + reader := metric.NewManualReader() + return reader, func() { _ = reader.Shutdown(context.Background()) } + }, + expectedMinMetrics: 1, // At least some metrics should be present + }, + { + name: "reader not registered error", + setupError: func() (metric.Reader, func()) { + reader := metric.NewManualReader() + // Don't register the reader with a provider + return reader, func() {} + }, + expectedMinMetrics: 1, // At least some metrics should be present + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Enable observability + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Setup observability metric collection + originalMP := otel.GetMeterProvider() + defer otel.SetMeterProvider(originalMP) + + observReader := metric.NewManualReader() + observMP := metric.NewMeterProvider(metric.WithReader(observReader)) + otel.SetMeterProvider(observMP) + + registry := prometheus.NewRegistry() + + reader, cleanup := tc.setupError() + defer cleanup() + + // Create exporter with the error-prone reader + cfg := newConfig() + cfg.registerer = registry + + collector := &collector{ + reader: reader, + disableTargetInfo: cfg.disableTargetInfo, + withoutUnits: cfg.withoutUnits, + withoutCounterSuffixes: cfg.withoutCounterSuffixes, + disableScopeInfo: cfg.disableScopeInfo, + metricFamilies: make(map[string]*dto.MetricFamily), + namespace: cfg.namespace, + resourceAttributesFilter: cfg.resourceAttributesFilter, + metricNamer: otlptranslator.NewMetricNamer(cfg.namespace, cfg.translationStrategy), + } + + var err error + collector.inst, err = observ.NewInstrumentation(0) + require.NoError(t, err) + + err = registry.Register(collector) + require.NoError(t, err) + + // Trigger collection which should encounter the error + _, err = registry.Gather() + require.NoError(t, err) + + // Collect observability metrics + var observMetrics metricdata.ResourceMetrics + err = observReader.Collect(context.Background(), &observMetrics) + require.NoError(t, err) + + if len(observMetrics.ScopeMetrics) > 0 { + // Verify observability metrics are still present (at least some) + scopeMetrics := observMetrics.ScopeMetrics[0] + foundObservMetrics := 0 + for _, m := range scopeMetrics.Metrics { + switch m.Name { + case "otel.sdk.exporter.metric_data_point.inflight", + "otel.sdk.exporter.metric_data_point.exported", + 
"otel.sdk.exporter.operation.duration", + "otel.sdk.metric_reader.collection.duration": + foundObservMetrics++ + } + } + assert.GreaterOrEqual( + t, + foundObservMetrics, + tc.expectedMinMetrics, + "Should have at least some observability metrics even with errors", + ) + + if tc.checkErrorAttributes != nil { + tc.checkErrorAttributes(t, scopeMetrics) + } + } + }) + } +} + +func TestExporterSelfInstrumentationConcurrency(t *testing.T) { + // Enable observability + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Setup observability metric collection + originalMP := otel.GetMeterProvider() + defer otel.SetMeterProvider(originalMP) + + observReader := metric.NewManualReader() + observMP := metric.NewMeterProvider(metric.WithReader(observReader)) + otel.SetMeterProvider(observMP) + + ctx := context.Background() + registry := prometheus.NewRegistry() + + exporter, err := New(WithRegisterer(registry)) + require.NoError(t, err) + + provider := metric.NewMeterProvider(metric.WithReader(exporter)) + meter := provider.Meter("concurrent_test", otelmetric.WithInstrumentationVersion("v1.0.0")) + + counter, err := meter.Int64Counter("concurrent_counter") + require.NoError(t, err) + + // Run concurrent operations + const numGoroutines = 10 + const numOperations = 100 + var wg sync.WaitGroup + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + for j := 0; j < numOperations; j++ { + counter.Add(ctx, 1, otelmetric.WithAttributes(attribute.Int("goroutine", id))) + + // Occasionally trigger collection + if j%10 == 0 { + _, _ = registry.Gather() + } + } + }(i) + } + + wg.Wait() + + // Final collection + _, err = registry.Gather() + require.NoError(t, err) + + // Collect observability metrics + var observMetrics metricdata.ResourceMetrics + err = observReader.Collect(context.Background(), &observMetrics) + require.NoError(t, err) + + if len(observMetrics.ScopeMetrics) > 0 { + scopeMetrics := observMetrics.ScopeMetrics[0] + // Verify observability metrics are present and have reasonable values + for _, m := range scopeMetrics.Metrics { + switch m.Name { + case "otel.sdk.exporter.metric_data_point.exported": + sum, ok := m.Data.(metricdata.Sum[int64]) + require.True(t, ok) + require.NotEmpty(t, sum.DataPoints) + // Should have exported many data points + assert.Positive(t, sum.DataPoints[0].Value) + case "otel.sdk.exporter.operation.duration": + hist, ok := m.Data.(metricdata.Histogram[float64]) + require.True(t, ok) + require.NotEmpty(t, hist.DataPoints) + // Should have recorded operation durations + assert.Positive(t, hist.DataPoints[0].Count) + case "otel.sdk.metric_reader.collection.duration": + hist, ok := m.Data.(metricdata.Histogram[float64]) + require.True(t, ok) + require.NotEmpty(t, hist.DataPoints) + // Should have recorded collection durations + assert.Positive(t, hist.DataPoints[0].Count) + } + } + } +} + +func TestExporterSelfInstrumentationExemplarHandling(t *testing.T) { + // Enable observability + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Setup observability metric collection + originalMP := otel.GetMeterProvider() + defer otel.SetMeterProvider(originalMP) + + observReader := metric.NewManualReader() + observMP := metric.NewMeterProvider(metric.WithReader(observReader)) + otel.SetMeterProvider(observMP) + + ctx := context.Background() + registry := prometheus.NewRegistry() + + exporter, err := New(WithRegisterer(registry)) + require.NoError(t, err) + + provider := metric.NewMeterProvider(metric.WithReader(exporter)) + meter := 
provider.Meter("exemplar_test", otelmetric.WithInstrumentationVersion("v1.0.0")) + + // Create trace context for exemplars + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, + SpanID: [8]byte{1, 2, 3, 4, 5, 6, 7, 8}, + TraceFlags: trace.FlagsSampled, + }) + ctx = trace.ContextWithSpanContext(ctx, sc) + + histogram, err := meter.Float64Histogram("test_histogram_with_exemplars") + require.NoError(t, err) + + // Record values that should generate exemplars + histogram.Record(ctx, 1.0, otelmetric.WithAttributes(attribute.String("key", "value1"))) + histogram.Record(ctx, 2.0, otelmetric.WithAttributes(attribute.String("key", "value2"))) + + // Collect metrics + got, err := registry.Gather() + require.NoError(t, err) + + // Verify that metrics are collected without errors even when exemplars are present + foundTestHistogram := false + + for _, mf := range got { + if *mf.Name == "test_histogram_with_exemplars" { + foundTestHistogram = true + assert.Equal(t, dto.MetricType_HISTOGRAM, *mf.Type) + } + } + + assert.True(t, foundTestHistogram, "Test histogram should be present") + + // Collect observability metrics + var observMetrics metricdata.ResourceMetrics + err = observReader.Collect(context.Background(), &observMetrics) + require.NoError(t, err) + + if len(observMetrics.ScopeMetrics) > 0 { + scopeMetrics := observMetrics.ScopeMetrics[0] + foundObservabilityMetrics := 0 + for _, m := range scopeMetrics.Metrics { + switch m.Name { + case "otel.sdk.exporter.metric_data_point.inflight", + "otel.sdk.exporter.metric_data_point.exported", + "otel.sdk.exporter.operation.duration", + "otel.sdk.metric_reader.collection.duration": + foundObservabilityMetrics++ + } + } + assert.Equal(t, 4, foundObservabilityMetrics, "All observability metrics should be present") + } +} + +func TestExporterSelfInstrumentationInitErrors(t *testing.T) { + // Test when NewInstrumentation returns an error + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Set up a meter provider that will cause NewInstrumentation to fail + original := otel.GetMeterProvider() + defer otel.SetMeterProvider(original) + + errMP := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(errMP) + + registry := prometheus.NewRegistry() + + // This should fail during exporter creation due to instrumentation init error + _, err := New(WithRegisterer(registry)) + require.ErrorIs(t, err, assert.AnError, "Expected NewInstrumentation error to be propagated") +} + +// Helper types for testing NewInstrumentation errors. 
+type errMeterProvider struct { + otelmetric.MeterProvider + err error +} + +func (m *errMeterProvider) Meter(string, ...otelmetric.MeterOption) otelmetric.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + otelmetric.Meter + err error +} + +func (m *errMeter) Int64UpDownCounter( + string, + ...otelmetric.Int64UpDownCounterOption, +) (otelmetric.Int64UpDownCounter, error) { + return nil, m.err +} + +func (m *errMeter) Int64Counter(string, ...otelmetric.Int64CounterOption) (otelmetric.Int64Counter, error) { + return nil, m.err +} + +func (m *errMeter) Float64Histogram(string, ...otelmetric.Float64HistogramOption) (otelmetric.Float64Histogram, error) { + return nil, m.err +} diff --git a/exporters/prometheus/internal/counter/counter.go b/exporters/prometheus/internal/counter/counter.go new file mode 100644 index 000000000..87808d548 --- /dev/null +++ b/exporters/prometheus/internal/counter/counter.go @@ -0,0 +1,31 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/counter/counter.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package counter provides a simple counter for generating unique IDs. +// +// This package is used to generate unique IDs while allowing testing packages +// to reset the counter. +package counter // import "go.opentelemetry.io/otel/exporters/prometheus/internal/counter" + +import "sync/atomic" + +// exporterN is a global 0-based count of the number of exporters created. +var exporterN atomic.Int64 + +// NextExporterID returns the next unique ID for an exporter. +func NextExporterID() int64 { + const inc = 1 + return exporterN.Add(inc) - inc +} + +// SetExporterID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetExporterID(v int64) int64 { + return exporterN.Swap(v) +} diff --git a/exporters/prometheus/internal/counter/counter_test.go b/exporters/prometheus/internal/counter/counter_test.go new file mode 100644 index 000000000..f3e380d33 --- /dev/null +++ b/exporters/prometheus/internal/counter/counter_test.go @@ -0,0 +1,65 @@ +// Code generated by gotmpl. DO NOT MODIFY. 
+// source: internal/shared/counter/counter_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package counter + +import ( + "sync" + "testing" +) + +func TestNextExporterID(t *testing.T) { + SetExporterID(0) + + var expected int64 + for range 10 { + id := NextExporterID() + if id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetExporterID(0) + + prev := SetExporterID(42) + if prev != 0 { + t.Errorf("SetExporterID(42) returned %d; want 0", prev) + } + + id := NextExporterID() + if id != 42 { + t.Errorf("NextExporterID() = %d; want 42", id) + } +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetExporterID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + NextExporterID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + if id := NextExporterID(); id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } +} \ No newline at end of file diff --git a/exporters/prometheus/internal/gen.go b/exporters/prometheus/internal/gen.go new file mode 100644 index 000000000..8cde0168a --- /dev/null +++ b/exporters/prometheus/internal/gen.go @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package internal provides internal functionality for the prometheus +// package. +package internal // import "go.opentelemetry.io/otel/exporters/prometheus/internal" + +//go:generate gotmpl --body=../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/prometheus/internal/counter\" }" --out=counter/counter.go +//go:generate gotmpl --body=../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go diff --git a/exporters/prometheus/internal/observ/instrumentation.go b/exporters/prometheus/internal/observ/instrumentation.go new file mode 100644 index 000000000..df1bdaa4b --- /dev/null +++ b/exporters/prometheus/internal/observ/instrumentation.go @@ -0,0 +1,224 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package observ provides experimental observability instrumentation +// for the prometheus exporter. +package observ // import "go.opentelemetry.io/otel/exporters/prometheus/internal/observ" + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/prometheus/internal" + "go.opentelemetry.io/otel/exporters/prometheus/internal/x" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + // ComponentType uniquely identifies the OpenTelemetry Exporter component + // being instrumented. + ComponentType = "go.opentelemetry.io/otel/exporters/prometheus/prometheus.Exporter" + + // ScopeName is the unique name of the meter used for instrumentation. + ScopeName = "go.opentelemetry.io/otel/exporters/prometheus/internal/observ" + + // SchemaURL is the schema URL of the metrics produced by this + // instrumentation. + SchemaURL = semconv.SchemaURL + + // Version is the current version of this instrumentation. + // + // This matches the version of the exporter. 
+ Version = internal.Version +) + +var ( + measureAttrsPool = &sync.Pool{ + New: func() any { + // "component.name" + "component.type" + "error.type" + const n = 1 + 1 + 1 + s := make([]attribute.KeyValue, 0, n) + // Return a pointer to a slice instead of a slice itself + // to avoid allocations on every call. + return &s + }, + } + + addOptPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.AddOption, 0, n) + return &o + }, + } + + recordOptPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.RecordOption, 0, n) + return &o + }, + } +) + +func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) } + +func put[T any](p *sync.Pool, s *[]T) { + *s = (*s)[:0] // Reset. + p.Put(s) +} + +func ComponentName(id int64) string { + return fmt.Sprintf("%s/%d", ComponentType, id) +} + +type Instrumentation struct { + inflightMetric metric.Int64UpDownCounter + exportedMetric metric.Int64Counter + operationDuration metric.Float64Histogram + collectionDuration metric.Float64Histogram + + attrs []attribute.KeyValue + setOpt metric.MeasurementOption +} + +func NewInstrumentation(id int64) (*Instrumentation, error) { + if !x.Observability.Enabled() { + return nil, nil + } + + i := &Instrumentation{ + attrs: []attribute.KeyValue{ + semconv.OTelComponentName(ComponentName(id)), + semconv.OTelComponentTypeKey.String(ComponentType), + }, + } + + s := attribute.NewSet(i.attrs...) + i.setOpt = metric.WithAttributeSet(s) + + mp := otel.GetMeterProvider() + m := mp.Meter( + ScopeName, + metric.WithInstrumentationVersion(Version), + metric.WithSchemaURL(SchemaURL), + ) + + var err, e error + + inflightMetric, e := otelconv.NewSDKExporterMetricDataPointInflight(m) + if e != nil { + e = fmt.Errorf("failed to create inflight metric: %w", e) + err = errors.Join(err, e) + } + i.inflightMetric = inflightMetric.Inst() + + exportedMetric, e := otelconv.NewSDKExporterMetricDataPointExported(m) + if e != nil { + e = fmt.Errorf("failed to create exported metric: %w", e) + err = errors.Join(err, e) + } + i.exportedMetric = exportedMetric.Inst() + + operationDuration, e := otelconv.NewSDKExporterOperationDuration(m) + if e != nil { + e = fmt.Errorf("failed to create operation duration metric: %w", e) + err = errors.Join(err, e) + } + i.operationDuration = operationDuration.Inst() + + collectionDuration, e := otelconv.NewSDKMetricReaderCollectionDuration(m) + if e != nil { + e = fmt.Errorf("failed to create collection duration metric: %w", e) + err = errors.Join(err, e) + } + i.collectionDuration = collectionDuration.Inst() + + return i, err +} + +// RecordDurationDone is a function that is called when a call to an Exporters' +// RecordOperationDuration or RecordCollectionDuration completes. +// +// Any error that is encountered is provided as err. 
+type RecordDurationDone func(error) + +func (i *Instrumentation) RecordOperationDuration(ctx context.Context) RecordDurationDone { + return i.recordDuration(ctx, i.operationDuration) +} + +func (i *Instrumentation) RecordCollectionDuration(ctx context.Context) RecordDurationDone { + return i.recordDuration(ctx, i.collectionDuration) +} + +func (i *Instrumentation) recordDuration( + ctx context.Context, + h metric.Float64Histogram, +) RecordDurationDone { + start := time.Now() + + return func(err error) { + recordOpt := get[metric.RecordOption](recordOptPool) + defer put(recordOptPool, recordOpt) + *recordOpt = append(*recordOpt, i.setOpt) + + if err != nil { + attrs := get[attribute.KeyValue](measureAttrsPool) + defer put(measureAttrsPool, attrs) + *attrs = append(*attrs, i.attrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + + set := attribute.NewSet(*attrs...) + *recordOpt = append((*recordOpt)[:0], metric.WithAttributeSet(set)) + } + + h.Record(ctx, time.Since(start).Seconds(), *recordOpt...) + } +} + +// ExportMetricsDone is a function that is called when a call to an Exporter's +// export methods completes. +// +// The number of successful exports is provided as success. Any error that is +// encountered is provided as err. +type ExportMetricsDone func(success int64, err error) + +func (i *Instrumentation) ExportMetrics(ctx context.Context, n int64) ExportMetricsDone { + addOpt := get[metric.AddOption](addOptPool) + defer put(addOptPool, addOpt) + *addOpt = append(*addOpt, i.setOpt) + + i.inflightMetric.Add(ctx, n, *addOpt...) + + return i.end(ctx, n) +} + +func (i *Instrumentation) end(ctx context.Context, n int64) ExportMetricsDone { + return func(success int64, err error) { + addOpt := get[metric.AddOption](addOptPool) + defer put(addOptPool, addOpt) + *addOpt = append(*addOpt, i.setOpt) + + i.inflightMetric.Add(ctx, -n, *addOpt...) + i.exportedMetric.Add(ctx, success, *addOpt...) + + if err != nil { + attrs := get[attribute.KeyValue](measureAttrsPool) + defer put(measureAttrsPool, attrs) + *attrs = append(*attrs, i.attrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + + set := attribute.NewSet(*attrs...) + + *addOpt = append((*addOpt)[:0], metric.WithAttributeSet(set)) + i.exportedMetric.Add(ctx, n-success, *addOpt...) 
+ } + } +} diff --git a/exporters/prometheus/internal/observ/instrumentation_test.go b/exporters/prometheus/internal/observ/instrumentation_test.go new file mode 100644 index 000000000..2ce906e19 --- /dev/null +++ b/exporters/prometheus/internal/observ/instrumentation_test.go @@ -0,0 +1,334 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ_test + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + observ "go.opentelemetry.io/otel/exporters/prometheus/internal/observ" + mapi "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ID = 0 + +var Scope = instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: observ.SchemaURL, +} + +type errMeterProvider struct { + mapi.MeterProvider + err error +} + +func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + mapi.Meter + err error +} + +func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) { + return nil, m.err +} + +func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) { + return nil, m.err +} + +func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) { + return nil, m.err +} + +func TestNewInstrumentationObservabilityErrors(t *testing.T) { + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + mp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(mp) + + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + _, err := observ.NewInstrumentation(ID) + require.ErrorIs(t, err, assert.AnError, "new instrument errors should be joined") + + assert.ErrorContains(t, err, "inflight metric") + assert.ErrorContains(t, err, "exported metric") + assert.ErrorContains(t, err, "operation duration metric") + assert.ErrorContains(t, err, "collection duration metric") +} + +func TestNewInstrumentationObservabilityDisabled(t *testing.T) { + // Do not set OTEL_GO_X_OBSERVABILITY. + got, err := observ.NewInstrumentation(ID) + assert.NoError(t, err) + assert.Nil(t, got) +} + +// setup installs a ManualReader MeterProvider and returns an instantiated +// Instrumentation plus a collector that returns the single ScopeMetrics group. 
+func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) { + t.Helper() + + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + original := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(original) }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + inst, err := observ.NewInstrumentation(ID) + require.NoError(t, err) + require.NotNil(t, inst) + + return inst, func() metricdata.ScopeMetrics { + var rm metricdata.ResourceMetrics + require.NoError(t, r.Collect(context.Background(), &rm)) + require.Len(t, rm.ScopeMetrics, 1) + return rm.ScopeMetrics[0] + } +} + +func set(err error) attribute.Set { + attrs := []attribute.KeyValue{ + semconv.OTelComponentName(observ.ComponentName(ID)), + semconv.OTelComponentTypeKey.String(observ.ComponentType), + } + if err != nil { + attrs = append(attrs, semconv.ErrorType(err)) + } + return attribute.NewSet(attrs...) +} + +func scrapeInflight() metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: set(nil), Value: 0}, + }, + }, + } +} + +func scrapeExported(success, total int64, err error) metricdata.Metrics { + dps := []metricdata.DataPoint[int64]{ + {Attributes: set(nil), Value: success}, + } + if err != nil { + dps = append(dps, metricdata.DataPoint[int64]{ + Attributes: set(err), + Value: total - success, + }) + } + return metricdata.Metrics{ + Name: otelconv.SDKExporterMetricDataPointExported{}.Name(), + Description: otelconv.SDKExporterMetricDataPointExported{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } +} + +func operationDuration(err error) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: set(err)}, + }, + }, + } +} + +func collectionDuration(err error) metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKMetricReaderCollectionDuration{}.Name(), + Description: otelconv.SDKMetricReaderCollectionDuration{}.Description(), + Unit: otelconv.SDKMetricReaderCollectionDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: set(err)}, + }, + }, + } +} + +func assertExportMetricsMetrics(t *testing.T, got metricdata.ScopeMetrics, total, success int64, err error) { + t.Helper() + + assert.Equal(t, Scope, got.Scope, "unexpected scope") + + m := got.Metrics + require.Len(t, m, 3, "expected 3 metrics (inflight, exported, operationDuration)") + + o := metricdatatest.IgnoreTimestamp() + + want := scrapeInflight() + metricdatatest.AssertEqual(t, want, m[0], o) + + want = scrapeExported(success, total, err) + metricdatatest.AssertEqual(t, want, m[1], o) + + want = operationDuration(err) + 
metricdatatest.AssertEqual(t, want, m[2], o, metricdatatest.IgnoreValue()) +} + +func assertCollectionOnly(t *testing.T, got metricdata.ScopeMetrics, err error) { + t.Helper() + + assert.Equal(t, Scope, got.Scope, "unexpected scope") + + m := got.Metrics + require.Len(t, m, 1, "expected only collectionDuration metric") + + o := metricdatatest.IgnoreTimestamp() + want := collectionDuration(err) + metricdatatest.AssertEqual(t, want, m[0], o, metricdatatest.IgnoreValue()) +} + +func TestInstrumentationExportMetricsSuccess(t *testing.T) { + inst, collect := setup(t) + + const n = 10 + endOp := inst.RecordOperationDuration(context.Background()) + end := inst.ExportMetrics(context.Background(), n) + + end(n, nil) + endOp(nil) + + assertExportMetricsMetrics(t, collect(), n, n, nil) +} + +func TestInstrumentationExportMetricsAllErrored(t *testing.T) { + inst, collect := setup(t) + + const n = 10 + err := assert.AnError + + endOp := inst.RecordOperationDuration(context.Background()) + end := inst.ExportMetrics(context.Background(), n) + + const success = 0 + end(success, err) + endOp(err) + + assertExportMetricsMetrics(t, collect(), n, success, err) +} + +func TestInstrumentationExportMetricsPartialErrored(t *testing.T) { + inst, collect := setup(t) + + const n = 10 + err := assert.AnError + + endOp := inst.RecordOperationDuration(context.Background()) + end := inst.ExportMetrics(context.Background(), n) + + const success = 5 + end(success, err) + endOp(err) + + assertExportMetricsMetrics(t, collect(), n, success, err) +} + +func TestRecordCollectionDurationSuccess(t *testing.T) { + inst, collect := setup(t) + + endCollection := inst.RecordCollectionDuration(context.Background()) + endCollection(nil) + + assertCollectionOnly(t, collect(), nil) +} + +func TestRecordCollectionDurationError(t *testing.T) { + inst, collect := setup(t) + + wantErr := assert.AnError + endCollection := inst.RecordCollectionDuration(context.Background()) + endCollection(wantErr) + + assertCollectionOnly(t, collect(), wantErr) +} + +func BenchmarkInstrumentationExportMetrics(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + inst, err := observ.NewInstrumentation(ID) + if err != nil { + b.Fatalf("failed to create instrumentation: %v", err) + } + + benchErr := errors.New("benchmark error") + ctx := context.Background() + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + end := inst.ExportMetrics(ctx, 10) + end(4, benchErr) + } +} + +func BenchmarkInstrumentationRecordOperationDuration(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + inst, err := observ.NewInstrumentation(ID) + if err != nil { + b.Fatalf("failed to create instrumentation: %v", err) + } + + benchErr := errors.New("benchmark error") + ctx := context.Background() + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + done := inst.RecordOperationDuration(ctx) + done(benchErr) + } +} + +func BenchmarkInstrumentationRecordCollectionDuration(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + inst, err := observ.NewInstrumentation(ID) + if err != nil { + b.Fatalf("failed to create instrumentation: %v", err) + } + + benchErr := errors.New("benchmark error") + ctx := context.Background() + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + endCollection := inst.RecordCollectionDuration(ctx) + endCollection(benchErr) + } +} diff --git a/exporters/prometheus/internal/version.go b/exporters/prometheus/internal/version.go new file mode 100644 index 000000000..79b603c84 --- /dev/null +++ 
b/exporters/prometheus/internal/version.go @@ -0,0 +1,9 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package internal provides internal utilities for the OpenTelemetry prometheus exporter. +package internal // import "go.opentelemetry.io/otel/exporters/prometheus/internal" + +// Version is the current release version of the OpenTelemetry prometheus +// exporter in use. +const Version = "0.60.0" diff --git a/exporters/prometheus/internal/x/README.md b/exporters/prometheus/internal/x/README.md new file mode 100644 index 000000000..f8c24eda2 --- /dev/null +++ b/exporters/prometheus/internal/x/README.md @@ -0,0 +1,37 @@ +# Experimental Features + +The `prometheus` exporter contains features that have not yet stabilized in the OpenTelemetry specification. +These features are added to the `prometheus` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback. + +These features may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for more information. + +## Features + +- [Observability](#observability) + +### Observability + +The `prometheus` exporter can be configured to provide observability about itself using OpenTelemetry metrics. + +To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`. + +When enabled, the SDK will create the following metrics using the global `MeterProvider`: + +- `otel.sdk.exporter.metric_data_point.inflight` +- `otel.sdk.exporter.metric_data_point.exported` +- `otel.sdk.metric_reader.collection.duration` +- `otel.sdk.exporter.operation.duration` + +Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics. + +[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../VERSIONING.md). +These features may be removed or modified in successive version releases, including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. +There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version. +If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support. diff --git a/exporters/prometheus/internal/x/x.go b/exporters/prometheus/internal/x/x.go new file mode 100644 index 000000000..de4fd5b09 --- /dev/null +++ b/exporters/prometheus/internal/x/x.go @@ -0,0 +1,72 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/prometheus]. +package x // import "go.opentelemetry.io/otel/exporters/prometheus/internal/x" + +import ( + "os" + "strings" +) + +// Observability is an experimental feature flag that determines if exporter +// observability metrics are enabled. +// +// To enable this feature set the OTEL_GO_X_OBSERVABILITY environment variable +// to the case-insensitive string value of "true" (i.e. "True" and "TRUE" +// will also enable this). 
+var Observability = newFeature( + []string{"OBSERVABILITY"}, + func(v string) (string, bool) { + if strings.EqualFold(v, "true") { + return v, true + } + return "", false + }, +) + +// Feature is an experimental feature control flag. It provides a uniform way +// to interact with these feature flags and parse their values. +type Feature[T any] struct { + keys []string + parse func(v string) (T, bool) +} + +func newFeature[T any](suffix []string, parse func(string) (T, bool)) Feature[T] { + const envKeyRoot = "OTEL_GO_X_" + keys := make([]string, 0, len(suffix)) + for _, s := range suffix { + keys = append(keys, envKeyRoot+s) + } + return Feature[T]{ + keys: keys, + parse: parse, + } +} + +// Keys returns the environment variable keys that can be set to enable the +// feature. +func (f Feature[T]) Keys() []string { return f.keys } + +// Lookup returns the user configured value for the feature and true if the +// user has enabled the feature. Otherwise, if the feature is not enabled, a +// zero-value and false are returned. +func (f Feature[T]) Lookup() (v T, ok bool) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value + // + // > The SDK MUST interpret an empty value of an environment variable the + // > same way as when the variable is unset. + for _, key := range f.keys { + vRaw := os.Getenv(key) + if vRaw != "" { + return f.parse(vRaw) + } + } + return v, ok +} + +// Enabled reports whether the feature is enabled. +func (f Feature[T]) Enabled() bool { + _, ok := f.Lookup() + return ok +} diff --git a/exporters/prometheus/internal/x/x_test.go b/exporters/prometheus/internal/x/x_test.go new file mode 100644 index 000000000..c02cf3499 --- /dev/null +++ b/exporters/prometheus/internal/x/x_test.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestObservability(t *testing.T) { + const key = "OTEL_GO_X_OBSERVABILITY" + require.Contains(t, Observability.Keys(), key) + + t.Run("100", run(setenv(key, "100"), assertDisabled(Observability))) + t.Run("true", run(setenv(key, "true"), assertEnabled(Observability, "true"))) + t.Run("True", run(setenv(key, "True"), assertEnabled(Observability, "True"))) + t.Run("false", run(setenv(key, "false"), assertDisabled(Observability))) + t.Run("empty", run(assertDisabled(Observability))) +} + +func run(steps ...func(*testing.T)) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + for _, step := range steps { + step(t) + } + } +} + +func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function. 
+ return func(t *testing.T) { t.Setenv(k, v) } +} + +func assertEnabled[T any](f Feature[T], want T) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + assert.True(t, f.Enabled(), "not enabled") + + v, ok := f.Lookup() + assert.True(t, ok, "Lookup state") + assert.Equal(t, want, v, "Lookup value") + } +} + +func assertDisabled[T any](f Feature[T]) func(*testing.T) { + var zero T + return func(t *testing.T) { + t.Helper() + + assert.False(t, f.Enabled(), "enabled") + + v, ok := f.Lookup() + assert.False(t, ok, "Lookup state") + assert.Equal(t, zero, v, "Lookup value") + } +} diff --git a/versions.yaml b/versions.yaml index f8f3c0348..1c1a4f5a1 100644 --- a/versions.yaml +++ b/versions.yaml @@ -46,3 +46,6 @@ modules: go.opentelemetry.io/otel/exporters/stdout/stdouttrace: version-refs: - ./exporters/stdout/stdouttrace/internal/version.go + go.opentelemetry.io/otel/exporters/prometheus: + version-refs: + - ./exporters/prometheus/internal/version.go
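
A minimal usage sketch (not part of the diff above; it only relies on APIs that already appear in this change and its tests: `New`, `WithRegisterer`, the `OTEL_GO_X_OBSERVABILITY` flag, and the SDK `ManualReader`) of how a user might opt in to the experimental self-observability and read back the `otel.sdk.*` metrics this change introduces:

```go
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/prometheus/client_golang/prometheus"

	"go.opentelemetry.io/otel"
	otelprom "go.opentelemetry.io/otel/exporters/prometheus"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// Opt in before constructing the exporter; the flag is read inside New()
	// when observ.NewInstrumentation is called.
	_ = os.Setenv("OTEL_GO_X_OBSERVABILITY", "true")

	// Self-observability metrics are emitted through the global MeterProvider,
	// so install one backed by a ManualReader that we can collect from.
	selfReader := sdkmetric.NewManualReader()
	otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(selfReader)))

	// Regular Prometheus exporter setup.
	registry := prometheus.NewRegistry()
	exporter, err := otelprom.New(otelprom.WithRegisterer(registry))
	if err != nil {
		panic(err)
	}
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(exporter))
	defer func() { _ = provider.Shutdown(context.Background()) }()

	// Record a value and scrape the registry, which drives collector.Collect
	// and therefore the instrumented code paths.
	counter, _ := provider.Meter("example").Int64Counter("requests")
	counter.Add(context.Background(), 1)
	if _, err := registry.Gather(); err != nil {
		panic(err)
	}

	// Read back the exporter's own telemetry, e.g.
	// otel.sdk.exporter.metric_data_point.exported and
	// otel.sdk.metric_reader.collection.duration.
	var rm metricdata.ResourceMetrics
	if err := selfReader.Collect(context.Background(), &rm); err != nil {
		panic(err)
	}
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			fmt.Println(m.Name)
		}
	}
}
```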