diff --git a/example/prometheus/main.go b/example/prometheus/main.go index 07a57385b..d625f0a26 100644 --- a/example/prometheus/main.go +++ b/example/prometheus/main.go @@ -38,14 +38,20 @@ var ( func initMeter() *push.Controller { selector := simple.NewWithExactMeasure() - exporter, err := prometheus.NewExporter(prometheus.Options{ - DefaultHistogramBuckets: []float64{0., 10., 15., 20.}, - }) + exporter, err := prometheus.NewExporter(prometheus.Options{}) if err != nil { log.Panicf("failed to initialize metric stdout exporter %v", err) } - batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), false) + // Prometheus needs to use a stateful batcher since counters (and histogram since they are a collection of Counters) + // are cumulative (i.e., monotonically increasing values) and should not be resetted after each export. + // + // Prometheus uses this approach to be resilient to scrape failures. + // If a Prometheus server tries to scrape metrics from a host and fails for some reason, + // it could try again on the next scrape and no data would be lost, only resolution. + // + // Gauges (or LastValues) and Summaries are an exception to this and have different behaviors. + batcher := defaultkeys.New(selector, sdkmetric.NewDefaultLabelEncoder(), true) pusher := push.New(batcher, exporter, time.Second) pusher.Start() diff --git a/exporter/metric/prometheus/counter.go b/exporter/metric/prometheus/counter.go deleted file mode 100644 index e75c891de..000000000 --- a/exporter/metric/prometheus/counter.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prometheus - -import ( - "github.com/prometheus/client_golang/prometheus" - - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregator" -) - -type counters struct { - registerer prometheus.Registerer - counters map[metricKey]prometheus.Counter - counterVecs map[*export.Descriptor]*prometheus.CounterVec -} - -func newCounters(registerer prometheus.Registerer) counters { - return counters{ - registerer: registerer, - counters: make(map[metricKey]prometheus.Counter), - counterVecs: make(map[*export.Descriptor]*prometheus.CounterVec), - } -} - -func (co *counters) export(sum aggregator.Sum, record export.Record, mKey metricKey) error { - value, err := sum.Sum() - if err != nil { - return err - } - - c, err := co.getCounter(record, mKey) - if err != nil { - return err - } - - desc := record.Descriptor() - c.Add(value.CoerceToFloat64(desc.NumberKind())) - - return nil -} - -func (co *counters) getCounter(record export.Record, mKey metricKey) (prometheus.Counter, error) { - if c, ok := co.counters[mKey]; ok { - return c, nil - } - - desc := record.Descriptor() - counterVec, err := co.getCounterVec(desc, record.Labels()) - if err != nil { - return nil, err - } - - counter, err := counterVec.GetMetricWithLabelValues(labelValues(record.Labels())...) - if err != nil { - return nil, err - } - - co.counters[mKey] = counter - return counter, nil -} - -func (co *counters) getCounterVec(desc *export.Descriptor, labels export.Labels) (*prometheus.CounterVec, error) { - if c, ok := co.counterVecs[desc]; ok { - return c, nil - } - - c := prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: sanitize(desc.Name()), - Help: desc.Description(), - }, - labelsKeys(labels.Ordered()), - ) - - if err := co.registerer.Register(c); err != nil { - return nil, err - } - - co.counterVecs[desc] = c - return c, nil -} diff --git a/exporter/metric/prometheus/gauge.go b/exporter/metric/prometheus/gauge.go deleted file mode 100644 index 81463c6a8..000000000 --- a/exporter/metric/prometheus/gauge.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prometheus - -import ( - "github.com/prometheus/client_golang/prometheus" - - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregator" -) - -type gauges struct { - registerer prometheus.Registerer - gauges map[metricKey]prometheus.Gauge - gaugeVecs map[*export.Descriptor]*prometheus.GaugeVec -} - -func newGauges(registerer prometheus.Registerer) gauges { - return gauges{ - registerer: registerer, - gauges: make(map[metricKey]prometheus.Gauge), - gaugeVecs: make(map[*export.Descriptor]*prometheus.GaugeVec), - } -} - -func (ga *gauges) export(gauge aggregator.LastValue, record export.Record, mKey metricKey) error { - lv, _, err := gauge.LastValue() - if err != nil { - return err - } - - g, err := ga.getGauge(record, mKey) - if err != nil { - return err - } - - desc := record.Descriptor() - g.Set(lv.CoerceToFloat64(desc.NumberKind())) - - return nil -} - -func (ga *gauges) getGauge(record export.Record, mKey metricKey) (prometheus.Gauge, error) { - if c, ok := ga.gauges[mKey]; ok { - return c, nil - } - - desc := record.Descriptor() - gaugeVec, err := ga.getGaugeVec(desc, record.Labels()) - if err != nil { - return nil, err - } - - gauge, err := gaugeVec.GetMetricWithLabelValues(labelValues(record.Labels())...) - if err != nil { - return nil, err - } - - ga.gauges[mKey] = gauge - return gauge, nil -} - -func (ga *gauges) getGaugeVec(desc *export.Descriptor, labels export.Labels) (*prometheus.GaugeVec, error) { - if gv, ok := ga.gaugeVecs[desc]; ok { - return gv, nil - } - - g := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: sanitize(desc.Name()), - Help: desc.Description(), - }, - labelsKeys(labels.Ordered()), - ) - - if err := ga.registerer.Register(g); err != nil { - return nil, err - } - - ga.gaugeVecs[desc] = g - return g, nil -} diff --git a/exporter/metric/prometheus/histogram.go b/exporter/metric/prometheus/histogram.go deleted file mode 100644 index e680fdef1..000000000 --- a/exporter/metric/prometheus/histogram.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prometheus - -import ( - "github.com/prometheus/client_golang/prometheus" - - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregator" -) - -type histograms struct { - defaultHistogramBuckets []float64 - registerer prometheus.Registerer - histogram map[metricKey]prometheus.Observer - histogramVecs map[*export.Descriptor]*prometheus.HistogramVec -} - -func newHistograms(registerer prometheus.Registerer, defaultHistogramBuckets []float64) histograms { - return histograms{ - registerer: registerer, - histogram: make(map[metricKey]prometheus.Observer), - histogramVecs: make(map[*export.Descriptor]*prometheus.HistogramVec), - defaultHistogramBuckets: defaultHistogramBuckets, - } -} - -func (hi *histograms) export(points aggregator.Points, record export.Record, mKey metricKey) error { - values, err := points.Points() - if err != nil { - return err - } - - obs, err := hi.getHistogram(record, mKey) - if err != nil { - return err - } - - desc := record.Descriptor() - for _, v := range values { - obs.Observe(v.CoerceToFloat64(desc.NumberKind())) - } - return nil -} - -func (hi *histograms) getHistogram(record export.Record, mKey metricKey) (prometheus.Observer, error) { - if c, ok := hi.histogram[mKey]; ok { - return c, nil - } - - desc := record.Descriptor() - histogramVec, err := hi.getHistogramVec(desc, record.Labels()) - if err != nil { - return nil, err - } - - obs, err := histogramVec.GetMetricWithLabelValues(labelValues(record.Labels())...) - if err != nil { - return nil, err - } - - hi.histogram[mKey] = obs - return obs, nil -} - -func (hi *histograms) getHistogramVec(desc *export.Descriptor, labels export.Labels) (*prometheus.HistogramVec, error) { - if gv, ok := hi.histogramVecs[desc]; ok { - return gv, nil - } - - g := prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Name: sanitize(desc.Name()), - Help: desc.Description(), - Buckets: hi.defaultHistogramBuckets, - }, - labelsKeys(labels.Ordered()), - ) - - if err := hi.registerer.Register(g); err != nil { - return nil, err - } - - hi.histogramVecs[desc] = g - return g, nil -} diff --git a/exporter/metric/prometheus/prometheus.go b/exporter/metric/prometheus/prometheus.go index 5bc593f66..05fbd3981 100644 --- a/exporter/metric/prometheus/prometheus.go +++ b/exporter/metric/prometheus/prometheus.go @@ -16,6 +16,7 @@ package prometheus import ( "context" + "fmt" "net/http" "github.com/prometheus/client_golang/prometheus" @@ -26,11 +27,6 @@ import ( "go.opentelemetry.io/otel/sdk/export/metric/aggregator" ) -type metricKey struct { - desc *export.Descriptor - encoded string -} - // Exporter is an implementation of metric.Exporter that sends metrics to // Prometheus. type Exporter struct { @@ -39,11 +35,10 @@ type Exporter struct { registerer prometheus.Registerer gatherer prometheus.Gatherer - counters counters - gauges gauges - histograms histograms - summaries summaries - measureAggregation MeasureAggregation + snapshot export.CheckpointSet + onError func(error) + + defaultSummaryQuantiles []float64 } var _ export.Exporter = &Exporter{} @@ -69,27 +64,15 @@ type Options struct { // If not specified the Registry will be used as default. Gatherer prometheus.Gatherer - // DefaultHistogramBuckets is the default histogram buckets - // to use. Use nil to specify the system-default histogram buckets. - DefaultHistogramBuckets []float64 + // DefaultSummaryQuantiles is the default summary quantiles + // to use. Use nil to specify the system-default summary quantiles. + DefaultSummaryQuantiles []float64 - // DefaultSummaryObjectives is the default summary objectives - // to use. Use nil to specify the system-default summary objectives. - DefaultSummaryObjectives map[float64]float64 - - // MeasureAggregation defines how metric.Measure are exported. - // Possible values are 'Histogram' or 'Summary'. - // The default export representation for measures is Histograms. - MeasureAggregation MeasureAggregation + // OnError is a function that handle errors that may occur while exporting metrics. + // TODO: This should be refactored or even removed once we have a better error handling mechanism. + OnError func(error) } -type MeasureAggregation int - -const ( - Histogram MeasureAggregation = iota - Summary -) - // NewExporter returns a new prometheus exporter for prometheus metrics. func NewExporter(opts Options) (*Exporter, error) { if opts.Registry == nil { @@ -104,70 +87,166 @@ func NewExporter(opts Options) (*Exporter, error) { opts.Gatherer = opts.Registry } - return &Exporter{ - registerer: opts.Registerer, - gatherer: opts.Gatherer, - handler: promhttp.HandlerFor(opts.Gatherer, promhttp.HandlerOpts{}), - measureAggregation: opts.MeasureAggregation, + if opts.OnError == nil { + opts.OnError = func(err error) { + fmt.Println(err.Error()) + } + } - counters: newCounters(opts.Registerer), - gauges: newGauges(opts.Registerer), - histograms: newHistograms(opts.Registerer, opts.DefaultHistogramBuckets), - summaries: newSummaries(opts.Registerer, opts.DefaultSummaryObjectives), - }, nil + e := &Exporter{ + handler: promhttp.HandlerFor(opts.Gatherer, promhttp.HandlerOpts{}), + registerer: opts.Registerer, + gatherer: opts.Gatherer, + defaultSummaryQuantiles: opts.DefaultSummaryQuantiles, + } + + c := newCollector(e) + if err := opts.Registerer.Register(c); err != nil { + opts.OnError(fmt.Errorf("cannot register the collector: %w", err)) + } + + return e, nil } // Export exports the provide metric record to prometheus. func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { - var forEachError error - checkpointSet.ForEach(func(record export.Record) { + e.snapshot = checkpointSet + return nil +} + +// collector implements prometheus.Collector interface. +type collector struct { + exp *Exporter +} + +var _ prometheus.Collector = (*collector)(nil) + +func newCollector(exporter *Exporter) *collector { + return &collector{ + exp: exporter, + } +} + +func (c *collector) Describe(ch chan<- *prometheus.Desc) { + if c.exp.snapshot == nil { + return + } + + c.exp.snapshot.ForEach(func(record export.Record) { + ch <- c.toDesc(&record) + }) +} + +// Collect exports the last calculated CheckpointSet. +// +// Collect is invoked whenever prometheus.Gatherer is also invoked. +// For example, when the HTTP endpoint is invoked by Prometheus. +func (c *collector) Collect(ch chan<- prometheus.Metric) { + if c.exp.snapshot == nil { + return + } + + c.exp.snapshot.ForEach(func(record export.Record) { agg := record.Aggregator() + numberKind := record.Descriptor().NumberKind() + labels := labelValues(record.Labels()) + desc := c.toDesc(&record) - mKey := metricKey{ - desc: record.Descriptor(), - encoded: record.Labels().Encoded(), - } + // TODO: implement histogram export when the histogram aggregation is done. + // https://github.com/open-telemetry/opentelemetry-go/issues/317 - if points, ok := agg.(aggregator.Points); ok { - observerExporter := e.histograms.export - if e.measureAggregation == Summary { - observerExporter = e.summaries.export - } - - err := observerExporter(points, record, mKey) - if err != nil { - forEachError = err - } - return - } - - if sum, ok := agg.(aggregator.Sum); ok { - err := e.counters.export(sum, record, mKey) - if err != nil { - forEachError = err - } - return - } - - if gauge, ok := agg.(aggregator.LastValue); ok { - err := e.gauges.export(gauge, record, mKey) - if err != nil { - forEachError = err - } - return + if dist, ok := agg.(aggregator.Distribution); ok { + // TODO: summaries values are never being resetted. + // As measures are recorded, new records starts to have less impact on these summaries. + // We should implement an solution that is similar to the Prometheus Clients + // using a rolling window for summaries could be a solution. + // + // References: + // https://www.robustperception.io/how-does-a-prometheus-summary-work + // https://github.com/prometheus/client_golang/blob/fa4aa9000d2863904891d193dea354d23f3d712a/prometheus/summary.go#L135 + c.exportSummary(ch, dist, numberKind, desc, labels) + } else if sum, ok := agg.(aggregator.Sum); ok { + c.exportCounter(ch, sum, numberKind, desc, labels) + } else if gauge, ok := agg.(aggregator.LastValue); ok { + c.exportGauge(ch, gauge, numberKind, desc, labels) } }) +} - return forEachError +func (c *collector) exportGauge(ch chan<- prometheus.Metric, gauge aggregator.LastValue, kind core.NumberKind, desc *prometheus.Desc, labels []string) { + lastValue, _, err := gauge.LastValue() + if err != nil { + c.exp.onError(err) + return + } + + m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, lastValue.CoerceToFloat64(kind), labels...) + if err != nil { + c.exp.onError(err) + return + } + + ch <- m +} + +func (c *collector) exportCounter(ch chan<- prometheus.Metric, sum aggregator.Sum, kind core.NumberKind, desc *prometheus.Desc, labels []string) { + v, err := sum.Sum() + if err != nil { + c.exp.onError(err) + return + } + + m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, v.CoerceToFloat64(kind), labels...) + if err != nil { + c.exp.onError(err) + return + } + + ch <- m +} + +func (c *collector) exportSummary(ch chan<- prometheus.Metric, dist aggregator.Distribution, kind core.NumberKind, desc *prometheus.Desc, labels []string) { + count, err := dist.Count() + if err != nil { + c.exp.onError(err) + return + } + + var sum core.Number + sum, err = dist.Sum() + if err != nil { + c.exp.onError(err) + return + } + + quantiles := make(map[float64]float64) + for _, quantile := range c.exp.defaultSummaryQuantiles { + q, _ := dist.Quantile(quantile) + quantiles[quantile] = q.CoerceToFloat64(kind) + } + + m, err := prometheus.NewConstSummary(desc, uint64(count), sum.CoerceToFloat64(kind), quantiles, labels...) + if err != nil { + c.exp.onError(err) + return + } + + ch <- m +} + +func (c *collector) toDesc(metric *export.Record) *prometheus.Desc { + desc := metric.Descriptor() + labels := labelsKeys(metric.Labels()) + return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labels, nil) } func (e *Exporter) ServeHTTP(w http.ResponseWriter, r *http.Request) { e.handler.ServeHTTP(w, r) } -func labelsKeys(kvs []core.KeyValue) []string { - keys := make([]string, 0, len(kvs)) - for _, kv := range kvs { +func labelsKeys(labels export.Labels) []string { + keys := make([]string, 0, labels.Len()) + for _, kv := range labels.Ordered() { keys = append(keys, sanitize(string(kv.Key))) } return keys diff --git a/exporter/metric/prometheus/prometheus_test.go b/exporter/metric/prometheus/prometheus_test.go index 1cbd252d1..2df58d3f2 100644 --- a/exporter/metric/prometheus/prometheus_test.go +++ b/exporter/metric/prometheus/prometheus_test.go @@ -20,7 +20,7 @@ import ( func TestPrometheusExporter(t *testing.T) { exporter, err := prometheus.NewExporter(prometheus.Options{ - DefaultHistogramBuckets: []float64{0., 10., 15., 20.}, + DefaultSummaryQuantiles: []float64{0.5, 0.9, 0.99}, }) if err != nil { log.Panicf("failed to initialize metric stdout exporter %v", err) @@ -50,11 +50,9 @@ func TestPrometheusExporter(t *testing.T) { checkpointSet.AddMeasure(measure, 13, labels...) checkpointSet.AddMeasure(measure, 15, labels...) checkpointSet.AddMeasure(measure, 17, labels...) - expected = append(expected, `measure_bucket{A="B",C="D",le="0"} 0`) - expected = append(expected, `measure_bucket{A="B",C="D",le="10"} 0`) - expected = append(expected, `measure_bucket{A="B",C="D",le="15"} 2`) - expected = append(expected, `measure_bucket{A="B",C="D",le="20"} 3`) - expected = append(expected, `measure_bucket{A="B",C="D",le="+Inf"} 3`) + expected = append(expected, `measure{A="B",C="D",quantile="0.5"} 15`) + expected = append(expected, `measure{A="B",C="D",quantile="0.9"} 17`) + expected = append(expected, `measure{A="B",C="D",quantile="0.99"} 17`) expected = append(expected, `measure_sum{A="B",C="D"} 45`) expected = append(expected, `measure_count{A="B",C="D"} 3`) @@ -69,51 +67,6 @@ func TestPrometheusExporter(t *testing.T) { checkpointSet.AddGauge(gauge, 32, missingLabels...) expected = append(expected, `gauge{A="E",C=""} 32`) - checkpointSet.AddMeasure(measure, 19, missingLabels...) - expected = append(expected, `measure_bucket{A="E",C="",le="+Inf"} 1`) - expected = append(expected, `measure_bucket{A="E",C="",le="0"} 0`) - expected = append(expected, `measure_bucket{A="E",C="",le="10"} 0`) - expected = append(expected, `measure_bucket{A="E",C="",le="15"} 0`) - expected = append(expected, `measure_bucket{A="E",C="",le="20"} 1`) - expected = append(expected, `measure_count{A="E",C=""} 1`) - expected = append(expected, `measure_sum{A="E",C=""} 19`) - - compareExport(t, exporter, checkpointSet, expected) -} - -func TestPrometheusExporter_Summaries(t *testing.T) { - exporter, err := prometheus.NewExporter(prometheus.Options{ - MeasureAggregation: prometheus.Summary, - }) - if err != nil { - log.Panicf("failed to initialize metric stdout exporter %v", err) - } - - var expected []string - checkpointSet := test.NewCheckpointSet(metric.NewDefaultLabelEncoder()) - - measure := export.NewDescriptor( - "measure", export.MeasureKind, nil, "", "", core.Float64NumberKind, false) - - labels := []core.KeyValue{ - key.New("A").String("B"), - key.New("C").String("D"), - } - - checkpointSet.AddMeasure(measure, 13, labels...) - checkpointSet.AddMeasure(measure, 15, labels...) - checkpointSet.AddMeasure(measure, 17, labels...) - expected = append(expected, `measure_count{A="B",C="D"} 3`) - expected = append(expected, `measure_sum{A="B",C="D"} 45`) - expected = append(expected, `measure{A="B",C="D",quantile="0.5"} 15`) - expected = append(expected, `measure{A="B",C="D",quantile="0.9"} 17`) - expected = append(expected, `measure{A="B",C="D",quantile="0.99"} 17`) - - missingLabels := []core.KeyValue{ - key.New("A").String("E"), - key.New("C").String(""), - } - checkpointSet.AddMeasure(measure, 19, missingLabels...) expected = append(expected, `measure{A="E",C="",quantile="0.5"} 19`) expected = append(expected, `measure{A="E",C="",quantile="0.9"} 19`) diff --git a/exporter/metric/prometheus/summary.go b/exporter/metric/prometheus/summary.go deleted file mode 100644 index 7eed2f40c..000000000 --- a/exporter/metric/prometheus/summary.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2019, OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package prometheus - -import ( - "github.com/prometheus/client_golang/prometheus" - - export "go.opentelemetry.io/otel/sdk/export/metric" - "go.opentelemetry.io/otel/sdk/export/metric/aggregator" -) - -type summaries struct { - defaultSummaryObjectives map[float64]float64 - registerer prometheus.Registerer - summary map[metricKey]prometheus.Observer - summaryVec map[*export.Descriptor]*prometheus.SummaryVec -} - -func newSummaries(registerer prometheus.Registerer, defaultSummaryObjectives map[float64]float64) summaries { - return summaries{ - registerer: registerer, - summary: make(map[metricKey]prometheus.Observer), - summaryVec: make(map[*export.Descriptor]*prometheus.SummaryVec), - defaultSummaryObjectives: defaultSummaryObjectives, - } -} - -func (su *summaries) export(points aggregator.Points, record export.Record, mKey metricKey) error { - values, err := points.Points() - if err != nil { - return err - } - - obs, err := su.getSummary(record, mKey) - if err != nil { - return err - } - - desc := record.Descriptor() - for _, v := range values { - obs.Observe(v.CoerceToFloat64(desc.NumberKind())) - } - return nil -} - -func (su *summaries) getSummary(record export.Record, mKey metricKey) (prometheus.Observer, error) { - if c, ok := su.summary[mKey]; ok { - return c, nil - } - - desc := record.Descriptor() - histogramVec, err := su.getSummaryVec(desc, record.Labels()) - if err != nil { - return nil, err - } - - obs, err := histogramVec.GetMetricWithLabelValues(labelValues(record.Labels())...) - if err != nil { - return nil, err - } - - su.summary[mKey] = obs - return obs, nil -} - -func (su *summaries) getSummaryVec(desc *export.Descriptor, labels export.Labels) (*prometheus.SummaryVec, error) { - if gv, ok := su.summaryVec[desc]; ok { - return gv, nil - } - - g := prometheus.NewSummaryVec( - prometheus.SummaryOpts{ - Name: sanitize(desc.Name()), - Help: desc.Description(), - Objectives: su.defaultSummaryObjectives, - }, - labelsKeys(labels.Ordered()), - ) - - if err := su.registerer.Register(g); err != nil { - return nil, err - } - - su.summaryVec[desc] = g - return g, nil -} diff --git a/exporter/metric/test/test.go b/exporter/metric/test/test.go index 7424fdf43..6718294fd 100644 --- a/exporter/metric/test/test.go +++ b/exporter/metric/test/test.go @@ -12,24 +12,41 @@ import ( type CheckpointSet struct { encoder export.LabelEncoder + records map[string]export.Record updates []export.Record } +// NewCheckpointSet returns a test CheckpointSet that new records could be added. +// Records are grouped by their LabelSet. func NewCheckpointSet(encoder export.LabelEncoder) *CheckpointSet { return &CheckpointSet{ encoder: encoder, + records: make(map[string]export.Record), } } func (p *CheckpointSet) Reset() { + p.records = make(map[string]export.Record) p.updates = nil } -func (p *CheckpointSet) Add(desc *export.Descriptor, agg export.Aggregator, labels ...core.KeyValue) { +// Add a new descriptor to a Checkpoint. +// +// If there is an existing record with the same descriptor and LabelSet +// the stored aggregator will be returned and should be merged. +func (p *CheckpointSet) Add(desc *export.Descriptor, newAgg export.Aggregator, labels ...core.KeyValue) (agg export.Aggregator, added bool) { encoded := p.encoder.Encode(labels) elabels := export.NewLabels(labels, encoded, p.encoder) - p.updates = append(p.updates, export.NewRecord(desc, elabels, agg)) + key := desc.Name() + "_" + elabels.Encoded() + if record, ok := p.records[key]; ok { + return record.Aggregator(), false + } + + rec := export.NewRecord(desc, elabels, newAgg) + p.updates = append(p.updates, rec) + p.records[key] = rec + return newAgg, true } func createNumber(desc *export.Descriptor, v float64) core.Number { @@ -40,27 +57,29 @@ func createNumber(desc *export.Descriptor, v float64) core.Number { } func (p *CheckpointSet) AddGauge(desc *export.Descriptor, v float64, labels ...core.KeyValue) { - ctx := context.Background() - gagg := gauge.New() - _ = gagg.Update(ctx, createNumber(desc, v), desc) - gagg.Checkpoint(ctx, desc) - p.Add(desc, gagg, labels...) + p.updateAggregator(desc, gauge.New(), v, labels...) } func (p *CheckpointSet) AddCounter(desc *export.Descriptor, v float64, labels ...core.KeyValue) { - ctx := context.Background() - cagg := counter.New() - _ = cagg.Update(ctx, createNumber(desc, v), desc) - cagg.Checkpoint(ctx, desc) - p.Add(desc, cagg, labels...) + p.updateAggregator(desc, counter.New(), v, labels...) } func (p *CheckpointSet) AddMeasure(desc *export.Descriptor, v float64, labels ...core.KeyValue) { + p.updateAggregator(desc, array.New(), v, labels...) +} + +func (p *CheckpointSet) updateAggregator(desc *export.Descriptor, newAgg export.Aggregator, v float64, labels ...core.KeyValue) { ctx := context.Background() - magg := array.New() - _ = magg.Update(ctx, createNumber(desc, v), desc) - magg.Checkpoint(ctx, desc) - p.Add(desc, magg, labels...) + // Updates and checkpoint the new aggregator + _ = newAgg.Update(ctx, createNumber(desc, v), desc) + newAgg.Checkpoint(ctx, desc) + + // Try to add this aggregator to the CheckpointSet + agg, added := p.Add(desc, newAgg, labels...) + if !added { + // An aggregator already exist for this descriptor and label set, we should merge them. + _ = agg.Merge(newAgg, desc) + } } func (p *CheckpointSet) ForEach(f func(export.Record)) { diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 774515abd..b6aa8fbc3 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -27,6 +27,7 @@ import ( // Controller organizes a periodic push of metric data. type Controller struct { lock sync.Mutex + collectLock sync.Mutex sdk *sdk.SDK errorHandler sdk.ErrorHandler batcher export.Batcher @@ -160,8 +161,12 @@ func (c *Controller) tick() { // TODO: either remove the context argument from Export() or // configure a timeout here? ctx := context.Background() - c.sdk.Collect(ctx) - err := c.exporter.Export(ctx, c.batcher.CheckpointSet()) + c.collect(ctx) + checkpointSet := syncCheckpointSet{ + mtx: &c.collectLock, + delegate: c.batcher.CheckpointSet(), + } + err := c.exporter.Export(ctx, checkpointSet) c.batcher.FinishedCollection() if err != nil { @@ -169,6 +174,28 @@ func (c *Controller) tick() { } } +func (c *Controller) collect(ctx context.Context) { + c.collectLock.Lock() + defer c.collectLock.Unlock() + + c.sdk.Collect(ctx) +} + +// syncCheckpointSet is a wrapper for a CheckpointSet to synchronize +// SDK's collection and reads of a CheckpointSet by an exporter. +type syncCheckpointSet struct { + mtx *sync.Mutex + delegate export.CheckpointSet +} + +var _ export.CheckpointSet = (*syncCheckpointSet)(nil) + +func (c syncCheckpointSet) ForEach(fn func(export.Record)) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.delegate.ForEach(fn) +} + func (realClock) Now() time.Time { return time.Now() }