1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-05 22:54:18 +02:00

Change prometheus to not aggregate metrics and only export them. (#385)

* draft using stop aggregating on prometheus client (counters)

* remove prometheus client aggregations

Measures are being exported as summaries since histograms doesn't
exist on OpenTelemetry yet.

Better error handling must be done.

* make pre commit

* add simple error callback

* remove options from collector

* refactor exporter to smaller methods

* wording

* change to snapshot

* lock collection and checkpointset read

* remove histogram options and unexported fields from the Exporter

* documenting why prometheus uses a stateful batcher

* add todo for histograms

* change summaries objects to summary quantiles

* remove histogram buckets from tests

* wording

* rename 'lockedCheckpoint' to 'syncCheckpointSet'

* default summary quantiles should be defaulted to no buckets.

* add quantiles options

* refactor test.CheckpointSet and add docs

* flip aggregators merge

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
This commit is contained in:
Gustavo Silva Paiva 2019-12-23 14:47:51 -03:00 committed by rghetia
parent 99cb01b246
commit 6f04903876
9 changed files with 232 additions and 532 deletions

View File

@ -38,14 +38,20 @@ var (
func initMeter() *push.Controller { func initMeter() *push.Controller {
selector := simple.NewWithExactMeasure() selector := simple.NewWithExactMeasure()
exporter, err := prometheus.NewExporter(prometheus.Options{ exporter, err := prometheus.NewExporter(prometheus.Options{})
DefaultHistogramBuckets: []float64{0., 10., 15., 20.},
})
if err != nil { if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err) log.Panicf("failed to initialize metric stdout exporter %v", err)
} }
batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), false) // Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters)
// are cumulative (i.e., monotonically increasing values) and should not be resetted after each export.
//
// Prometheus uses this approach to be resilient to scrape failures.
// If a Prometheus server tries to scrape metrics from a host and fails for some reason,
// it could try again on the next scrape and no data would be lost, only resolution.
//
// Gauges (or LastValues) and Summaries are an exception to this and have different behaviors.
batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), true)
pusher := push.New(batcher, exporter, time.Second) pusher := push.New(batcher, exporter, time.Second)
pusher.Start() pusher.Start()

View File

@ -1,94 +0,0 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"github.com/prometheus/client_golang/prometheus"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
type counters struct {
registerer prometheus.Registerer
counters map[metricKey]prometheus.Counter
counterVecs map[*export.Descriptor]*prometheus.CounterVec
}
func newCounters(registerer prometheus.Registerer) counters {
return counters{
registerer: registerer,
counters: make(map[metricKey]prometheus.Counter),
counterVecs: make(map[*export.Descriptor]*prometheus.CounterVec),
}
}
func (co *counters) export(sum aggregator.Sum, record export.Record, mKey metricKey) error {
value, err := sum.Sum()
if err != nil {
return err
}
c, err := co.getCounter(record, mKey)
if err != nil {
return err
}
desc := record.Descriptor()
c.Add(value.CoerceToFloat64(desc.NumberKind()))
return nil
}
func (co *counters) getCounter(record export.Record, mKey metricKey) (prometheus.Counter, error) {
if c, ok := co.counters[mKey]; ok {
return c, nil
}
desc := record.Descriptor()
counterVec, err := co.getCounterVec(desc, record.Labels())
if err != nil {
return nil, err
}
counter, err := counterVec.GetMetricWithLabelValues(labelValues(record.Labels())...)
if err != nil {
return nil, err
}
co.counters[mKey] = counter
return counter, nil
}
func (co *counters) getCounterVec(desc *export.Descriptor, labels export.Labels) (*prometheus.CounterVec, error) {
if c, ok := co.counterVecs[desc]; ok {
return c, nil
}
c := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: sanitize(desc.Name()),
Help: desc.Description(),
},
labelsKeys(labels.Ordered()),
)
if err := co.registerer.Register(c); err != nil {
return nil, err
}
co.counterVecs[desc] = c
return c, nil
}

View File

