1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-10 00:29:12 +02:00

Prometheus exporter: concurrent collect bugfix (#3899)

* Concurrent collect bugfix

* Used sync.Mutex and code cleanup

* Revert "Concurrent collect bugfix"

This reverts commit 1a30f233b6.

* Used sync.Mutex and re-grouped protected members

* Added test and updated changelog

* Updated changelog

* Take care of potential panic in otel.Handle

* Extracted critical section in a separate method and fixed nil scope info

* Lock the whole scope of the func

* Moved otel.Handle out of the critical section

* Fixed calling createScopeInfoMetric twice and updated changelog

* Fixed markdown linter errors

* Added test for nil scopeinfo

* Fix merge artifacts

* Fixed linter errors

* Protect the whole validateMetrics method wity mutex

* Update CHANGELOG.md

* Update exporters/prometheus/exporter.go

Co-authored-by: Robert Pająk <pellared@hotmail.com>

* Update CHANGELOG.md

* Document that Collect is concurrent-safe

* Update exporter_test.go

* Update exporters/prometheus/exporter_test.go

* Update exporters/prometheus/exporter.go

Co-authored-by: David Ashpole <dashpole@google.com>

---------

Co-authored-by: Robert Pająk <pellared@hotmail.com>
Co-authored-by: David Ashpole <dashpole@google.com>
This commit is contained in:
Max Chechel 2023-06-29 15:31:20 +04:00 committed by GitHub
parent 64e76f8be4
commit 457029232d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 275 additions and 67 deletions

View File

@ -209,6 +209,8 @@ This release drops the compatibility guarantee of [Go 1.18].
- Handle empty environment variable as it they were not set. (#3764)
- Clarify the `httpconv` and `netconv` packages in `go.opentelemetry.io/otel/semconv/*` provide tracing semantic conventions. (#3823)
- Fix race conditions in `go.opentelemetry.io/otel/exporters/metric/prometheus` that could cause a panic. (#3899)
- Fix sending nil `scopeInfo` to metrics channel in `go.opentelemetry.io/otel/exporters/metric/prometheus` that could cause a panic in `github.com/prometheus/client_golang/prometheus`. (#3899)
### Deprecated

View File

@ -59,14 +59,15 @@ var _ metric.Reader = &Exporter{}
type collector struct {
reader metric.Reader
disableTargetInfo bool
withoutUnits bool
targetInfo prometheus.Metric
disableScopeInfo bool
createTargetInfoOnce sync.Once
scopeInfos map[instrumentation.Scope]prometheus.Metric
metricFamilies map[string]*dto.MetricFamily
namespace string
withoutUnits bool
disableScopeInfo bool
namespace string
mu sync.Mutex // mu protects all members below from the concurrent access.
disableTargetInfo bool
targetInfo prometheus.Metric
scopeInfos map[instrumentation.Scope]prometheus.Metric
metricFamilies map[string]*dto.MetricFamily
}
// prometheus counters MUST have a _total suffix:
@ -113,6 +114,8 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {
}
// Collect implements prometheus.Collector.
//
// This method is safe to call concurrently.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
// TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect.
metrics := metricdata.ResourceMetrics{}
@ -124,16 +127,24 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}
}
c.createTargetInfoOnce.Do(func() {
// Resource should be immutable, we don't need to compute again
targetInfo, err := c.createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource)
if err != nil {
// If the target info metric is invalid, disable sending it.
otel.Handle(err)
c.disableTargetInfo = true
// Initialize (once) targetInfo and disableTargetInfo.
func() {
c.mu.Lock()
defer c.mu.Unlock()
if c.targetInfo == nil && !c.disableTargetInfo {
targetInfo, err := createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource)
if err != nil {
// If the target info metric is invalid, disable sending it.
c.disableTargetInfo = true
otel.Handle(err)
return
}
c.targetInfo = targetInfo
}
c.targetInfo = targetInfo
})
}()
if !c.disableTargetInfo {
ch <- c.targetInfo
}
@ -142,48 +153,53 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
var keys, values [2]string
if !c.disableScopeInfo {
scopeInfo, ok := c.scopeInfos[scopeMetrics.Scope]
if !ok {
scopeInfo, err = createScopeInfoMetric(scopeMetrics.Scope)
if err != nil {
otel.Handle(err)
}
c.scopeInfos[scopeMetrics.Scope] = scopeInfo
scopeInfo, err := c.scopeInfo(scopeMetrics.Scope)
if err != nil {
otel.Handle(err)
continue
}
ch <- scopeInfo
keys = scopeInfoKeys
values = [2]string{scopeMetrics.Scope.Name, scopeMetrics.Scope.Version}
}
for _, m := range scopeMetrics.Metrics {
typ, name := c.metricTypeAndName(m)
if typ == nil {
continue
}
drop, help := c.validateMetrics(name, m.Description, typ)
if drop {
continue
}
if help != "" {
m.Description = help
}
switch v := m.Data.(type) {
case metricdata.Histogram[int64]:
addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addHistogramMetric(ch, v, m, keys, values, name)
case metricdata.Histogram[float64]:
addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addHistogramMetric(ch, v, m, keys, values, name)
case metricdata.Sum[int64]:
addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addSumMetric(ch, v, m, keys, values, name)
case metricdata.Sum[float64]:
addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addSumMetric(ch, v, m, keys, values, name)
case metricdata.Gauge[int64]:
addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addGaugeMetric(ch, v, m, keys, values, name)
case metricdata.Gauge[float64]:
addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies)
addGaugeMetric(ch, v, m, keys, values, name)
}
}
}
}
func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string) {
// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/3163): support exemplars
drop, help := validateMetrics(name, m.Description, dto.MetricType_HISTOGRAM.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}
for _, dp := range histogram.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)
@ -204,24 +220,10 @@ func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogra
}
}
func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string) {
valueType := prometheus.CounterValue
metricType := dto.MetricType_COUNTER
if !sum.IsMonotonic {
valueType = prometheus.GaugeValue
metricType = dto.MetricType_GAUGE
}
if sum.IsMonotonic {
// Add _total suffix for counters
name += counterSuffix
}
drop, help := validateMetrics(name, m.Description, metricType.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}
for _, dp := range sum.DataPoints {
@ -237,15 +239,7 @@ func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata
}
}
func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) {
drop, help := validateMetrics(name, m.Description, dto.MetricType_GAUGE.Enum(), mfs)
if drop {
return
}
if help != "" {
m.Description = help
}
func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string) {
for _, dp := range gauge.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)
@ -293,7 +287,7 @@ func getAttrs(attrs attribute.Set, ks, vs [2]string) ([]string, []string) {
return keys, values
}
func (c *collector) createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) {
func createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) {
keys, values := getAttrs(*res.Set(), [2]string{}, [2]string{})
desc := prometheus.NewDesc(name, description, keys, nil)
return prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(1), values...)
@ -386,16 +380,63 @@ func sanitizeName(n string) string {
return b.String()
}
func validateMetrics(name, description string, metricType *dto.MetricType, mfs map[string]*dto.MetricFamily) (drop bool, help string) {
emf, exist := mfs[name]
func (c *collector) metricTypeAndName(m metricdata.Metrics) (*dto.MetricType, string) {
name := c.getName(m)
switch v := m.Data.(type) {
case metricdata.Histogram[int64], metricdata.Histogram[float64]:
return dto.MetricType_HISTOGRAM.Enum(), name
case metricdata.Sum[float64]:
if v.IsMonotonic {
return dto.MetricType_COUNTER.Enum(), name + counterSuffix
}
return dto.MetricType_GAUGE.Enum(), name
case metricdata.Sum[int64]:
if v.IsMonotonic {
return dto.MetricType_COUNTER.Enum(), name + counterSuffix
}
return dto.MetricType_GAUGE.Enum(), name
case metricdata.Gauge[int64], metricdata.Gauge[float64]:
return dto.MetricType_GAUGE.Enum(), name
}
return nil, ""
}
func (c *collector) scopeInfo(scope instrumentation.Scope) (prometheus.Metric, error) {
c.mu.Lock()
defer c.mu.Unlock()
scopeInfo, ok := c.scopeInfos[scope]
if ok {
return scopeInfo, nil
}
scopeInfo, err := createScopeInfoMetric(scope)
if err != nil {
return nil, fmt.Errorf("cannot create scope info metric: %w", err)
}
c.scopeInfos[scope] = scopeInfo
return scopeInfo, nil
}
func (c *collector) validateMetrics(name, description string, metricType *dto.MetricType) (drop bool, help string) {
c.mu.Lock()
defer c.mu.Unlock()
emf, exist := c.metricFamilies[name]
if !exist {
mfs[name] = &dto.MetricFamily{
c.metricFamilies[name] = &dto.MetricFamily{
Name: proto.String(name),
Help: proto.String(description),
Type: metricType,
}
return false, ""
}
if emf.GetType() != *metricType {
global.Error(
errors.New("instrument type conflict"),

View File

@ -17,15 +17,19 @@ package prometheus
import (
"context"
"os"
"strings"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/resource"
@ -672,3 +676,164 @@ func TestDuplicateMetrics(t *testing.T) {
})
}
}
func TestCollectConcurrentSafe(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := newConfig(WithRegisterer(registry))
reader := metric.NewManualReader(cfg.manualReaderOptions()...)
collector := &collector{
reader: reader,
disableTargetInfo: false,
withoutUnits: true,
disableScopeInfo: cfg.disableScopeInfo,
scopeInfos: make(map[instrumentation.Scope]prometheus.Metric),
metricFamilies: make(map[string]*dto.MetricFamily),
}
err := cfg.registerer.Register(collector)
require.NoError(t, err)
ctx := context.Background()
// initialize resource
res, err := resource.New(ctx,
resource.WithAttributes(semconv.ServiceName("prometheus_test")),
resource.WithAttributes(semconv.TelemetrySDKVersion("latest")),
)
require.NoError(t, err)
res, err = resource.Merge(resource.Default(), res)
require.NoError(t, err)
exporter := &Exporter{Reader: reader}
// initialize provider
provider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithResource(res),
)
// initialize two meter a, b
meterA := provider.Meter("ma", otelmetric.WithInstrumentationVersion("v0.1.0"))
meterB := provider.Meter("mb", otelmetric.WithInstrumentationVersion("v0.1.0"))
fooA, err := meterA.Int64Counter("foo",
otelmetric.WithUnit("By"),
otelmetric.WithDescription("meter counter foo"))
assert.NoError(t, err)
opt := otelmetric.WithAttributes(
attribute.Key("A").String("B"),
)
fooA.Add(ctx, 100, opt)
fooB, err := meterB.Int64Counter("foo",
otelmetric.WithUnit("By"),
otelmetric.WithDescription("meter counter foo"))
assert.NoError(t, err)
fooB.Add(ctx, 100, opt)
concurrencyLevel := 100
ch := make(chan prometheus.Metric, concurrencyLevel)
for i := 0; i < concurrencyLevel; i++ {
go func() {
collector.Collect(ch)
}()
}
for ; concurrencyLevel > 0; concurrencyLevel-- {
select {
case <-ch:
concurrencyLevel--
if concurrencyLevel == 0 {
return
}
case <-time.After(time.Second):
t.Fatal("timeout")
}
}
}
func TesInvalidInsrtrumentForPrometheusIsIgnored(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := newConfig(WithRegisterer(registry))
reader := metric.NewManualReader(cfg.manualReaderOptions()...)
collector := &collector{
reader: reader,
disableTargetInfo: false,
withoutUnits: true,
disableScopeInfo: false,
scopeInfos: make(map[instrumentation.Scope]prometheus.Metric),
metricFamilies: make(map[string]*dto.MetricFamily),
}
err := cfg.registerer.Register(collector)
require.NoError(t, err)
ctx := context.Background()
// initialize resource
res, err := resource.New(ctx,
resource.WithAttributes(semconv.ServiceName("prometheus_test")),
resource.WithAttributes(semconv.TelemetrySDKVersion("latest")),
)
require.NoError(t, err)
res, err = resource.Merge(resource.Default(), res)
require.NoError(t, err)
exporter := &Exporter{Reader: reader}
// initialize provider
provider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithResource(res),
)
// invalid label or metric name leads to error returned from
// createScopeInfoMetric
invalidName := string([]byte{0xff, 0xfe, 0xfd})
validName := "validName"
meterA := provider.Meter(invalidName, otelmetric.WithInstrumentationVersion("v0.1.0"))
counterA, err := meterA.Int64Counter("with-invalid-description",
otelmetric.WithUnit("By"),
otelmetric.WithDescription(invalidName))
assert.NoError(t, err)
counterA.Add(ctx, 100, otelmetric.WithAttributes(
attribute.Key(invalidName).String(invalidName),
))
meterB := provider.Meter(validName, otelmetric.WithInstrumentationVersion("v0.1.0"))
counterB, err := meterB.Int64Counter(validName,
otelmetric.WithUnit("By"),
otelmetric.WithDescription(validName))
assert.NoError(t, err)
counterB.Add(ctx, 100, otelmetric.WithAttributes(
attribute.Key(validName).String(validName),
))
ch := make(chan prometheus.Metric)
go collector.Collect(ch)
for {
select {
case m := <-ch:
require.NotNil(t, m)
if strings.Contains(m.Desc().String(), validName) {
return
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}
}