You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2026-06-03 18:35:08 +02:00
prometheus: fix Collect data race for constant resource labels (#8227)
This commit is contained in:
@@ -43,6 +43,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Fix gzipped request body replay on redirect in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#8152)
|
||||
- `go.opentelemetry.io/otel/exporters/prometheus` now uses `Value.String` formatting for label values following the [OpenTelemetry AnyValue representation for non-OTLP protocols](https://opentelemetry.io/docs/specs/otel/common/#anyvalue). (#8170)
|
||||
- Propagate errors from the exporter when calling `Shutdown` on `BatchSpanProcessor` in `go.opentelemetry.io/otel/sdk/trace`. (#8197)
|
||||
- Fix a concurrent `Collect` data race and potential panic in `go.opentelemetry.io/otel/exporters/prometheus` when `WithResourceAsConstantLabels` option is used. (#8227)
|
||||
|
||||
<!-- Released section -->
|
||||
<!-- Don't change this section unless doing release -->
|
||||
|
||||
@@ -91,14 +91,18 @@ type collector struct {
|
||||
namespace string
|
||||
resourceAttributesFilter attribute.Filter
|
||||
|
||||
mu sync.Mutex // mu protects all members below from the concurrent access.
|
||||
mu sync.Mutex
|
||||
disableTargetInfo bool
|
||||
targetInfo prometheus.Metric
|
||||
metricFamilies map[string]*dto.MetricFamily
|
||||
resourceKeyVals keyVals
|
||||
metricNamer otlptranslator.MetricNamer
|
||||
labelNamer otlptranslator.LabelNamer
|
||||
unitNamer otlptranslator.UnitNamer
|
||||
|
||||
resourceKeyValsOnce sync.Once
|
||||
resourceKeyVals keyVals
|
||||
resourceKeyValsErr error
|
||||
|
||||
metricNamer otlptranslator.MetricNamer
|
||||
labelNamer otlptranslator.LabelNamer
|
||||
unitNamer otlptranslator.UnitNamer
|
||||
|
||||
inst *observ.Instrumentation
|
||||
|
||||
@@ -226,11 +230,13 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
|
||||
ch <- c.targetInfo
|
||||
}
|
||||
|
||||
if c.resourceAttributesFilter != nil && len(c.resourceKeyVals.keys) == 0 {
|
||||
e := c.createResourceAttributes(metrics.Resource)
|
||||
if e != nil {
|
||||
otel.Handle(e)
|
||||
err = errors.Join(err, fmt.Errorf("failed to createResourceAttributes: %w", e))
|
||||
if c.resourceAttributesFilter != nil {
|
||||
c.resourceKeyValsOnce.Do(func() {
|
||||
c.resourceKeyVals, c.resourceKeyValsErr = c.createResourceAttributes(metrics.Resource)
|
||||
})
|
||||
if c.resourceKeyValsErr != nil {
|
||||
otel.Handle(c.resourceKeyValsErr)
|
||||
err = errors.Join(err, fmt.Errorf("failed to createResourceAttributes: %w", c.resourceKeyValsErr))
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -719,18 +725,14 @@ func (c *collector) namingMetricType(m metricdata.Metrics) otlptranslator.Metric
|
||||
return otlptranslator.MetricTypeUnknown
|
||||
}
|
||||
|
||||
func (c *collector) createResourceAttributes(res *resource.Resource) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
func (c *collector) createResourceAttributes(res *resource.Resource) (keyVals, error) {
|
||||
resourceAttrs, _ := res.Set().Filter(c.resourceAttributesFilter)
|
||||
resourceKeys, resourceValues, err := getAttrs(resourceAttrs, c.labelNamer)
|
||||
if err != nil {
|
||||
return err
|
||||
return keyVals{}, err
|
||||
}
|
||||
|
||||
c.resourceKeyVals = keyVals{keys: resourceKeys, vals: resourceValues}
|
||||
return nil
|
||||
return keyVals{keys: resourceKeys, vals: resourceValues}, nil
|
||||
}
|
||||
|
||||
func (c *collector) validateMetrics(name, description string, metricType *dto.MetricType) (drop bool, help string) {
|
||||
|
||||
@@ -1192,6 +1192,45 @@ func TestCollectorConcurrentSafe(t *testing.T) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestCollectorWithResourceAsConstantLabelsConcurrentSafe(t *testing.T) {
|
||||
// This test makes sure resource label initialization is concurrent safe
|
||||
// when using WithResourceAsConstantLabels.
|
||||
ctx := t.Context()
|
||||
|
||||
registry := prometheus.NewRegistry()
|
||||
exporter, err := New(
|
||||
WithRegisterer(registry),
|
||||
WithResourceAsConstantLabels(attribute.NewDenyKeysFilter()),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := resource.New(ctx, resource.WithAttributes(semconv.ServiceName("prometheus_test")))
|
||||
require.NoError(t, err)
|
||||
|
||||
provider := metric.NewMeterProvider(
|
||||
metric.WithReader(exporter),
|
||||
metric.WithResource(res),
|
||||
)
|
||||
t.Cleanup(func() {
|
||||
assert.NoError(t, provider.Shutdown(ctx))
|
||||
})
|
||||
|
||||
meter := provider.Meter("testmeter")
|
||||
cnt, err := meter.Int64Counter("foo")
|
||||
require.NoError(t, err)
|
||||
cnt.Add(ctx, 100)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
const concurrencyLevel = 10
|
||||
for range concurrencyLevel {
|
||||
wg.Go(func() {
|
||||
_, err := registry.Gather() // this calls collector.Collect
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestShutdownExporter(t *testing.T) {
|
||||
var handledError error
|
||||
eh := otel.ErrorHandlerFunc(func(e error) { handledError = errors.Join(handledError, e) })
|
||||
|
||||
Reference in New Issue
Block a user