@ -1,94 +0,0 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"github.com/prometheus/client_golang/prometheus"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
type gauges struct {
registerer prometheus.Registerer
gauges map[metricKey]prometheus.Gauge
gaugeVecs map[*export.Descriptor]*prometheus.GaugeVec
}
func newGauges(registerer prometheus.Registerer) gauges {
return gauges{
registerer: registerer,
gauges: make(map[metricKey]prometheus.Gauge),
gaugeVecs: make(map[*export.Descriptor]*prometheus.GaugeVec),
}
}
func (ga *gauges) export(gauge aggregator.LastValue, record export.Record, mKey metricKey) error {
lv, _, err := gauge.LastValue()
if err != nil {
return err
}
g, err := ga.getGauge(record, mKey)
if err != nil {
return err
}
desc := record.Descriptor()
g.Set(lv.CoerceToFloat64(desc.NumberKind()))
return nil
}
func (ga *gauges) getGauge(record export.Record, mKey metricKey) (prometheus.Gauge, error) {
if c, ok := ga.gauges[mKey]; ok {
return c, nil
}
desc := record.Descriptor()
gaugeVec, err := ga.getGaugeVec(desc, record.Labels())
if err != nil {
return nil, err
}
gauge, err := gaugeVec.GetMetricWithLabelValues(labelValues(record.Labels())...)
if err != nil {
return nil, err
}
ga.gauges[mKey] = gauge
return gauge, nil
}
func (ga *gauges) getGaugeVec(desc *export.Descriptor, labels export.Labels) (*prometheus.GaugeVec, error) {
if gv, ok := ga.gaugeVecs[desc]; ok {
return gv, nil
}
g := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: sanitize(desc.Name()),
Help: desc.Description(),
},
labelsKeys(labels.Ordered()),
)
if err := ga.registerer.Register(g); err != nil {
return nil, err
}
ga.gaugeVecs[desc] = g
return g, nil
}

View File

@ -1,98 +0,0 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"github.com/prometheus/client_golang/prometheus"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
type histograms struct {
defaultHistogramBuckets []float64
registerer prometheus.Registerer
histogram map[metricKey]prometheus.Observer
histogramVecs map[*export.Descriptor]*prometheus.HistogramVec
}
func newHistograms(registerer prometheus.Registerer, defaultHistogramBuckets []float64) histograms {
return histograms{
registerer: registerer,
histogram: make(map[metricKey]prometheus.Observer),
histogramVecs: make(map[*export.Descriptor]*prometheus.HistogramVec),
defaultHistogramBuckets: defaultHistogramBuckets,
}
}
func (hi *histograms) export(points aggregator.Points, record export.Record, mKey metricKey) error {
values, err := points.Points()
if err != nil {
return err
}
obs, err := hi.getHistogram(record, mKey)
if err != nil {
return err
}
desc := record.Descriptor()
for _, v := range values {
obs.Observe(v.CoerceToFloat64(desc.NumberKind()))
}
return nil
}
func (hi *histograms) getHistogram(record export.Record, mKey metricKey) (prometheus.Observer, error) {
if c, ok := hi.histogram[mKey]; ok {
return c, nil
}
desc := record.Descriptor()
histogramVec, err := hi.getHistogramVec(desc, record.Labels())
if err != nil {
return nil, err
}
obs, err := histogramVec.GetMetricWithLabelValues(labelValues(record.Labels())...)
if err != nil {
return nil, err
}
hi.histogram[mKey] = obs
return obs, nil
}
func (hi *histograms) getHistogramVec(desc *export.Descriptor, labels export.Labels) (*prometheus.HistogramVec, error) {
if gv, ok := hi.histogramVecs[desc]; ok {
return gv, nil
}
g := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: sanitize(desc.Name()),
Help: desc.Description(),
Buckets: hi.defaultHistogramBuckets,
},
labelsKeys(labels.Ordered()),
)
if err := hi.registerer.Register(g); err != nil {
return nil, err
}
hi.histogramVecs[desc] = g
return g, nil
}

View File

