From 8ef02a61aace5b7bc74b4c1120f5be36f4c5e4e8 Mon Sep 17 00:00:00 2001 From: Oncilla Date: Wed, 1 Apr 2020 23:36:37 +0200 Subject: [PATCH] prometheus: add histogram support to exporter (#601) This PR adds histogram support to the prometheus exporter. - Adds a new aggregator selector that returns a histogram for `MeasureKind`. The selector can be constructed using `simple.NewWithHistogramMeasure` - Adds support for histogram aggregators in prometheus collect method. With this PR, the default selector is changed to use histograms. In order to support the prometheus histogram, the `aggregator.Histogram` interface is extended with the `Sum` method. fixes #487 Co-authored-by: Rahul Patel --- exporters/metric/prometheus/prometheus.go | 60 +++++++++++++++---- .../metric/prometheus/prometheus_test.go | 27 +++++++++ exporters/metric/test/test.go | 5 ++ sdk/export/metric/aggregator/aggregator.go | 1 + sdk/metric/selector/simple/simple.go | 25 ++++++++ sdk/metric/selector/simple/simple_test.go | 8 +++ 6 files changed, 115 insertions(+), 11 deletions(-) diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 38740b881..8c4f75443 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -43,7 +43,8 @@ type Exporter struct { snapshot export.CheckpointSet onError func(error) - defaultSummaryQuantiles []float64 + defaultSummaryQuantiles []float64 + defaultHistogramBoundaries []core.Number } var _ export.Exporter = &Exporter{} @@ -73,6 +74,10 @@ type Config struct { // to use. Use nil to specify the system-default summary quantiles. DefaultSummaryQuantiles []float64 + // DefaultHistogramBoundaries defines the default histogram bucket + // boundaries. + DefaultHistogramBoundaries []core.Number + // OnError is a function that handle errors that may occur while exporting metrics. // TODO: This should be refactored or even removed once we have a better error handling mechanism. 
OnError func(error) @@ -100,11 +105,12 @@ func NewRawExporter(config Config) (*Exporter, error) { } e := &Exporter{ - handler: promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}), - registerer: config.Registerer, - gatherer: config.Gatherer, - defaultSummaryQuantiles: config.DefaultSummaryQuantiles, - onError: config.OnError, + handler: promhttp.HandlerFor(config.Gatherer, promhttp.HandlerOpts{}), + registerer: config.Registerer, + gatherer: config.Gatherer, + defaultSummaryQuantiles: config.DefaultSummaryQuantiles, + defaultHistogramBoundaries: config.DefaultHistogramBoundaries, + onError: config.OnError, } c := newCollector(e) @@ -138,7 +144,7 @@ func InstallNewPipeline(config Config) (*push.Controller, http.HandlerFunc, erro // NewExportPipeline sets up a complete export pipeline with the recommended setup, // chaining a NewRawExporter into the recommended selectors and batchers. func NewExportPipeline(config Config, period time.Duration) (*push.Controller, http.HandlerFunc, error) { - selector := simple.NewWithExactMeasure() + selector := simple.NewWithHistogramMeasure(config.DefaultHistogramBoundaries) exporter, err := NewRawExporter(config) if err != nil { return nil, nil, err @@ -204,10 +210,9 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { labels := labelValues(record.Labels()) desc := c.toDesc(&record) - // TODO: implement histogram export when the histogram aggregation is done. - // https://github.com/open-telemetry/opentelemetry-go/issues/317 - - if dist, ok := agg.(aggregator.Distribution); ok { + if hist, ok := agg.(aggregator.Histogram); ok { + c.exportHistogram(ch, hist, numberKind, desc, labels) + } else if dist, ok := agg.(aggregator.Distribution); ok { // TODO: summaries values are never being resetted. // As measures are recorded, new records starts to have less impact on these summaries. 
// We should implement an solution that is similar to the Prometheus Clients @@ -287,6 +292,39 @@ func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.D ch <- m } +func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator.Histogram, kind core.NumberKind, desc *prometheus.Desc, labels []string) { + buckets, err := hist.Histogram() + if err != nil { + c.exp.onError(err) + return + } + sum, err := hist.Sum() + if err != nil { + c.exp.onError(err) + return + } + + var totalCount uint64 + // counts maps from the bucket upper-bound to the cumulative count. + // The bucket with upper-bound +inf is not included. + counts := make(map[float64]uint64, len(buckets.Boundaries)) + for i := range buckets.Boundaries { + boundary := buckets.Boundaries[i].CoerceToFloat64(kind) + totalCount += buckets.Counts[i].AsUint64() + counts[boundary] = totalCount + } + // Include the +inf bucket in the total count. + totalCount += buckets.Counts[len(buckets.Counts)-1].AsUint64() + + m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, labels...) 
+ if err != nil { + c.exp.onError(err) + return + } + + ch <- m +} + func (c *collector) toDesc(record *export.Record) *prometheus.Desc { desc := record.Descriptor() labels := labelsKeys(record.Labels()) diff --git a/exporters/metric/prometheus/prometheus_test.go b/exporters/metric/prometheus/prometheus_test.go index aad70ff84..3cc0c053c 100644 --- a/exporters/metric/prometheus/prometheus_test.go +++ b/exporters/metric/prometheus/prometheus_test.go @@ -49,6 +49,8 @@ func TestPrometheusExporter(t *testing.T) { "lastvalue", metric.ObserverKind, core.Float64NumberKind) measure := metric.NewDescriptor( "measure", metric.MeasureKind, core.Float64NumberKind) + histogramMeasure := metric.NewDescriptor( + "histogram_measure", metric.MeasureKind, core.Float64NumberKind) labels := []core.KeyValue{ key.New("A").String("B"), @@ -70,6 +72,18 @@ func TestPrometheusExporter(t *testing.T) { expected = append(expected, `measure_sum{A="B",C="D"} 45`) expected = append(expected, `measure_count{A="B",C="D"} 3`) + boundaries := []core.Number{core.NewFloat64Number(-0.5), core.NewFloat64Number(1)} + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.6, labels...) + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.4, labels...) + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 0.6, labels...) + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 20, labels...) 
+ + expected = append(expected, `histogram_measure_bucket{A="B",C="D",le="+Inf"} 4`) + expected = append(expected, `histogram_measure_bucket{A="B",C="D",le="-0.5"} 1`) + expected = append(expected, `histogram_measure_bucket{A="B",C="D",le="1"} 3`) + expected = append(expected, `histogram_measure_count{A="B",C="D"} 4`) + expected = append(expected, `histogram_measure_sum{A="B",C="D"} 19.6`) + missingLabels := []core.KeyValue{ key.New("A").String("E"), key.New("C").String(""), @@ -88,6 +102,19 @@ func TestPrometheusExporter(t *testing.T) { expected = append(expected, `measure_count{A="E",C=""} 1`) expected = append(expected, `measure_sum{A="E",C=""} 19`) + boundaries = []core.Number{core.NewFloat64Number(0), core.NewFloat64Number(1)} + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.6, missingLabels...) + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.4, missingLabels...) + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, -0.1, missingLabels...) + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 15, missingLabels...) + checkpointSet.AddHistogramMeasure(&histogramMeasure, boundaries, 15, missingLabels...) 
+ + expected = append(expected, `histogram_measure_bucket{A="E",C="",le="+Inf"} 5`) + expected = append(expected, `histogram_measure_bucket{A="E",C="",le="0"} 3`) + expected = append(expected, `histogram_measure_bucket{A="E",C="",le="1"} 3`) + expected = append(expected, `histogram_measure_count{A="E",C=""} 5`) + expected = append(expected, `histogram_measure_sum{A="E",C=""} 28.9`) + compareExport(t, exporter, checkpointSet, expected) } diff --git a/exporters/metric/test/test.go b/exporters/metric/test/test.go index ae2994449..88dca0f3b 100644 --- a/exporters/metric/test/test.go +++ b/exporters/metric/test/test.go @@ -23,6 +23,7 @@ import ( export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/array" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" ) @@ -84,6 +85,10 @@ func (p *CheckpointSet) AddMeasure(desc *metric.Descriptor, v float64, labels .. p.updateAggregator(desc, array.New(), v, labels...) } +func (p *CheckpointSet) AddHistogramMeasure(desc *metric.Descriptor, boundaries []core.Number, v float64, labels ...core.KeyValue) { + p.updateAggregator(desc, histogram.New(desc, boundaries), v, labels...) +} + func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.Aggregator, v float64, labels ...core.KeyValue) { ctx := context.Background() // Updates and checkpoint the new aggregator diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 52359a144..3922aad8e 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -75,6 +75,7 @@ type ( // Histogram returns the count of events in pre-determined buckets. 
Histogram interface { + Sum Histogram() (Buckets, error) } diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index a41382b79..efd259267 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -15,10 +15,12 @@ package simple // import "go.opentelemetry.io/otel/sdk/metric/selector/simple" import ( + "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/api/metric" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" ) @@ -29,12 +31,16 @@ type ( selectorSketch struct { config *ddsketch.Config } + selectorHistogram struct { + boundaries []core.Number + } ) var ( _ export.AggregationSelector = selectorInexpensive{} _ export.AggregationSelector = selectorSketch{} _ export.AggregationSelector = selectorExact{} + _ export.AggregationSelector = selectorHistogram{} ) // NewWithInexpensiveMeasure returns a simple aggregation selector @@ -66,6 +72,14 @@ func NewWithExactMeasure() export.AggregationSelector { return selectorExact{} } +// NewWithHistogramMeasure returns a simple aggregation selector that uses sum +// aggregators for counters and histogram aggregators for measures and +// observers. This selector uses more memory than NewWithInexpensiveMeasure +// because it uses a counter per bucket. 
+func NewWithHistogramMeasure(boundaries []core.Number) export.AggregationSelector { + return selectorHistogram{boundaries: boundaries} +} + func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { switch descriptor.MetricKind() { case metric.ObserverKind: @@ -98,3 +112,14 @@ func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggrega return sum.New() } } + +func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator { + switch descriptor.MetricKind() { + case metric.ObserverKind: + fallthrough + case metric.MeasureKind: + return histogram.New(descriptor, s.boundaries) + default: + return sum.New() + } +} diff --git a/sdk/metric/selector/simple/simple_test.go b/sdk/metric/selector/simple/simple_test.go index 5be7f17c9..0815e5ecc 100644 --- a/sdk/metric/selector/simple/simple_test.go +++ b/sdk/metric/selector/simple/simple_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel/api/metric" "go.opentelemetry.io/otel/sdk/metric/aggregator/array" "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -54,3 +55,10 @@ func TestExactMeasure(t *testing.T) { require.NotPanics(t, func() { _ = ex.AggregatorFor(&testMeasureDesc).(*array.Aggregator) }) require.NotPanics(t, func() { _ = ex.AggregatorFor(&testObserverDesc).(*array.Aggregator) }) } + +func TestHistogramMeasure(t *testing.T) { + ex := simple.NewWithHistogramMeasure([]core.Number{}) + require.NotPanics(t, func() { _ = ex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) }) + require.NotPanics(t, func() { _ = ex.AggregatorFor(&testMeasureDesc).(*histogram.Aggregator) }) + require.NotPanics(t, func() { _ = ex.AggregatorFor(&testObserverDesc).(*histogram.Aggregator) }) +}