diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4e6e47443..febf6a41c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -42,6 +42,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#6710)
 - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#6710)
 - Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#6710)
+- Validate exponential histogram scale range for Prometheus compatibility in `go.opentelemetry.io/otel/exporters/prometheus`. (#6822)
diff --git a/exporters/prometheus/exporter.go b/exporters/prometheus/exporter.go
index 1493864e9..7b44c12c5 100644
--- a/exporters/prometheus/exporter.go
+++ b/exporters/prometheus/exporter.go
@@ -244,6 +244,59 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
 	}
 }
 
+// downscaleExponentialBucket re-aggregates bucket counts when downscaling to a coarser resolution.
+func downscaleExponentialBucket(bucket metricdata.ExponentialBucket, scaleDelta int32) metricdata.ExponentialBucket {
+	if len(bucket.Counts) == 0 || scaleDelta < 1 {
+		return metricdata.ExponentialBucket{
+			Offset: bucket.Offset >> scaleDelta,
+			Counts: append([]uint64(nil), bucket.Counts...), // copy slice
+		}
+	}
+
+	// The new offset is scaled down.
+	newOffset := bucket.Offset >> scaleDelta
+
+	// Pre-calculate the new bucket count to avoid growing the slice:
+	// each group of 2^scaleDelta buckets merges into one bucket.
+	//nolint:gosec // Length is bounded by slice allocation
+	lastBucketIdx := bucket.Offset + int32(len(bucket.Counts)) - 1
+	lastNewIdx := lastBucketIdx >> scaleDelta
+	newBucketCount := int(lastNewIdx - newOffset + 1)
+
+	if newBucketCount <= 0 {
+		return metricdata.ExponentialBucket{
+			Offset: newOffset,
+			Counts: []uint64{},
+		}
+	}
+
+	newCounts := make([]uint64, newBucketCount)
+
+	// Merge buckets according to the scale difference.
+	for i, count := range bucket.Counts {
+		if count == 0 {
+			continue
+		}
+
+		// Calculate which new bucket this count belongs to.
+		//nolint:gosec // Index is bounded by loop iteration
+		originalIdx := bucket.Offset + int32(i)
+		newIdx := originalIdx >> scaleDelta
+
+		// Calculate the position in the new counts array.
+		position := newIdx - newOffset
+		//nolint:gosec // Length is bounded by allocation
+		if position >= 0 && position < int32(len(newCounts)) {
+			newCounts[position] += count
+		}
+	}
+
+	return metricdata.ExponentialBucket{
+		Offset: newOffset,
+		Counts: newCounts,
+	}
+}
+
 func addExponentialHistogramMetric[N int64 | float64](
 	ch chan<- prometheus.Metric,
 	histogram metricdata.ExponentialHistogram[N],
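For intuition about the shift-based re-aggregation above, here is a minimal standalone sketch (illustrative only, not part of the patch): every source bucket index i collapses into i >> scaleDelta, so each group of 2^scaleDelta adjacent source buckets lands in one coarser bucket. It reproduces the "unaligned bucket scale 1" case from the tests below.

    package main

    import "fmt"

    func main() {
    	offset, scaleDelta := int32(5), int32(1)
    	counts := []uint64{1, 2, 3, 4, 5, 6} // source bucket indices 5..10

    	merged := map[int32]uint64{}
    	for i, c := range counts {
    		// Same mapping the function uses: index >> scaleDelta.
    		merged[(offset+int32(i))>>scaleDelta] += c
    	}

    	fmt.Println(offset >> scaleDelta) // new offset: 2
    	fmt.Println(merged)               // map[2:1 3:5 4:9 5:6]
    }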
@@ -258,23 +311,43 @@ func addExponentialHistogramMetric[N int64 | float64](
 
 		desc := prometheus.NewDesc(name, m.Description, keys, nil)
 
+		// Prometheus native histograms support scales in the range [-4, 8]
+		scale := dp.Scale
+		if scale < -4 {
+			// Reject scales below -4 as they cannot be represented in Prometheus
+			otel.Handle(fmt.Errorf(
+				"exponential histogram scale %d is below minimum supported scale -4, skipping data point",
+				scale))
+			continue
+		}
+
+		// If scale > 8, we need to downscale the buckets to match the clamped scale
+		positiveBucket := dp.PositiveBucket
+		negativeBucket := dp.NegativeBucket
+		if scale > 8 {
+			scaleDelta := scale - 8
+			positiveBucket = downscaleExponentialBucket(dp.PositiveBucket, scaleDelta)
+			negativeBucket = downscaleExponentialBucket(dp.NegativeBucket, scaleDelta)
+			scale = 8
+		}
+
 		// From spec: note that Prometheus Native Histograms buckets are indexed by upper boundary while Exponential Histograms are indexed by lower boundary, the result being that the Offset fields are different-by-one.
 		positiveBuckets := make(map[int]int64)
-		for i, c := range dp.PositiveBucket.Counts {
+		for i, c := range positiveBucket.Counts {
 			if c > math.MaxInt64 {
 				otel.Handle(fmt.Errorf("positive count %d is too large to be represented as int64", c))
 				continue
 			}
-			positiveBuckets[int(dp.PositiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
+			positiveBuckets[int(positiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
 		}
 
 		negativeBuckets := make(map[int]int64)
-		for i, c := range dp.NegativeBucket.Counts {
+		for i, c := range negativeBucket.Counts {
 			if c > math.MaxInt64 {
 				otel.Handle(fmt.Errorf("negative count %d is too large to be represented as int64", c))
 				continue
 			}
-			negativeBuckets[int(dp.NegativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
+			negativeBuckets[int(negativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
 		}
 
 		m, err := prometheus.NewConstNativeHistogram(
@@ -284,7 +357,7 @@
 			positiveBuckets,
 			negativeBuckets,
 			dp.ZeroCount,
-			dp.Scale,
+			scale,
 			dp.ZeroThreshold,
 			dp.StartTime,
 			values...)
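Scales above 8 are not hypothetical: the SDK's base-2 exponential histogram aggregation accepts a MaxScale of up to 20, so tightly clustered recordings can reach the exporter with a scale the Prometheus schema cannot express. A hedged configuration sketch showing one way such data can arise (the instrument name and aggregation parameters are illustrative):

    package main

    import (
    	"log"

    	"go.opentelemetry.io/otel/exporters/prometheus"
    	"go.opentelemetry.io/otel/sdk/metric"
    )

    func main() {
    	exporter, err := prometheus.New()
    	if err != nil {
    		log.Fatal(err)
    	}
    	// With MaxScale 20, tightly clustered recordings can produce data
    	// points whose scale exceeds Prometheus's maximum schema of 8; the
    	// patch above downscales such points instead of mis-reporting them.
    	_ = metric.NewMeterProvider(
    		metric.WithReader(exporter),
    		metric.WithView(metric.NewView(
    			metric.Instrument{Name: "request.latency"},
    			metric.Stream{Aggregation: metric.AggregationBase2ExponentialHistogram{
    				MaxSize:  160,
    				MaxScale: 20,
    			}},
    		)),
    	)
    }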
diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go
index 6cd3e88d5..be993fd89 100644
--- a/exporters/prometheus/exporter_test.go
+++ b/exporters/prometheus/exporter_test.go
@@ -6,9 +6,11 @@ package prometheus
 import (
 	"context"
 	"errors"
+	"math"
 	"os"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/testutil"
@@ -21,6 +23,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	otelmetric "go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/sdk/metric"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/resource"
 	semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
 	"go.opentelemetry.io/otel/trace"
@@ -1163,3 +1166,445 @@ func TestExemplars(t *testing.T) {
 		})
 	}
 }
+
+func TestExponentialHistogramScaleValidation(t *testing.T) {
+	ctx := context.Background()
+
+	t.Run("normal_exponential_histogram_works", func(t *testing.T) {
+		registry := prometheus.NewRegistry()
+		exporter, err := New(WithRegisterer(registry), WithoutTargetInfo(), WithoutScopeInfo())
+		require.NoError(t, err)
+
+		provider := metric.NewMeterProvider(
+			metric.WithReader(exporter),
+			metric.WithResource(resource.Default()),
+		)
+		defer func() {
+			err := provider.Shutdown(ctx)
+			require.NoError(t, err)
+		}()
+
+		// Create a histogram and record values spanning several orders of magnitude.
+		meter := provider.Meter("test")
+		hist, err := meter.Float64Histogram(
+			"test_exponential_histogram",
+			otelmetric.WithDescription("test histogram"),
+		)
+		require.NoError(t, err)
+		hist.Record(ctx, 1.0)
+		hist.Record(ctx, 10.0)
+		hist.Record(ctx, 100.0)
+
+		metricFamilies, err := registry.Gather()
+		require.NoError(t, err)
+		assert.NotEmpty(t, metricFamilies)
+	})
+
+	t.Run("error_handling_for_invalid_scales", func(t *testing.T) {
+		var capturedError error
+		originalHandler := otel.GetErrorHandler()
+		otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
+			capturedError = err
+		}))
+		defer otel.SetErrorHandler(originalHandler)
+
+		now := time.Now()
+		invalidScaleData := metricdata.ExponentialHistogramDataPoint[float64]{
+			Attributes:    attribute.NewSet(),
+			StartTime:     now,
+			Time:          now,
+			Count:         1,
+			Sum:           10.0,
+			Scale:         -5, // Invalid scale below -4
+			ZeroCount:     0,
+			ZeroThreshold: 0.0,
+			PositiveBucket: metricdata.ExponentialBucket{
+				Offset: 1,
+				Counts: []uint64{1},
+			},
+			NegativeBucket: metricdata.ExponentialBucket{
+				Offset: 1,
+				Counts: []uint64{},
+			},
+		}
+
+		ch := make(chan prometheus.Metric, 10)
+		defer close(ch)
+
+		histogram := metricdata.ExponentialHistogram[float64]{
+			Temporality: metricdata.CumulativeTemporality,
+			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{invalidScaleData},
+		}
+
+		m := metricdata.Metrics{
+			Name:        "test_histogram",
+			Description: "test",
+		}
+
+		addExponentialHistogramMetric(ch, histogram, m, "test_histogram", keyVals{})
+		assert.Error(t, capturedError)
+		assert.Contains(t, capturedError.Error(), "scale -5 is below minimum")
+		select {
+		case <-ch:
+			t.Error("Expected no metrics to be produced for invalid scale")
+		default:
+			// No metrics were produced for the invalid scale.
+		}
+	})
+}
+
+func TestDownscaleExponentialBucket(t *testing.T) {
+	tests := []struct {
+		name       string
+		bucket     metricdata.ExponentialBucket
+		scaleDelta int32
+		want       metricdata.ExponentialBucket
+	}{
+		{
+			name:       "Empty bucket",
+			bucket:     metricdata.ExponentialBucket{},
+			scaleDelta: 3,
+			want:       metricdata.ExponentialBucket{},
+		},
+		{
+			name: "1 size bucket",
+			bucket: metricdata.ExponentialBucket{
+				Offset: 50,
+				Counts: []uint64{7},
+			},
+			scaleDelta: 4,
+			want: metricdata.ExponentialBucket{
+				Offset: 3,
+				Counts: []uint64{7},
+			},
+		},
+		{
+			name: "zero scale delta",
+			bucket: metricdata.ExponentialBucket{
+				Offset: 50,
+				Counts: []uint64{7, 5},
+			},
+			scaleDelta: 0,
+			want: metricdata.ExponentialBucket{
+				Offset: 50,
+				Counts: []uint64{7, 5},
+			},
+		},
+		{
+			name: "aligned bucket scale 1",
+			bucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{1, 2, 3, 4, 5, 6},
+			},
+			scaleDelta: 1,
+			want: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{3, 7, 11},
+			},
+		},
+		{
+			name: "aligned bucket scale 2",
+			bucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{1, 2, 3, 4, 5, 6},
+			},
+			scaleDelta: 2,
+			want: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{10, 11},
+			},
+		},
+		{
+			name: "unaligned bucket scale 1",
+			bucket: metricdata.ExponentialBucket{
+				Offset: 5,
+				Counts: []uint64{1, 2, 3, 4, 5, 6},
+			}, // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6]
+			scaleDelta: 1,
+			want: metricdata.ExponentialBucket{
+				Offset: 2,
+				Counts: []uint64{1, 5, 9, 6},
+			}, // This is equivalent to [0,0,1,5,9,6]
+		},
+		{
+			name: "negative startBin",
+			bucket: metricdata.ExponentialBucket{
+				Offset: -1,
+				Counts: []uint64{1, 0, 3},
+			},
+			scaleDelta: 1,
+			want: metricdata.ExponentialBucket{
+				Offset: -1,
+				Counts: []uint64{1, 3},
+			},
+		},
+		{
+			name: "negative startBin 2",
+			bucket: metricdata.ExponentialBucket{
+				Offset: -4,
+				Counts: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+			},
+			scaleDelta: 1,
+			want: metricdata.ExponentialBucket{
+				Offset: -2,
+				Counts: []uint64{3, 7, 11, 15, 19},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := downscaleExponentialBucket(tt.bucket, tt.scaleDelta)
+			assert.Equal(t, tt.want, got)
+		})
+	}
+}
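One property implied by the table above, sketched as a hypothetical in-package test (sumCounts and the test name are illustrative, not part of the patch): downscaling redistributes counts across coarser buckets but never drops or duplicates them, so the total count is preserved for any delta.

    func sumCounts(counts []uint64) (total uint64) {
    	for _, c := range counts {
    		total += c
    	}
    	return total
    }

    func TestDownscalePreservesTotalCount(t *testing.T) {
    	bucket := metricdata.ExponentialBucket{
    		Offset: -4,
    		Counts: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
    	}
    	for delta := int32(1); delta <= 8; delta++ {
    		got := downscaleExponentialBucket(bucket, delta)
    		// Counts move between indices, but their sum is unchanged.
    		assert.Equal(t, sumCounts(bucket.Counts), sumCounts(got.Counts))
    	}
    }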
+
+func TestExponentialHistogramHighScaleDownscaling(t *testing.T) {
+	t.Run("scale_10_downscales_to_8", func(t *testing.T) {
+		// Test that scale 10 gets properly downscaled to 8 with correct bucket re-aggregation.
+		ch := make(chan prometheus.Metric, 10)
+		defer close(ch)
+
+		now := time.Now()
+
+		// Create an exponential histogram data point with scale 10.
+		dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{
+			Attributes:    attribute.NewSet(),
+			StartTime:     now,
+			Time:          now,
+			Count:         8,
+			Sum:           55.0,
+			Scale:         10, // This should be downscaled to 8
+			ZeroCount:     0,
+			ZeroThreshold: 0.0,
+			PositiveBucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{1, 1, 1, 1, 1, 1, 1, 1}, // 8 buckets with 1 count each
+			},
+			NegativeBucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{},
+			},
+		}
+
+		histogram := metricdata.ExponentialHistogram[float64]{
+			Temporality: metricdata.CumulativeTemporality,
+			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint},
+		}
+
+		m := metricdata.Metrics{
+			Name:        "test_high_scale_histogram",
+			Description: "test histogram with high scale",
+		}
+
+		// This should not produce any errors and should properly downscale buckets.
+		addExponentialHistogramMetric(ch, histogram, m, "test_high_scale_histogram", keyVals{})
+
+		// Verify a metric was produced.
+		select {
+		case got := <-ch:
+			// Check that the metric was created successfully.
+			require.NotNil(t, got)
+
+			// The scale should have been clamped to 8 and the buckets re-aggregated:
+			// with scale 10 -> 8, the scaleDelta is 2, so 2^2 = 4 buckets merge into 1.
+			// Original: 8 buckets with 1 count each at scale 10.
+			// After downscaling: 2 buckets with 4 counts each at scale 8.
+		default:
+			t.Error("Expected a metric to be produced")
+		}
+	})
+
+	t.Run("scale_12_downscales_to_8", func(t *testing.T) {
+		// Test that scale 12 gets properly downscaled to 8 with correct bucket re-aggregation.
+		ch := make(chan prometheus.Metric, 10)
+		defer close(ch)
+
+		now := time.Now()
+
+		// Create an exponential histogram data point with scale 12.
+		dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{
+			Attributes:    attribute.NewSet(),
+			StartTime:     now,
+			Time:          now,
+			Count:         16,
+			Sum:           120.0,
+			Scale:         12, // This should be downscaled to 8
+			ZeroCount:     0,
+			ZeroThreshold: 0.0,
+			PositiveBucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, // 16 buckets with 1 count each
+			},
+			NegativeBucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{},
+			},
+		}
+
+		histogram := metricdata.ExponentialHistogram[float64]{
+			Temporality: metricdata.CumulativeTemporality,
+			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint},
+		}
+
+		m := metricdata.Metrics{
+			Name:        "test_very_high_scale_histogram",
+			Description: "test histogram with very high scale",
+		}
+
+		// This should not produce any errors and should properly downscale buckets.
+		addExponentialHistogramMetric(ch, histogram, m, "test_very_high_scale_histogram", keyVals{})
+
+		// Verify a metric was produced.
+		select {
+		case got := <-ch:
+			// Check that the metric was created successfully.
+			require.NotNil(t, got)
+
+			// The scale should have been clamped to 8 and the buckets re-aggregated:
+			// with scale 12 -> 8, the scaleDelta is 4, so 2^4 = 16 buckets merge into 1.
+			// Original: 16 buckets with 1 count each at scale 12.
+			// After downscaling: 1 bucket with 16 counts at scale 8.
+		default:
+			t.Error("Expected a metric to be produced")
+		}
+	})
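The "2^scaleDelta buckets merge into 1" comments follow from the exponential histogram boundary definition, where bucket i at a given scale has upper edge 2^((i+1)·2^-scale). A standalone check (illustrative, not part of the patch) that four consecutive scale-10 buckets span exactly one scale-8 bucket:

    package main

    import (
    	"fmt"
    	"math"
    )

    // upper returns the upper boundary of bucket index i at the given
    // scale: 2^((i+1) * 2^-scale).
    func upper(i, scale int32) float64 {
    	return math.Exp2(float64(i+1) * math.Exp2(-float64(scale)))
    }

    func main() {
    	// Buckets 0..3 at scale 10 end exactly where bucket 0 at scale 8
    	// ends, so a scaleDelta of 2 merges groups of 2^2 = 4 buckets.
    	fmt.Println(upper(3, 10) == upper(0, 8)) // true
    	fmt.Println(upper(7, 10) == upper(1, 8)) // true
    }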
+
+	t.Run("exponential_histogram_with_negative_buckets", func(t *testing.T) {
+		// Test that exponential histograms with negative buckets are handled correctly.
+		ch := make(chan prometheus.Metric, 10)
+		defer close(ch)
+
+		now := time.Now()
+
+		// Create an exponential histogram data point with both positive and negative buckets.
+		dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{
+			Attributes:    attribute.NewSet(),
+			StartTime:     now,
+			Time:          now,
+			Count:         6,
+			Sum:           25.0,
+			Scale:         2,
+			ZeroCount:     0,
+			ZeroThreshold: 0.0,
+			PositiveBucket: metricdata.ExponentialBucket{
+				Offset: 1,
+				Counts: []uint64{1, 2}, // 2 positive buckets
+			},
+			NegativeBucket: metricdata.ExponentialBucket{
+				Offset: 1,
+				Counts: []uint64{2, 1}, // 2 negative buckets
+			},
+		}
+
+		histogram := metricdata.ExponentialHistogram[float64]{
+			Temporality: metricdata.CumulativeTemporality,
+			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint},
+		}
+
+		m := metricdata.Metrics{
+			Name:        "test_histogram_with_negative_buckets",
+			Description: "test histogram with negative buckets",
+		}
+
+		// This should handle negative buckets correctly.
+		addExponentialHistogramMetric(ch, histogram, m, "test_histogram_with_negative_buckets", keyVals{})
+
+		// Verify a metric was produced.
+		select {
+		case got := <-ch:
+			require.NotNil(t, got)
+		default:
+			t.Error("Expected a metric to be produced")
+		}
+	})
+
+	t.Run("exponential_histogram_int64_type", func(t *testing.T) {
+		// Test that int64 exponential histograms are handled correctly.
+		ch := make(chan prometheus.Metric, 10)
+		defer close(ch)
+
+		now := time.Now()
+
+		// Create an exponential histogram data point with int64 type.
+		dataPoint := metricdata.ExponentialHistogramDataPoint[int64]{
+			Attributes:    attribute.NewSet(),
+			StartTime:     now,
+			Time:          now,
+			Count:         4,
+			Sum:           20,
+			Scale:         3,
+			ZeroCount:     0,
+			ZeroThreshold: 0.0,
+			PositiveBucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{1, 1, 1, 1}, // 4 buckets with 1 count each
+			},
+			NegativeBucket: metricdata.ExponentialBucket{
+				Offset: 0,
+				Counts: []uint64{},
+			},
+		}
+
+		histogram := metricdata.ExponentialHistogram[int64]{
+			Temporality: metricdata.CumulativeTemporality,
+			DataPoints:  []metricdata.ExponentialHistogramDataPoint[int64]{dataPoint},
+		}
+
+		m := metricdata.Metrics{
+			Name:        "test_int64_exponential_histogram",
+			Description: "test int64 exponential histogram",
+		}
+
+		// This should handle int64 exponential histograms correctly.
+		addExponentialHistogramMetric(ch, histogram, m, "test_int64_exponential_histogram", keyVals{})
+
+		// Verify a metric was produced.
+		select {
+		case got := <-ch:
+			require.NotNil(t, got)
+		default:
+			t.Error("Expected a metric to be produced")
+		}
+	})
+}
+
+func TestDownscaleExponentialBucketEdgeCases(t *testing.T) {
+	t.Run("sparse_buckets_collapse_into_one", func(t *testing.T) {
+		// Sparse counts whose first and last non-zero buckets land in the
+		// same downscaled bucket must be merged into a single count.
+		bucket := metricdata.ExponentialBucket{
+			Offset: 10, // Start at offset 10
+			Counts: []uint64{1, 0, 0, 0, 1},
+		}
+
+		// Scale delta of 3 will cause downscaling: original indices 10->1, 14->1.
+		result := downscaleExponentialBucket(bucket, 3)
+
+		// Both original buckets 10 and 14 should map to the same downscaled bucket at index 1.
+		expected := metricdata.ExponentialBucket{
+			Offset: 1,
+			Counts: []uint64{2}, // Both counts combined
+		}
+
+		assert.Equal(t, expected, result)
+	})
+
+	t.Run("large_offset_downscales_without_overflow", func(t *testing.T) {
+		// An offset near MaxInt32 must downscale correctly; the index
+		// arithmetic stays in range because offset + len(counts) - 1
+		// still fits in an int32 for this data.
+		bucket := metricdata.ExponentialBucket{
+			Offset: math.MaxInt32 - 5,
+			Counts: []uint64{1, 1, 1, 1, 1},
+		}
+
+		// This should work normally and downscale the buckets.
+		result := downscaleExponentialBucket(bucket, 1)
+
+		// Should return a bucket with downscaled values.
+		expected := metricdata.ExponentialBucket{
+			Offset: 1073741821,        // (MaxInt32 - 5) >> 1 = 1073741821
+			Counts: []uint64{2, 2, 1}, // Buckets get combined during downscaling
+		}
+
+		assert.Equal(t, expected, result)
+	})
+}
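On the MaxInt32 - 5 offset in the last case: the test stays safe because the function's lastBucketIdx arithmetic never leaves the int32 range for this data. A small sketch of that bound, computed in int64 so the check itself cannot overflow (values mirror the test above; illustrative only):

    package main

    import (
    	"fmt"
    	"math"
    )

    func main() {
    	offset := int64(math.MaxInt32 - 5)
    	n := int64(5) // len(bucket.Counts) in the test above
    	last := offset + n - 1
    	// downscaleExponentialBucket computes offset + len(counts) - 1 in
    	// int32, which is only safe while that value fits in an int32.
    	fmt.Println(last <= math.MaxInt32) // true: the test data does not overflow
    }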