@ -16,6 +16,7 @@ package prometheus
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -26,11 +27,6 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/export/metric/aggregator"
) )
type metricKey struct {
desc *export.Descriptor
encoded string
}
// Exporter is an implementation of metric.Exporter that sends metrics to // Exporter is an implementation of metric.Exporter that sends metrics to
// Prometheus. // Prometheus.
type Exporter struct { type Exporter struct {
@ -39,11 +35,10 @@ type Exporter struct {
registerer prometheus.Registerer registerer prometheus.Registerer
gatherer prometheus.Gatherer gatherer prometheus.Gatherer
counters counters snapshot export.CheckpointSet
gauges gauges onError func(error)
histograms histograms
summaries summaries defaultSummaryQuantiles []float64
measureAggregation MeasureAggregation
} }
var _ export.Exporter = &Exporter{} var _ export.Exporter = &Exporter{}
@ -69,27 +64,15 @@ type Options struct {
// If not specified the Registry will be used as default. // If not specified the Registry will be used as default.
Gatherer prometheus.Gatherer Gatherer prometheus.Gatherer
// DefaultHistogramBuckets is the default histogram buckets // DefaultSummaryQuantiles is the default summary quantiles
// to use. Use nil to specify the system-default histogram buckets. // to use. Use nil to specify the system-default summary quantiles.
DefaultHistogramBuckets []float64 DefaultSummaryQuantiles []float64
// DefaultSummaryObjectives is the default summary objectives // OnError is a function that handle errors that may occur while exporting metrics.
// to use. Use nil to specify the system-default summary objectives. // TODO: This should be refactored or even removed once we have a better error handling mechanism.
DefaultSummaryObjectives map[float64]float64 OnError func(error)
// MeasureAggregation defines how metric.Measure are exported.
// Possible values are 'Histogram' or 'Summary'.
// The default export representation for measures is Histograms.
MeasureAggregation MeasureAggregation
} }
type MeasureAggregation int
const (
Histogram MeasureAggregation = iota
Summary
)
// NewExporter returns a new prometheus exporter for prometheus metrics. // NewExporter returns a new prometheus exporter for prometheus metrics.
func NewExporter(opts Options) (*Exporter, error) { func NewExporter(opts Options) (*Exporter, error) {
if opts.Registry == nil { if opts.Registry == nil {
@ -104,70 +87,166 @@ func NewExporter(opts Options) (*Exporter, error) {
opts.Gatherer = opts.Registry opts.Gatherer = opts.Registry
} }
return &Exporter{ if opts.OnError == nil {
registerer: opts.Registerer, opts.OnError = func(err error) {
gatherer: opts.Gatherer, fmt.Println(err.Error())
handler: promhttp.HandlerFor(opts.Gatherer, promhttp.HandlerOpts{}), }
measureAggregation: opts.MeasureAggregation, }
counters: newCounters(opts.Registerer), e := &Exporter{
gauges: newGauges(opts.Registerer), handler: promhttp.HandlerFor(opts.Gatherer, promhttp.HandlerOpts{}),
histograms: newHistograms(opts.Registerer, opts.DefaultHistogramBuckets), registerer: opts.Registerer,
summaries: newSummaries(opts.Registerer, opts.DefaultSummaryObjectives), gatherer: opts.Gatherer,
}, nil defaultSummaryQuantiles: opts.DefaultSummaryQuantiles,
}
c := newCollector(e)
if err := opts.Registerer.Register(c); err != nil {
opts.OnError(fmt.Errorf("cannot register the collector: %w", err))
}
return e, nil
} }
// Export exports the provide metric record to prometheus. // Export exports the provide metric record to prometheus.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
var forEachError error e.snapshot = checkpointSet
checkpointSet.ForEach(func(record export.Record) { return nil
}
// collector implements prometheus.Collector interface.
type collector struct {
exp *Exporter
}
var _ prometheus.Collector = (*collector)(nil)
func newCollector(exporter *Exporter) *collector {
return &collector{
exp: exporter,
}
}
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
if c.exp.snapshot == nil {
return
}
c.exp.snapshot.ForEach(func(record export.Record) {
ch <- c.toDesc(&record)
})
}
// Collect exports the last calculated CheckpointSet.
//
// Collect is invoked whenever prometheus.Gatherer is also invoked.
// For example, when the HTTP endpoint is invoked by Prometheus.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
if c.exp.snapshot == nil {
return
}
c.exp.snapshot.ForEach(func(record export.Record) {
agg := record.Aggregator() agg := record.Aggregator()
numberKind := record.Descriptor().NumberKind()
labels := labelValues(record.Labels())
desc := c.toDesc(&record)
mKey := metricKey{ // TODO: implement histogram export when the histogram aggregation is done.
desc: record.Descriptor(), // https://github.com/open-telemetry/opentelemetry-go/issues/317
encoded: record.Labels().Encoded(),
}
if points, ok := agg.(aggregator.Points); ok { if dist, ok := agg.(aggregator.Distribution); ok {
observerExporter := e.histograms.export // TODO: summaries values are never being resetted.
if e.measureAggregation == Summary { // As measures are recorded, new records starts to have less impact on these summaries.
observerExporter = e.summaries.export // We should implement an solution that is similar to the Prometheus Clients
} // using a rolling window for summaries could be a solution.
//
err := observerExporter(points, record, mKey) // References:
if err != nil { // https://www.robustperception.io/how-does-a-prometheus-summary-work
forEachError = err // https://github.com/prometheus/client_golang/blob/fa4aa9000d2863904891d193dea354d23f3d712a/prometheus/summary.go#L135
} c.exportSummary(ch, dist, numberKind, desc, labels)
return } else if sum, ok := agg.(aggregator.Sum); ok {
} c.exportCounter(ch, sum, numberKind, desc, labels)
} else if gauge, ok := agg.(aggregator.LastValue); ok {
if sum, ok := agg.(aggregator.Sum); ok { c.exportGauge(ch, gauge, numberKind, desc, labels)
err := e.counters.export(sum, record, mKey)
if err != nil {
forEachError = err
}
return
}
if gauge, ok := agg.(aggregator.LastValue); ok {
err := e.gauges.export(gauge, record, mKey)
if err != nil {
forEachError = err
}
return
} }
}) })
}
return forEachError func (c *collector) exportGauge(ch chan<- prometheus.Metric, gauge aggregator.LastValue, kind core.NumberKind, desc *prometheus.Desc, labels []string) {
lastValue, _, err := gauge.LastValue()
if err != nil {
c.exp.onError(err)
return
}
m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, lastValue.CoerceToFloat64(kind), labels...)
if err != nil {
c.exp.onError(err)
return
}
ch <- m
}
func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Sum, kind core.NumberKind, desc *prometheus.Desc, labels []string) {
v, err := sum.Sum()
if err != nil {
c.exp.onError(err)
return
}
m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, v.CoerceToFloat64(kind), labels...)
if err != nil {
c.exp.onError(err)
return
}
ch <- m
}
func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.Distribution, kind core.NumberKind, desc *prometheus.Desc, labels []string) {
count, err := dist.Count()
if err != nil {
c.exp.onError(err)
return
}
var sum core.Number
sum, err = dist.Sum()
if err != nil {
c.exp.onError(err)
return
}
quantiles := make(map[float64]float64)
for _, quantile := range c.exp.defaultSummaryQuantiles {
q, _ := dist.Quantile(quantile)
quantiles[quantile] = q.CoerceToFloat64(kind)
}
m, err := prometheus.NewConstSummary(desc, uint64(count), sum.CoerceToFloat64(kind), quantiles, labels...)
if err != nil {
c.exp.onError(err)
return
}
ch <- m
}
func (c *collector) toDesc(metric *export.Record) *prometheus.Desc {
desc := metric.Descriptor()
labels := labelsKeys(metric.Labels())
return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labels, nil)
} }
func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
e.handler.ServeHTTP(w, r) e.handler.ServeHTTP(w, r)
} }
func labelsKeys(kvs []core.KeyValue) []string { func labelsKeys(labels export.Labels) []string {
keys := make([]string, 0, len(kvs)) keys := make([]string, 0, labels.Len())
for _, kv := range kvs { for _, kv := range labels.Ordered() {
keys = append(keys, sanitize(string(kv.Key))) keys = append(keys, sanitize(string(kv.Key)))
} }
return keys return keys

View File

@ -20,7 +20,7 @@ import (
func TestPrometheusExporter(t *testing.T) { func TestPrometheusExporter(t *testing.T) {
exporter, err := prometheus.NewExporter(prometheus.Options{ exporter, err := prometheus.NewExporter(prometheus.Options{
DefaultHistogramBuckets: []float64{0., 10., 15., 20.}, DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99},
}) })
if err != nil { if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err) log.Panicf("failed to initialize metric stdout exporter %v", err)
@ -50,11 +50,9 @@ func TestPrometheusExporter(t *testing.T) {
checkpointSet.AddMeasure(measure, 13, labels...) checkpointSet.AddMeasure(measure, 13, labels...)
checkpointSet.AddMeasure(measure, 15, labels...) checkpointSet.AddMeasure(measure, 15, labels...)
checkpointSet.AddMeasure(measure, 17, labels...) checkpointSet.AddMeasure(measure, 17, labels...)
expected = append(expected, `measure_bucket{A="B",C="D",le="0"} 0`) expected = append(expected, `measure{A="B",C="D",quantile="0.5"} 15`)
expected = append(expected, `measure_bucket{A="B",C="D",le="10"} 0`) expected = append(expected, `measure{A="B",C="D",quantile="0.9"} 17`)
expected = append(expected, `measure_bucket{A="B",C="D",le="15"} 2`) expected = append(expected, `measure{A="B",C="D",quantile="0.99"} 17`)
expected = append(expected, `measure_bucket{A="B",C="D",le="20"} 3`)
expected = append(expected, `measure_bucket{A="B",C="D",le="+Inf"} 3`)
expected = append(expected, `measure_sum{A="B",C="D"} 45`) expected = append(expected, `measure_sum{A="B",C="D"} 45`)
expected = append(expected, `measure_count{A="B",C="D"} 3`) expected = append(expected, `measure_count{A="B",C="D"} 3`)
@ -69,51 +67,6 @@ func TestPrometheusExporter(t *testing.T) {
checkpointSet.AddGauge(gauge, 32, missingLabels...) checkpointSet.AddGauge(gauge, 32, missingLabels...)
expected = append(expected, `gauge{A="E",C=""} 32`) expected = append(expected, `gauge{A="E",C=""} 32`)
checkpointSet.AddMeasure(measure, 19, missingLabels...)
expected = append(expected, `measure_bucket{A="E",C="",le="+Inf"} 1`)
expected = append(expected, `measure_bucket{A="E",C="",le="0"} 0`)
expected = append(expected, `measure_bucket{A="E",C="",le="10"} 0`)
expected = append(expected, `measure_bucket{A="E",C="",le="15"} 0`)
expected = append(expected, `measure_bucket{A="E",C="",le="20"} 1`)
expected = append(expected, `measure_count{A="E",C=""} 1`)
expected = append(expected, `measure_sum{A="E",C=""} 19`)
compareExport(t, exporter, checkpointSet, expected)
}
func TestPrometheusExporter_Summaries(t *testing.T) {
exporter, err := prometheus.NewExporter(prometheus.Options{
MeasureAggregation: prometheus.Summary,
})
if err != nil {
log.Panicf("failed to initialize metric stdout exporter %v", err)
}
var expected []string
checkpointSet := test.NewCheckpointSet(metric.NewDefaultLabelEncoder())
measure := export.NewDescriptor(
"measure", export.MeasureKind, nil, "", "", core.Float64NumberKind, false)
labels := []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
}
checkpointSet.AddMeasure(measure, 13, labels...)
checkpointSet.AddMeasure(measure, 15, labels...)
checkpointSet.AddMeasure(measure, 17, labels...)
expected = append(expected, `measure_count{A="B",C="D"} 3`)
expected = append(expected, `measure_sum{A="B",C="D"} 45`)
expected = append(expected, `measure{A="B",C="D",quantile="0.5"} 15`)
expected = append(expected, `measure{A="B",C="D",quantile="0.9"} 17`)
expected = append(expected, `measure{A="B",C="D",quantile="0.99"} 17`)
missingLabels := []core.KeyValue{
key.New("A").String("E"),
key.New("C").String(""),
}
checkpointSet.AddMeasure(measure, 19, missingLabels...) checkpointSet.AddMeasure(measure, 19, missingLabels...)
expected = append(expected, `measure{A="E",C="",quantile="0.5"} 19`) expected = append(expected, `measure{A="E",C="",quantile="0.5"} 19`)
expected = append(expected, `measure{A="E",C="",quantile="0.9"} 19`) expected = append(expected, `measure{A="E",C="",quantile="0.9"} 19`)

View File

@ -1,98 +0,0 @@
// Copyright 2019, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prometheus
import (
"github.com/prometheus/client_golang/prometheus"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
type summaries struct {
defaultSummaryObjectives map[float64]float64
registerer prometheus.Registerer
summary map[metricKey]prometheus.Observer
summaryVec map[*export.Descriptor]*prometheus.SummaryVec
}
func newSummaries(registerer prometheus.Registerer, defaultSummaryObjectives map[float64]float64) summaries {
return summaries{
registerer: registerer,
summary: make(map[metricKey]prometheus.Observer),
summaryVec: make(map[*export.Descriptor]*prometheus.SummaryVec),
defaultSummaryObjectives: defaultSummaryObjectives,
}
}
func (su *summaries) export(points aggregator.Points, record export.Record, mKey metricKey) error {
values, err := points.Points()
if err != nil {
return err
}
obs, err := su.getSummary(record, mKey)
if err != nil {
return err
}
desc := record.Descriptor()
for _, v := range values {
obs.Observe(v.CoerceToFloat64(desc.NumberKind()))
}
return nil
}
func (su *summaries) getSummary(record export.Record, mKey metricKey) (prometheus.Observer, error) {
if c, ok := su.summary[mKey]; ok {
return c, nil
}
desc := record.Descriptor()
histogramVec, err := su.getSummaryVec(desc, record.Labels())
if err != nil {
return nil, err
}
obs, err := histogramVec.GetMetricWithLabelValues(labelValues(record.Labels())...)
if err != nil {
return nil, err
}
su.summary[mKey] = obs
return obs, nil
}
func (su *summaries) getSummaryVec(desc *export.Descriptor, labels export.Labels) (*prometheus.SummaryVec, error) {
if gv, ok := su.summaryVec[desc]; ok {
return gv, nil
}
g := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: sanitize(desc.Name()),
Help: desc.Description(),
Objectives: su.defaultSummaryObjectives,
},
labelsKeys(labels.Ordered()),
)
if err := su.registerer.Register(g); err != nil {
return nil, err
}
su.summaryVec[desc] = g
return g, nil
}

View File

@ -12,24 +12,41 @@ import (
type CheckpointSet struct { type CheckpointSet struct {
encoder export.LabelEncoder encoder export.LabelEncoder
records map[string]export.Record
updates []export.Record updates []export.Record
} }
// NewCheckpointSet returns a test CheckpointSet that new records could be added.
// Records are grouped by their LabelSet.
func NewCheckpointSet(encoder export.LabelEncoder) *CheckpointSet { func NewCheckpointSet(encoder export.LabelEncoder) *CheckpointSet {
return &CheckpointSet{ return &CheckpointSet{
encoder: encoder, encoder: encoder,
records: make(map[string]export.Record),
} }
} }
func (p *CheckpointSet) Reset() { func (p *CheckpointSet) Reset() {
p.records = make(map[string]export.Record)
p.updates = nil p.updates = nil
} }
func (p *CheckpointSet) Add(desc *export.Descriptor, agg export.Aggregator, labels ...core.KeyValue) { // Add a new descriptor to a Checkpoint.
//
// If there is an existing record with the same descriptor and LabelSet
// the stored aggregator will be returned and should be merged.
func (p *CheckpointSet) Add(desc *export.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) {
encoded := p.encoder.Encode(labels) encoded := p.encoder.Encode(labels)
elabels := export.NewLabels(labels, encoded, p.encoder) elabels := export.NewLabels(labels, encoded, p.encoder)
p.updates = append(p.updates, export.NewRecord(desc, elabels, agg)) key := desc.Name() + "_" + elabels.Encoded()
if record, ok := p.records[key]; ok {
return record.Aggregator(), false
}
rec := export.NewRecord(desc, elabels, newAgg)
p.updates = append(p.updates, rec)
p.records[key] = rec
return newAgg, true
} }
func createNumber(desc *export.Descriptor, v float64) core.Number { func createNumber(desc *export.Descriptor, v float64) core.Number {
@ -40,27 +57,29 @@ func createNumber(desc *export.Descriptor, v float64) core.Number {
} }
func (p *CheckpointSet) AddGauge(desc *export.Descriptor, v float64, labels ...core.KeyValue) { func (p *CheckpointSet) AddGauge(desc *export.Descriptor, v float64, labels ...core.KeyValue) {
ctx := context.Background() p.updateAggregator(desc, gauge.New(), v, labels...)
gagg := gauge.New()
_ = gagg.Update(ctx, createNumber(desc, v), desc)
gagg.Checkpoint(ctx, desc)
p.Add(desc, gagg, labels...)
} }
func (p *CheckpointSet) AddCounter(desc *export.Descriptor, v float64, labels ...core.KeyValue) { func (p *CheckpointSet) AddCounter(desc *export.Descriptor, v float64, labels ...core.KeyValue) {
ctx := context.Background() p.updateAggregator(desc, counter.New(), v, labels...)
cagg := counter.New()
_ = cagg.Update(ctx, createNumber(desc, v), desc)
cagg.Checkpoint(ctx, desc)
p.Add(desc, cagg, labels...)
} }
func (p *CheckpointSet) AddMeasure(desc *export.Descriptor, v float64, labels ...core.KeyValue) { func (p *CheckpointSet) AddMeasure(desc *export.Descriptor, v float64, labels ...core.KeyValue) {
p.updateAggregator(desc, array.New(), v, labels...)
}
func (p *CheckpointSet) updateAggregator(desc *export.Descriptor, newAgg export.Aggregator, v float64, labels ...core.KeyValue) {
ctx := context.Background() ctx := context.Background()
magg := array.New() // Updates and checkpoint the new aggregator
_ = magg.Update(ctx, createNumber(desc, v), desc) _ = newAgg.Update(ctx, createNumber(desc, v), desc)
magg.Checkpoint(ctx, desc) newAgg.Checkpoint(ctx, desc)
p.Add(desc, magg, labels...)
// Try to add this aggregator to the CheckpointSet
agg, added := p.Add(desc, newAgg, labels...)
if !added {
// An aggregator already exist for this descriptor and label set, we should merge them.
_ = agg.Merge(newAgg, desc)
}
} }
func (p *CheckpointSet) ForEach(f func(export.Record)) { func (p *CheckpointSet) ForEach(f func(export.Record)) {

View File

@ -27,6 +27,7 @@ import (
// Controller organizes a periodic push of metric data. // Controller organizes a periodic push of metric data.
type Controller struct { type Controller struct {
lock sync.Mutex lock sync.Mutex
collectLock sync.Mutex
sdk *sdk.SDK sdk *sdk.SDK
errorHandler sdk.ErrorHandler errorHandler sdk.ErrorHandler
batcher export.Batcher batcher export.Batcher
@ -160,8 +161,12 @@ func (c *Controller) tick() {
// TODO: either remove the context argument from Export() or // TODO: either remove the context argument from Export() or
// configure a timeout here? // configure a timeout here?
ctx := context.Background() ctx := context.Background()
c.sdk.Collect(ctx) c.collect(ctx)
err := c.exporter.Export(ctx, c.batcher.CheckpointSet()) checkpointSet := syncCheckpointSet{
mtx: &c.collectLock,
delegate: c.batcher.CheckpointSet(),
}
err := c.exporter.Export(ctx, checkpointSet)
c.batcher.FinishedCollection() c.batcher.FinishedCollection()
if err != nil { if err != nil {
@ -169,6 +174,28 @@ func (c *Controller) tick() {
} }
} }
func (c *Controller) collect(ctx context.Context) {
c.collectLock.Lock()
defer c.collectLock.Unlock()
c.sdk.Collect(ctx)
}
// syncCheckpointSet is a wrapper for a CheckpointSet to synchronize
// SDK's collection and reads of a CheckpointSet by an exporter.
type syncCheckpointSet struct {
mtx *sync.Mutex
delegate export.CheckpointSet
}
var _ export.CheckpointSet = (*syncCheckpointSet)(nil)
func (c syncCheckpointSet) ForEach(fn func(export.Record)) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.delegate.ForEach(fn)
}
func (realClock) Now() time.Time { func (realClock) Now() time.Time {
return time.Now() return time.Now()
} }