diff --git a/CHANGELOG.md b/CHANGELOG.md index 94604fe68..3e75102b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -209,6 +209,8 @@ This release drops the compatibility guarantee of [Go 1.18]. - Handle empty environment variable as it they were not set. (#3764) - Clarify the `httpconv` and `netconv` packages in `go.opentelemetry.io/otel/semconv/*` provide tracing semantic conventions. (#3823) +- Fix race conditions in `go.opentelemetry.io/otel/exporters/metric/prometheus` that could cause a panic. (#3899) +- Fix sending nil `scopeInfo` to metrics channel in `go.opentelemetry.io/otel/exporters/metric/prometheus` that could cause a panic in `github.com/prometheus/client_golang/prometheus`. (#3899) ### Deprecated diff --git a/exporters/prometheus/exporter.go b/exporters/prometheus/exporter.go index 653d8c381..4fdbcfa17 100644 --- a/exporters/prometheus/exporter.go +++ b/exporters/prometheus/exporter.go @@ -59,14 +59,15 @@ var _ metric.Reader = &Exporter{} type collector struct { reader metric.Reader - disableTargetInfo bool - withoutUnits bool - targetInfo prometheus.Metric - disableScopeInfo bool - createTargetInfoOnce sync.Once - scopeInfos map[instrumentation.Scope]prometheus.Metric - metricFamilies map[string]*dto.MetricFamily - namespace string + withoutUnits bool + disableScopeInfo bool + namespace string + + mu sync.Mutex // mu protects all members below from the concurrent access. + disableTargetInfo bool + targetInfo prometheus.Metric + scopeInfos map[instrumentation.Scope]prometheus.Metric + metricFamilies map[string]*dto.MetricFamily } // prometheus counters MUST have a _total suffix: @@ -113,6 +114,8 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) { } // Collect implements prometheus.Collector. +// +// This method is safe to call concurrently. func (c *collector) Collect(ch chan<- prometheus.Metric) { // TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect. metrics := metricdata.ResourceMetrics{} @@ -124,16 +127,24 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { } } - c.createTargetInfoOnce.Do(func() { - // Resource should be immutable, we don't need to compute again - targetInfo, err := c.createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource) - if err != nil { - // If the target info metric is invalid, disable sending it. - otel.Handle(err) - c.disableTargetInfo = true + // Initialize (once) targetInfo and disableTargetInfo. + func() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.targetInfo == nil && !c.disableTargetInfo { + targetInfo, err := createInfoMetric(targetInfoMetricName, targetInfoDescription, metrics.Resource) + if err != nil { + // If the target info metric is invalid, disable sending it. + c.disableTargetInfo = true + otel.Handle(err) + return + } + + c.targetInfo = targetInfo } - c.targetInfo = targetInfo - }) + }() + if !c.disableTargetInfo { ch <- c.targetInfo } @@ -142,48 +153,53 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) { var keys, values [2]string if !c.disableScopeInfo { - scopeInfo, ok := c.scopeInfos[scopeMetrics.Scope] - if !ok { - scopeInfo, err = createScopeInfoMetric(scopeMetrics.Scope) - if err != nil { - otel.Handle(err) - } - c.scopeInfos[scopeMetrics.Scope] = scopeInfo + scopeInfo, err := c.scopeInfo(scopeMetrics.Scope) + if err != nil { + otel.Handle(err) + continue } + ch <- scopeInfo + keys = scopeInfoKeys values = [2]string{scopeMetrics.Scope.Name, scopeMetrics.Scope.Version} } for _, m := range scopeMetrics.Metrics { + typ, name := c.metricTypeAndName(m) + if typ == nil { + continue + } + + drop, help := c.validateMetrics(name, m.Description, typ) + if drop { + continue + } + + if help != "" { + m.Description = help + } + switch v := m.Data.(type) { case metricdata.Histogram[int64]: - addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies) + addHistogramMetric(ch, v, m, keys, values, name) case metricdata.Histogram[float64]: - addHistogramMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies) + addHistogramMetric(ch, v, m, keys, values, name) case metricdata.Sum[int64]: - addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies) + addSumMetric(ch, v, m, keys, values, name) case metricdata.Sum[float64]: - addSumMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies) + addSumMetric(ch, v, m, keys, values, name) case metricdata.Gauge[int64]: - addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies) + addGaugeMetric(ch, v, m, keys, values, name) case metricdata.Gauge[float64]: - addGaugeMetric(ch, v, m, keys, values, c.getName(m), c.metricFamilies) + addGaugeMetric(ch, v, m, keys, values, name) } } } } -func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) { +func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string) { // TODO(https://github.com/open-telemetry/opentelemetry-go/issues/3163): support exemplars - drop, help := validateMetrics(name, m.Description, dto.MetricType_HISTOGRAM.Enum(), mfs) - if drop { - return - } - if help != "" { - m.Description = help - } - for _, dp := range histogram.DataPoints { keys, values := getAttrs(dp.Attributes, ks, vs) @@ -204,24 +220,10 @@ func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogra } } -func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) { +func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string) { valueType := prometheus.CounterValue - metricType := dto.MetricType_COUNTER if !sum.IsMonotonic { valueType = prometheus.GaugeValue - metricType = dto.MetricType_GAUGE - } - if sum.IsMonotonic { - // Add _total suffix for counters - name += counterSuffix - } - - drop, help := validateMetrics(name, m.Description, metricType.Enum(), mfs) - if drop { - return - } - if help != "" { - m.Description = help } for _, dp := range sum.DataPoints { @@ -237,15 +239,7 @@ func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata } } -func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string, mfs map[string]*dto.MetricFamily) { - drop, help := validateMetrics(name, m.Description, dto.MetricType_GAUGE.Enum(), mfs) - if drop { - return - } - if help != "" { - m.Description = help - } - +func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string) { for _, dp := range gauge.DataPoints { keys, values := getAttrs(dp.Attributes, ks, vs) @@ -293,7 +287,7 @@ func getAttrs(attrs attribute.Set, ks, vs [2]string) ([]string, []string) { return keys, values } -func (c *collector) createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) { +func createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) { keys, values := getAttrs(*res.Set(), [2]string{}, [2]string{}) desc := prometheus.NewDesc(name, description, keys, nil) return prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(1), values...) @@ -386,16 +380,63 @@ func sanitizeName(n string) string { return b.String() } -func validateMetrics(name, description string, metricType *dto.MetricType, mfs map[string]*dto.MetricFamily) (drop bool, help string) { - emf, exist := mfs[name] +func (c *collector) metricTypeAndName(m metricdata.Metrics) (*dto.MetricType, string) { + name := c.getName(m) + + switch v := m.Data.(type) { + case metricdata.Histogram[int64], metricdata.Histogram[float64]: + return dto.MetricType_HISTOGRAM.Enum(), name + case metricdata.Sum[float64]: + if v.IsMonotonic { + return dto.MetricType_COUNTER.Enum(), name + counterSuffix + } + return dto.MetricType_GAUGE.Enum(), name + case metricdata.Sum[int64]: + if v.IsMonotonic { + return dto.MetricType_COUNTER.Enum(), name + counterSuffix + } + return dto.MetricType_GAUGE.Enum(), name + case metricdata.Gauge[int64], metricdata.Gauge[float64]: + return dto.MetricType_GAUGE.Enum(), name + } + + return nil, "" +} + +func (c *collector) scopeInfo(scope instrumentation.Scope) (prometheus.Metric, error) { + c.mu.Lock() + defer c.mu.Unlock() + + scopeInfo, ok := c.scopeInfos[scope] + if ok { + return scopeInfo, nil + } + + scopeInfo, err := createScopeInfoMetric(scope) + if err != nil { + return nil, fmt.Errorf("cannot create scope info metric: %w", err) + } + + c.scopeInfos[scope] = scopeInfo + + return scopeInfo, nil +} + +func (c *collector) validateMetrics(name, description string, metricType *dto.MetricType) (drop bool, help string) { + c.mu.Lock() + defer c.mu.Unlock() + + emf, exist := c.metricFamilies[name] + if !exist { - mfs[name] = &dto.MetricFamily{ + c.metricFamilies[name] = &dto.MetricFamily{ Name: proto.String(name), Help: proto.String(description), Type: metricType, } return false, "" } + if emf.GetType() != *metricType { global.Error( errors.New("instrument type conflict"), diff --git a/exporters/prometheus/exporter_test.go b/exporters/prometheus/exporter_test.go index 3f014971d..2af7d71ba 100644 --- a/exporters/prometheus/exporter_test.go +++ b/exporters/prometheus/exporter_test.go @@ -17,15 +17,19 @@ package prometheus import ( "context" "os" + "strings" "testing" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" otelmetric "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/resource" @@ -672,3 +676,164 @@ func TestDuplicateMetrics(t *testing.T) { }) } } + +func TestCollectConcurrentSafe(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := newConfig(WithRegisterer(registry)) + + reader := metric.NewManualReader(cfg.manualReaderOptions()...) + + collector := &collector{ + reader: reader, + disableTargetInfo: false, + withoutUnits: true, + disableScopeInfo: cfg.disableScopeInfo, + scopeInfos: make(map[instrumentation.Scope]prometheus.Metric), + metricFamilies: make(map[string]*dto.MetricFamily), + } + + err := cfg.registerer.Register(collector) + require.NoError(t, err) + + ctx := context.Background() + + // initialize resource + res, err := resource.New(ctx, + resource.WithAttributes(semconv.ServiceName("prometheus_test")), + resource.WithAttributes(semconv.TelemetrySDKVersion("latest")), + ) + require.NoError(t, err) + res, err = resource.Merge(resource.Default(), res) + require.NoError(t, err) + + exporter := &Exporter{Reader: reader} + + // initialize provider + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(res), + ) + + // initialize two meter a, b + meterA := provider.Meter("ma", otelmetric.WithInstrumentationVersion("v0.1.0")) + meterB := provider.Meter("mb", otelmetric.WithInstrumentationVersion("v0.1.0")) + + fooA, err := meterA.Int64Counter("foo", + otelmetric.WithUnit("By"), + otelmetric.WithDescription("meter counter foo")) + assert.NoError(t, err) + + opt := otelmetric.WithAttributes( + attribute.Key("A").String("B"), + ) + + fooA.Add(ctx, 100, opt) + + fooB, err := meterB.Int64Counter("foo", + otelmetric.WithUnit("By"), + otelmetric.WithDescription("meter counter foo")) + assert.NoError(t, err) + fooB.Add(ctx, 100, opt) + + concurrencyLevel := 100 + ch := make(chan prometheus.Metric, concurrencyLevel) + + for i := 0; i < concurrencyLevel; i++ { + go func() { + collector.Collect(ch) + }() + } + + for ; concurrencyLevel > 0; concurrencyLevel-- { + select { + case <-ch: + concurrencyLevel-- + if concurrencyLevel == 0 { + return + } + case <-time.After(time.Second): + t.Fatal("timeout") + } + } +} + +func TesInvalidInsrtrumentForPrometheusIsIgnored(t *testing.T) { + registry := prometheus.NewRegistry() + cfg := newConfig(WithRegisterer(registry)) + + reader := metric.NewManualReader(cfg.manualReaderOptions()...) + + collector := &collector{ + reader: reader, + disableTargetInfo: false, + withoutUnits: true, + disableScopeInfo: false, + scopeInfos: make(map[instrumentation.Scope]prometheus.Metric), + metricFamilies: make(map[string]*dto.MetricFamily), + } + + err := cfg.registerer.Register(collector) + require.NoError(t, err) + + ctx := context.Background() + + // initialize resource + res, err := resource.New(ctx, + resource.WithAttributes(semconv.ServiceName("prometheus_test")), + resource.WithAttributes(semconv.TelemetrySDKVersion("latest")), + ) + require.NoError(t, err) + res, err = resource.Merge(resource.Default(), res) + require.NoError(t, err) + + exporter := &Exporter{Reader: reader} + + // initialize provider + provider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithResource(res), + ) + + // invalid label or metric name leads to error returned from + // createScopeInfoMetric + invalidName := string([]byte{0xff, 0xfe, 0xfd}) + validName := "validName" + + meterA := provider.Meter(invalidName, otelmetric.WithInstrumentationVersion("v0.1.0")) + + counterA, err := meterA.Int64Counter("with-invalid-description", + otelmetric.WithUnit("By"), + otelmetric.WithDescription(invalidName)) + assert.NoError(t, err) + + counterA.Add(ctx, 100, otelmetric.WithAttributes( + attribute.Key(invalidName).String(invalidName), + )) + + meterB := provider.Meter(validName, otelmetric.WithInstrumentationVersion("v0.1.0")) + counterB, err := meterB.Int64Counter(validName, + otelmetric.WithUnit("By"), + otelmetric.WithDescription(validName)) + assert.NoError(t, err) + + counterB.Add(ctx, 100, otelmetric.WithAttributes( + attribute.Key(validName).String(validName), + )) + + ch := make(chan prometheus.Metric) + + go collector.Collect(ch) + + for { + select { + case m := <-ch: + require.NotNil(t, m) + + if strings.Contains(m.Desc().String(), validName) { + return + } + case <-time.After(time.Second): + t.Fatalf("timeout") + } + } +}