You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	prometheus: add histogram support to exporter (#601)
This PR adds histogram support to the prometheus exporter. - Adds a new aggregator selector that returns a histogram for `MeasureKind`. The selector can be constructed using `simple.NewWithHistogramMeasure` - Adds support for histogram aggregators in prometheus collect method. With this PR, the default selector is changed to use histograms. In order to support the prometheus histogram, the `aggregator.Histogram` interface is extended with the `Sum` method. fixes #487 Co-authored-by: Rahul Patel <rahulpa@google.com>
This commit is contained in:
		| @@ -43,7 +43,8 @@ type Exporter struct { | ||||
| 	snapshot export.CheckpointSet | ||||
| 	onError  func(error) | ||||
|  | ||||
| 	defaultSummaryQuantiles []float64 | ||||
| 	defaultSummaryQuantiles    []float64 | ||||
| 	defaultHistogramBoundaries []core.Number | ||||
| } | ||||
|  | ||||
| var _ export.Exporter = &Exporter{} | ||||
| @@ -73,6 +74,10 @@ type Config struct { | ||||
| 	// to use. Use nil to specify the system-default summary quantiles. | ||||
| 	DefaultSummaryQuantiles []float64 | ||||
|  | ||||
| 	// DefaultHistogramBoundaries defines the default histogram bucket | ||||
| 	// boundaries. | ||||
| 	DefaultHistogramBoundaries []core.Number | ||||
|  | ||||
| 	// OnError is a function that handle errors that may occur while exporting metrics. | ||||
| 	// TODO: This should be refactored or even removed once we have a better error handling mechanism. | ||||
| 	OnError func(error) | ||||
| @@ -100,11 +105,12 @@ func NewRawExporter(config Config) (*Exporter, error) { | ||||
| 	} | ||||
|  | ||||
| 	e := &Exporter{ | ||||
| 		handler:                 promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}), | ||||
| 		registerer:              config.Registerer, | ||||
| 		gatherer:                config.Gatherer, | ||||
| 		defaultSummaryQuantiles: config.DefaultSummaryQuantiles, | ||||
| 		onError:                 config.OnError, | ||||
| 		handler:                    promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}), | ||||
| 		registerer:                 config.Registerer, | ||||
| 		gatherer:                   config.Gatherer, | ||||
| 		defaultSummaryQuantiles:    config.DefaultSummaryQuantiles, | ||||
| 		defaultHistogramBoundaries: config.DefaultHistogramBoundaries, | ||||
| 		onError:                    config.OnError, | ||||
| 	} | ||||
|  | ||||
| 	c := newCollector(e) | ||||
| @@ -138,7 +144,7 @@ func InstallNewPipeline(config Config) (*push.Controller, http.HandlerFunc, erro | ||||
| // NewExportPipeline sets up a complete export pipeline with the recommended setup, | ||||
| // chaining a NewRawExporter into the recommended selectors and batchers. | ||||
| func NewExportPipeline(config Config, period time.Duration) (*push.Controller, http.HandlerFunc, error) { | ||||
| 	selector := simple.NewWithExactMeasure() | ||||
| 	selector := simple.NewWithHistogramMeasure(config.DefaultHistogramBoundaries) | ||||
| 	exporter, err := NewRawExporter(config) | ||||
| 	if err != nil { | ||||
| 		return nil, nil, err | ||||
| @@ -204,10 +210,9 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { | ||||
| 		labels := labelValues(record.Labels()) | ||||
| 		desc := c.toDesc(&record) | ||||
|  | ||||
| 		// TODO: implement histogram export when the histogram aggregation is done. | ||||
| 		//  https://github.com/open-telemetry/opentelemetry-go/issues/317 | ||||
|  | ||||
| 		if dist, ok := agg.(aggregator.Distribution); ok { | ||||
| 		if hist, ok := agg.(aggregator.Histogram); ok { | ||||
| 			c.exportHistogram(ch, hist, numberKind, desc, labels) | ||||
| 		} else if dist, ok := agg.(aggregator.Distribution); ok { | ||||
| 			// TODO: summaries values are never being resetted. | ||||
| 			//  As measures are recorded, new records starts to have less impact on these summaries. | ||||
| 			//  We should implement an solution that is similar to the Prometheus Clients | ||||
| @@ -287,6 +292,39 @@ func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.D | ||||
| 	ch <- m | ||||
| } | ||||
|  | ||||
| func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator.Histogram, kind core.NumberKind, desc *prometheus.Desc, labels []string) { | ||||
| 	buckets, err := hist.Histogram() | ||||
| 	if err != nil { | ||||
| 		c.exp.onError(err) | ||||
| 		return | ||||
| 	} | ||||
| 	sum, err := hist.Sum() | ||||
| 	if err != nil { | ||||
| 		c.exp.onError(err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	var totalCount uint64 | ||||
| 	// counts maps from the bucket upper-bound to the cumulative count. | ||||
| 	// The bucket with upper-bound +inf is not included. | ||||
| 	counts := make(map[float64]uint64, len(buckets.Boundaries)) | ||||
| 	for i := range buckets.Boundaries { | ||||
| 		boundary := buckets.Boundaries[i].CoerceToFloat64(kind) | ||||
| 		totalCount += buckets.Counts[i].AsUint64() | ||||
| 		counts[boundary] = totalCount | ||||
| 	} | ||||
| 	// Include the +inf bucket in the total count. | ||||
| 	totalCount += buckets.Counts[len(buckets.Counts)-1].AsUint64() | ||||
|  | ||||
| 	m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, labels...) | ||||
| 	if err != nil { | ||||
| 		c.exp.onError(err) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	ch <- m | ||||
| } | ||||
|  | ||||
| func (c *collector) toDesc(record *export.Record) *prometheus.Desc { | ||||
| 	desc := record.Descriptor() | ||||
| 	labels := labelsKeys(record.Labels()) | ||||
|   | ||||
| @@ -49,6 +49,8 @@ func TestPrometheusExporter(t *testing.T) { | ||||
| 		"lastvalue", metric.ObserverKind, core.Float64NumberKind) | ||||
| 	measure := metric.NewDescriptor( | ||||
| 		"measure", metric.MeasureKind, core.Float64NumberKind) | ||||
| 	histogramMeasure := metric.NewDescriptor( | ||||
| 		"histogram_measure", metric.MeasureKind, core.Float64NumberKind) | ||||
|  | ||||
| 	labels := []core.KeyValue{ | ||||
| 		key.New("A").String("B"), | ||||
| @@ -70,6 +72,18 @@ func TestPrometheusExporter(t *testing.T) { | ||||
| 	expected = append(expected, `measure_sum{A="B",C="D"} 45`) | ||||
| 	expected = append(expected, `measure_count{A="B",C="D"} 3`) | ||||
|  | ||||
| 	boundaries := []core.Number{core.NewFloat64Number(-0.5), core.NewFloat64Number(1)} | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.6, labels...) | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.4, labels...) | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 0.6, labels...) | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 20, labels...) | ||||
|  | ||||
| 	expected = append(expected, `histogram_measure_bucket{A="B",C="D",le="+Inf"} 4`) | ||||
| 	expected = append(expected, `histogram_measure_bucket{A="B",C="D",le="-0.5"} 1`) | ||||
| 	expected = append(expected, `histogram_measure_bucket{A="B",C="D",le="1"} 3`) | ||||
| 	expected = append(expected, `histogram_measure_count{A="B",C="D"} 4`) | ||||
| 	expected = append(expected, `histogram_measure_sum{A="B",C="D"} 19.6`) | ||||
|  | ||||
| 	missingLabels := []core.KeyValue{ | ||||
| 		key.New("A").String("E"), | ||||
| 		key.New("C").String(""), | ||||
| @@ -88,6 +102,19 @@ func TestPrometheusExporter(t *testing.T) { | ||||
| 	expected = append(expected, `measure_count{A="E",C=""} 1`) | ||||
| 	expected = append(expected, `measure_sum{A="E",C=""} 19`) | ||||
|  | ||||
| 	boundaries = []core.Number{core.NewFloat64Number(0), core.NewFloat64Number(1)} | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.6, missingLabels...) | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.4, missingLabels...) | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.1, missingLabels...) | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 15, missingLabels...) | ||||
| 	checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 15, missingLabels...) | ||||
|  | ||||
| 	expected = append(expected, `histogram_measure_bucket{A="E",C="",le="+Inf"} 5`) | ||||
| 	expected = append(expected, `histogram_measure_bucket{A="E",C="",le="0"} 3`) | ||||
| 	expected = append(expected, `histogram_measure_bucket{A="E",C="",le="1"} 3`) | ||||
| 	expected = append(expected, `histogram_measure_count{A="E",C=""} 5`) | ||||
| 	expected = append(expected, `histogram_measure_sum{A="E",C=""} 28.9`) | ||||
|  | ||||
| 	compareExport(t, exporter, checkpointSet, expected) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -23,6 +23,7 @@ import ( | ||||
| 	export "go.opentelemetry.io/otel/sdk/export/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/export/metric/aggregator" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/array" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" | ||||
| ) | ||||
| @@ -84,6 +85,10 @@ func (p *CheckpointSet) AddMeasure(desc *metric.Descriptor, v float64, labels .. | ||||
| 	p.updateAggregator(desc, array.New(), v, labels...) | ||||
| } | ||||
|  | ||||
| func (p *CheckpointSet) AddHistogramMeasure(desc *metric.Descriptor, boundaries []core.Number, v float64, labels ...core.KeyValue) { | ||||
| 	p.updateAggregator(desc, histogram.New(desc, boundaries), v, labels...) | ||||
| } | ||||
|  | ||||
| func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.Aggregator, v float64, labels ...core.KeyValue) { | ||||
| 	ctx := context.Background() | ||||
| 	// Updates and checkpoint the new aggregator | ||||
|   | ||||
| @@ -75,6 +75,7 @@ type ( | ||||
|  | ||||
| 	// Histogram returns the count of events in pre-determined buckets. | ||||
| 	Histogram interface { | ||||
| 		Sum | ||||
| 		Histogram() (Buckets, error) | ||||
| 	} | ||||
|  | ||||
|   | ||||
| @@ -15,10 +15,12 @@ | ||||
| package simple // import "go.opentelemetry.io/otel/sdk/metric/selector/simple" | ||||
|  | ||||
| import ( | ||||
| 	"go.opentelemetry.io/otel/api/core" | ||||
| 	"go.opentelemetry.io/otel/api/metric" | ||||
| 	export "go.opentelemetry.io/otel/sdk/export/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/array" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" | ||||
| ) | ||||
| @@ -29,12 +31,16 @@ type ( | ||||
| 	selectorSketch      struct { | ||||
| 		config *ddsketch.Config | ||||
| 	} | ||||
| 	selectorHistogram struct { | ||||
| 		boundaries []core.Number | ||||
| 	} | ||||
| ) | ||||
|  | ||||
| var ( | ||||
| 	_ export.AggregationSelector = selectorInexpensive{} | ||||
| 	_ export.AggregationSelector = selectorSketch{} | ||||
| 	_ export.AggregationSelector = selectorExact{} | ||||
| 	_ export.AggregationSelector = selectorHistogram{} | ||||
| ) | ||||
|  | ||||
| // NewWithInexpensiveMeasure returns a simple aggregation selector | ||||
| @@ -66,6 +72,14 @@ func NewWithExactMeasure() export.AggregationSelector { | ||||
| 	return selectorExact{} | ||||
| } | ||||
|  | ||||
| // NewWithHistogramMeasure returns a simple aggregation selector that uses counter, | ||||
| // histogram, and histogram aggregators for the three kinds of metric. This | ||||
| // selector uses more memory than the NewWithInexpensiveMeasure because it | ||||
| // uses a counter per bucket. | ||||
| func NewWithHistogramMeasure(boundaries []core.Number) export.AggregationSelector { | ||||
| 	return selectorHistogram{boundaries: boundaries} | ||||
| } | ||||
|  | ||||
| func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { | ||||
| 	switch descriptor.MetricKind() { | ||||
| 	case metric.ObserverKind: | ||||
| @@ -98,3 +112,14 @@ func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega | ||||
| 		return sum.New() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { | ||||
| 	switch descriptor.MetricKind() { | ||||
| 	case metric.ObserverKind: | ||||
| 		fallthrough | ||||
| 	case metric.MeasureKind: | ||||
| 		return histogram.New(descriptor, s.boundaries) | ||||
| 	default: | ||||
| 		return sum.New() | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -23,6 +23,7 @@ import ( | ||||
| 	"go.opentelemetry.io/otel/api/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/array" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/sum" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/selector/simple" | ||||
| @@ -54,3 +55,10 @@ func TestExactMeasure(t *testing.T) { | ||||
| 	require.NotPanics(t, func() { _ = ex.AggregatorFor(&testMeasureDesc).(*array.Aggregator) }) | ||||
| 	require.NotPanics(t, func() { _ = ex.AggregatorFor(&testObserverDesc).(*array.Aggregator) }) | ||||
| } | ||||
|  | ||||
| func TestHistogramMeasure(t *testing.T) { | ||||
| 	ex := simple.NewWithHistogramMeasure([]core.Number{}) | ||||
| 	require.NotPanics(t, func() { _ = ex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) }) | ||||
| 	require.NotPanics(t, func() { _ = ex.AggregatorFor(&testMeasureDesc).(*histogram.Aggregator) }) | ||||
| 	require.NotPanics(t, func() { _ = ex.AggregatorFor(&testObserverDesc).(*histogram.Aggregator) }) | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user