
prometheus: validate exponential histogram scale range (#6779) (#6822)

Fixes #6779

### Changes
- Added scale validation for Prometheus exponential histograms in `addExponentialHistogramMetric`
- Rejects scales below -4: the data point is skipped and the error is reported through the global error handler
- Clamps scales above 8 down to 8, re-aggregating bucket counts to the coarser resolution (see the sketch below)
- Prometheus native histograms support scales in the range [-4, 8]
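
For illustration, the re-aggregation is pure index arithmetic: a bucket at index `i` under the finer scale lands at index `i >> scaleDelta` under the coarser one, so groups of `2^scaleDelta` adjacent buckets merge. A minimal, runnable sketch of that mapping (illustrative only, not part of this commit):

```go
package main

import "fmt"

func main() {
	// Clamping scale 10 to the Prometheus maximum of 8 gives scaleDelta = 2,
	// so every 2^2 = 4 adjacent source buckets merge into one target bucket.
	scaleDelta := int32(2)
	for idx := int32(-4); idx <= 7; idx++ {
		// The arithmetic right shift floors toward negative infinity,
		// which keeps negative bucket indices grouped correctly.
		fmt.Printf("source index %3d -> target index %3d\n", idx, idx>>scaleDelta)
	}
}
```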

### Testing
- All existing tests pass
- Added `TestExponentialHistogramScaleValidation` to verify error
handling doesn't break normal operation

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
Co-authored-by: David Ashpole <dashpole@google.com>
Co-authored-by: Damien Mathieu <42@dmathieu.com>
Authored by Peter Bryant on 2025-06-16 02:28:41 -05:00, committed via GitHub.
parent f2058facfc, commit cba6502b61
3 changed files with 524 additions and 5 deletions


@@ -42,6 +42,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#6710)
- Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#6710)
- Stop stripping trailing slashes from configured endpoint URL in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#6710)
+- Validate exponential histogram scale range for Prometheus compatibility in `go.opentelemetry.io/otel/exporters/prometheus`. (#6822)
<!-- Released section -->
<!-- Don't change this section unless doing release -->


@@ -244,6 +244,59 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
	}
}

// downscaleExponentialBucket re-aggregates bucket counts when downscaling to a coarser resolution.
func downscaleExponentialBucket(bucket metricdata.ExponentialBucket, scaleDelta int32) metricdata.ExponentialBucket {
	if len(bucket.Counts) == 0 || scaleDelta < 1 {
		return metricdata.ExponentialBucket{
			Offset: bucket.Offset >> scaleDelta,
			Counts: append([]uint64(nil), bucket.Counts...), // copy slice
		}
	}

	// The new offset is scaled down
	newOffset := bucket.Offset >> scaleDelta

	// Pre-calculate the new bucket count to avoid growing the slice:
	// each group of 2^scaleDelta buckets will merge into one bucket
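	// For example, with scaleDelta = 2, every 2^2 = 4 adjacent source buckets
	// collapse into one target bucket: indices 0..3 -> 0, 4..7 -> 1, and, since
	// >> is an arithmetic shift on int32, negative indices -4..-1 -> -1.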
	//nolint:gosec // Length is bounded by slice allocation
	lastBucketIdx := bucket.Offset + int32(len(bucket.Counts)) - 1
	lastNewIdx := lastBucketIdx >> scaleDelta
	newBucketCount := int(lastNewIdx - newOffset + 1)
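	// For example, Offset = 5 with 6 counts at scaleDelta = 1 spans source
	// indices 5..10, so newOffset = 2, lastNewIdx = 5, and newBucketCount = 4.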
	if newBucketCount <= 0 {
		return metricdata.ExponentialBucket{
			Offset: newOffset,
			Counts: []uint64{},
		}
	}

	newCounts := make([]uint64, newBucketCount)

	// Merge buckets according to the scale difference
	for i, count := range bucket.Counts {
		if count == 0 {
			continue
		}
		// Calculate which new bucket this count belongs to
		//nolint:gosec // Index is bounded by loop iteration
		originalIdx := bucket.Offset + int32(i)
		newIdx := originalIdx >> scaleDelta
		// Calculate the position in the new counts array
		position := newIdx - newOffset
		//nolint:gosec // Length is bounded by allocation
		if position >= 0 && position < int32(len(newCounts)) {
			newCounts[position] += count
		}
	}

	return metricdata.ExponentialBucket{
		Offset: newOffset,
		Counts: newCounts,
	}
}
func addExponentialHistogramMetric[N int64 | float64](
	ch chan<- prometheus.Metric,
	histogram metricdata.ExponentialHistogram[N],
@@ -258,23 +311,43 @@ func addExponentialHistogramMetric[N int64 | float64](
		desc := prometheus.NewDesc(name, m.Description, keys, nil)

		// Prometheus native histograms support scales in the range [-4, 8]
		scale := dp.Scale
		if scale < -4 {
			// Reject scales below -4 as they cannot be represented in Prometheus
			otel.Handle(fmt.Errorf(
				"exponential histogram scale %d is below minimum supported scale -4, skipping data point",
				scale))
			continue
		}

		// If scale > 8, we need to downscale the buckets to match the clamped scale
		positiveBucket := dp.PositiveBucket
		negativeBucket := dp.NegativeBucket
		if scale > 8 {
			scaleDelta := scale - 8
			positiveBucket = downscaleExponentialBucket(dp.PositiveBucket, scaleDelta)
			negativeBucket = downscaleExponentialBucket(dp.NegativeBucket, scaleDelta)
			scale = 8
		}

		// From the spec: Prometheus Native Histogram buckets are indexed by upper boundary,
		// while Exponential Histogram buckets are indexed by lower boundary; as a result the
		// Offset fields differ by one.
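		// For example, the OTel positive bucket at index i, with lower bound base^i,
		// is the Prometheus bucket with upper bound base^(i+1), hence the +1 below.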
		positiveBuckets := make(map[int]int64)
-		for i, c := range dp.PositiveBucket.Counts {
+		for i, c := range positiveBucket.Counts {
			if c > math.MaxInt64 {
				otel.Handle(fmt.Errorf("positive count %d is too large to be represented as int64", c))
				continue
			}
-			positiveBuckets[int(dp.PositiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
+			positiveBuckets[int(positiveBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
		}

		negativeBuckets := make(map[int]int64)
-		for i, c := range dp.NegativeBucket.Counts {
+		for i, c := range negativeBucket.Counts {
			if c > math.MaxInt64 {
				otel.Handle(fmt.Errorf("negative count %d is too large to be represented as int64", c))
				continue
			}
-			negativeBuckets[int(dp.NegativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
+			negativeBuckets[int(negativeBucket.Offset)+i+1] = int64(c) // nolint: gosec // Size check above.
		}

		m, err := prometheus.NewConstNativeHistogram(

@@ -284,7 +357,7 @@ func addExponentialHistogramMetric[N int64 | float64](
			positiveBuckets,
			negativeBuckets,
			dp.ZeroCount,
-			dp.Scale,
+			scale,
			dp.ZeroThreshold,
			dp.StartTime,
			values...)


@@ -6,9 +6,11 @@ package prometheus
import (
	"context"
	"errors"
	"math"
	"os"
	"sync"
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"

@@ -21,6 +23,7 @@ import (
	"go.opentelemetry.io/otel/attribute"
	otelmetric "go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
	"go.opentelemetry.io/otel/sdk/resource"
	semconv "go.opentelemetry.io/otel/semconv/v1.34.0"
	"go.opentelemetry.io/otel/trace"
@@ -1163,3 +1166,445 @@ func TestExemplars(t *testing.T) {
		})
	}
}
func TestExponentialHistogramScaleValidation(t *testing.T) {
	ctx := context.Background()

	t.Run("normal_exponential_histogram_works", func(t *testing.T) {
		registry := prometheus.NewRegistry()
		exporter, err := New(WithRegisterer(registry), WithoutTargetInfo(), WithoutScopeInfo())
		require.NoError(t, err)

		provider := metric.NewMeterProvider(
			metric.WithReader(exporter),
			metric.WithResource(resource.Default()),
		)
		defer func() {
			err := provider.Shutdown(ctx)
			require.NoError(t, err)
		}()

		// Create a histogram with a valid scale
		meter := provider.Meter("test")
		hist, err := meter.Float64Histogram(
			"test_exponential_histogram",
			otelmetric.WithDescription("test histogram"),
		)
		require.NoError(t, err)
		hist.Record(ctx, 1.0)
		hist.Record(ctx, 10.0)
		hist.Record(ctx, 100.0)

		metricFamilies, err := registry.Gather()
		require.NoError(t, err)
		assert.NotEmpty(t, metricFamilies)
	})

	t.Run("error_handling_for_invalid_scales", func(t *testing.T) {
		var capturedError error
		originalHandler := otel.GetErrorHandler()
		otel.SetErrorHandler(otel.ErrorHandlerFunc(func(err error) {
			capturedError = err
		}))
		defer otel.SetErrorHandler(originalHandler)

		now := time.Now()
		invalidScaleData := metricdata.ExponentialHistogramDataPoint[float64]{
			Attributes:    attribute.NewSet(),
			StartTime:     now,
			Time:          now,
			Count:         1,
			Sum:           10.0,
			Scale:         -5, // Invalid scale below -4
			ZeroCount:     0,
			ZeroThreshold: 0.0,
			PositiveBucket: metricdata.ExponentialBucket{
				Offset: 1,
				Counts: []uint64{1},
			},
			NegativeBucket: metricdata.ExponentialBucket{
				Offset: 1,
				Counts: []uint64{},
			},
		}

		ch := make(chan prometheus.Metric, 10)
		defer close(ch)

		histogram := metricdata.ExponentialHistogram[float64]{
			Temporality: metricdata.CumulativeTemporality,
			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{invalidScaleData},
		}
		m := metricdata.Metrics{
			Name:        "test_histogram",
			Description: "test",
		}
		addExponentialHistogramMetric(ch, histogram, m, "test_histogram", keyVals{})

		assert.Error(t, capturedError)
		assert.Contains(t, capturedError.Error(), "scale -5 is below minimum")

		select {
		case <-ch:
			t.Error("Expected no metrics to be produced for invalid scale")
		default:
			// No metrics were produced for the invalid scale
		}
	})
}
func TestDownscaleExponentialBucket(t *testing.T) {
	tests := []struct {
		name       string
		bucket     metricdata.ExponentialBucket
		scaleDelta int32
		want       metricdata.ExponentialBucket
	}{
		{
			name:       "Empty bucket",
			bucket:     metricdata.ExponentialBucket{},
			scaleDelta: 3,
			want:       metricdata.ExponentialBucket{},
		},
		{
			name: "1 size bucket",
			bucket: metricdata.ExponentialBucket{
				Offset: 50,
				Counts: []uint64{7},
			},
			scaleDelta: 4,
			want: metricdata.ExponentialBucket{
				Offset: 3,
				Counts: []uint64{7},
			},
		},
		{
			name: "zero scale delta",
			bucket: metricdata.ExponentialBucket{
				Offset: 50,
				Counts: []uint64{7, 5},
			},
			scaleDelta: 0,
			want: metricdata.ExponentialBucket{
				Offset: 50,
				Counts: []uint64{7, 5},
			},
		},
		{
			name: "aligned bucket scale 1",
			bucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{1, 2, 3, 4, 5, 6},
			},
			scaleDelta: 1,
			want: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{3, 7, 11},
			},
		},
		{
			name: "aligned bucket scale 2",
			bucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{1, 2, 3, 4, 5, 6},
			},
			scaleDelta: 2,
			want: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{10, 11},
			},
		},
		{
			name: "unaligned bucket scale 1",
			bucket: metricdata.ExponentialBucket{
				Offset: 5,
				Counts: []uint64{1, 2, 3, 4, 5, 6},
			}, // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6]
			scaleDelta: 1,
			want: metricdata.ExponentialBucket{
				Offset: 2,
				Counts: []uint64{1, 5, 9, 6},
			}, // This is equivalent to [0,0,1,5,9,6]
		},
		{
			name: "negative startBin",
			bucket: metricdata.ExponentialBucket{
				Offset: -1,
				Counts: []uint64{1, 0, 3},
			},
			scaleDelta: 1,
			want: metricdata.ExponentialBucket{
				Offset: -1,
				Counts: []uint64{1, 3},
			},
		},
		{
			name: "negative startBin 2",
			bucket: metricdata.ExponentialBucket{
				Offset: -4,
				Counts: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
			},
			scaleDelta: 1,
			want: metricdata.ExponentialBucket{
				Offset: -2,
				Counts: []uint64{3, 7, 11, 15, 19},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got := downscaleExponentialBucket(tt.bucket, tt.scaleDelta)
			assert.Equal(t, tt.want, got)
		})
	}
}
func TestExponentialHistogramHighScaleDownscaling(t *testing.T) {
	t.Run("scale_10_downscales_to_8", func(t *testing.T) {
		// Test that scale 10 gets properly downscaled to 8 with correct bucket re-aggregation
		ch := make(chan prometheus.Metric, 10)
		defer close(ch)

		now := time.Now()

		// Create an exponential histogram data point with scale 10
		dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{
			Attributes:    attribute.NewSet(),
			StartTime:     now,
			Time:          now,
			Count:         8,
			Sum:           55.0,
			Scale:         10, // This should be downscaled to 8
			ZeroCount:     0,
			ZeroThreshold: 0.0,
			PositiveBucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{1, 1, 1, 1, 1, 1, 1, 1}, // 8 buckets with 1 count each
			},
			NegativeBucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{},
			},
		}

		histogram := metricdata.ExponentialHistogram[float64]{
			Temporality: metricdata.CumulativeTemporality,
			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint},
		}
		m := metricdata.Metrics{
			Name:        "test_high_scale_histogram",
			Description: "test histogram with high scale",
		}

		// This should not produce any errors and should properly downscale buckets
		addExponentialHistogramMetric(ch, histogram, m, "test_high_scale_histogram", keyVals{})

		// Verify a metric was produced
		select {
		case metric := <-ch:
			// Check that the metric was created successfully
			require.NotNil(t, metric)
			// The scale should have been clamped to 8, and buckets should be re-aggregated.
			// With scale 10 -> 8, we have a scaleDelta of 2, meaning 2^2 = 4 buckets merge into 1.
			// Original: 8 buckets with 1 count each at scale 10.
			// After downscaling: 2 buckets with 4 counts each at scale 8.
		default:
			t.Error("Expected a metric to be produced")
		}
	})

	t.Run("scale_12_downscales_to_8", func(t *testing.T) {
		// Test that scale 12 gets properly downscaled to 8 with correct bucket re-aggregation
		ch := make(chan prometheus.Metric, 10)
		defer close(ch)

		now := time.Now()

		// Create an exponential histogram data point with scale 12
		dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{
			Attributes:    attribute.NewSet(),
			StartTime:     now,
			Time:          now,
			Count:         16,
			Sum:           120.0,
			Scale:         12, // This should be downscaled to 8
			ZeroCount:     0,
			ZeroThreshold: 0.0,
			PositiveBucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}, // 16 buckets with 1 count each
			},
			NegativeBucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{},
			},
		}

		histogram := metricdata.ExponentialHistogram[float64]{
			Temporality: metricdata.CumulativeTemporality,
			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint},
		}
		m := metricdata.Metrics{
			Name:        "test_very_high_scale_histogram",
			Description: "test histogram with very high scale",
		}

		// This should not produce any errors and should properly downscale buckets
		addExponentialHistogramMetric(ch, histogram, m, "test_very_high_scale_histogram", keyVals{})

		// Verify a metric was produced
		select {
		case metric := <-ch:
			// Check that the metric was created successfully
			require.NotNil(t, metric)
			// The scale should have been clamped to 8, and buckets should be re-aggregated.
			// With scale 12 -> 8, we have a scaleDelta of 4, meaning 2^4 = 16 buckets merge into 1.
			// Original: 16 buckets with 1 count each at scale 12.
			// After downscaling: 1 bucket with 16 counts at scale 8.
		default:
			t.Error("Expected a metric to be produced")
		}
	})

	t.Run("exponential_histogram_with_negative_buckets", func(t *testing.T) {
		// Test that exponential histograms with negative buckets are handled correctly
		ch := make(chan prometheus.Metric, 10)
		defer close(ch)

		now := time.Now()

		// Create an exponential histogram data point with both positive and negative buckets
		dataPoint := metricdata.ExponentialHistogramDataPoint[float64]{
			Attributes:    attribute.NewSet(),
			StartTime:     now,
			Time:          now,
			Count:         6,
			Sum:           25.0,
			Scale:         2,
			ZeroCount:     0,
			ZeroThreshold: 0.0,
			PositiveBucket: metricdata.ExponentialBucket{
				Offset: 1,
				Counts: []uint64{1, 2}, // 2 positive buckets
			},
			NegativeBucket: metricdata.ExponentialBucket{
				Offset: 1,
				Counts: []uint64{2, 1}, // 2 negative buckets
			},
		}

		histogram := metricdata.ExponentialHistogram[float64]{
			Temporality: metricdata.CumulativeTemporality,
			DataPoints:  []metricdata.ExponentialHistogramDataPoint[float64]{dataPoint},
		}
		m := metricdata.Metrics{
			Name:        "test_histogram_with_negative_buckets",
			Description: "test histogram with negative buckets",
		}

		// This should handle negative buckets correctly
		addExponentialHistogramMetric(ch, histogram, m, "test_histogram_with_negative_buckets", keyVals{})

		// Verify a metric was produced
		select {
		case metric := <-ch:
			require.NotNil(t, metric)
		default:
			t.Error("Expected a metric to be produced")
		}
	})

	t.Run("exponential_histogram_int64_type", func(t *testing.T) {
		// Test that int64 exponential histograms are handled correctly
		ch := make(chan prometheus.Metric, 10)
		defer close(ch)

		now := time.Now()

		// Create an exponential histogram data point with int64 type
		dataPoint := metricdata.ExponentialHistogramDataPoint[int64]{
			Attributes:    attribute.NewSet(),
			StartTime:     now,
			Time:          now,
			Count:         4,
			Sum:           20,
			Scale:         3,
			ZeroCount:     0,
			ZeroThreshold: 0.0,
			PositiveBucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{1, 1, 1, 1}, // 4 buckets with 1 count each
			},
			NegativeBucket: metricdata.ExponentialBucket{
				Offset: 0,
				Counts: []uint64{},
			},
		}

		histogram := metricdata.ExponentialHistogram[int64]{
			Temporality: metricdata.CumulativeTemporality,
			DataPoints:  []metricdata.ExponentialHistogramDataPoint[int64]{dataPoint},
		}
		m := metricdata.Metrics{
			Name:        "test_int64_exponential_histogram",
			Description: "test int64 exponential histogram",
		}

		// This should handle int64 exponential histograms correctly
		addExponentialHistogramMetric(ch, histogram, m, "test_int64_exponential_histogram", keyVals{})

		// Verify a metric was produced
		select {
		case metric := <-ch:
			require.NotNil(t, metric)
		default:
			t.Error("Expected a metric to be produced")
		}
	})
}
func TestDownscaleExponentialBucketEdgeCases(t *testing.T) {
	t.Run("min_idx_larger_than_current", func(t *testing.T) {
		// Sparse counts whose first and last non-zero buckets land in the same downscaled bucket
		bucket := metricdata.ExponentialBucket{
			Offset: 10, // Start at offset 10
			Counts: []uint64{1, 0, 0, 0, 1},
		}

		// Scale delta of 3 will cause downscaling: original indices 10->1, 14->1
		result := downscaleExponentialBucket(bucket, 3)

		// Both original buckets 10 and 14 should map to the same downscaled bucket at index 1
		expected := metricdata.ExponentialBucket{
			Offset: 1,
			Counts: []uint64{2}, // Both counts combined
		}
		assert.Equal(t, expected, result)
	})
t.Run("empty_downscaled_counts", func(t *testing.T) {
// Create a scenario that results in empty downscaled counts
bucket := metricdata.ExponentialBucket{
Offset: math.MaxInt32 - 5, // Very large offset that won't cause overflow in this case
Counts: []uint64{1, 1, 1, 1, 1},
}
// This should work normally and downscale the buckets
result := downscaleExponentialBucket(bucket, 1)
// Should return bucket with downscaled values
expected := metricdata.ExponentialBucket{
Offset: 1073741821, // ((MaxInt32-5) + 0) >> 1 = 1073741821
Counts: []uint64{2, 2, 1}, // Buckets get combined during downscaling
}
assert.Equal(t, expected, result)
})
}