You've already forked opentelemetry-go
							
							
				mirror of
				https://github.com/open-telemetry/opentelemetry-go.git
				synced 2025-10-31 00:07:40 +02:00 
			
		
		
		
	Fix metric SDK race condition (#293)
* Initial fix * benchmark acquire * Rename handle benchmarks
This commit is contained in:
		| @@ -23,6 +23,7 @@ import ( | ||||
|  | ||||
| 	"go.opentelemetry.io/otel/api/core" | ||||
| 	"go.opentelemetry.io/otel/api/key" | ||||
| 	"go.opentelemetry.io/otel/api/metric" | ||||
| 	export "go.opentelemetry.io/otel/sdk/export/metric" | ||||
| 	sdk "go.opentelemetry.io/otel/sdk/metric" | ||||
| 	"go.opentelemetry.io/otel/sdk/metric/aggregator/counter" | ||||
| @@ -66,6 +67,16 @@ func (bf *benchFixture) AggregatorFor(rec export.Record) export.Aggregator { | ||||
| func (bf *benchFixture) Export(ctx context.Context, rec export.Record, agg export.Aggregator) { | ||||
| } | ||||
|  | ||||
| func makeLabelSets(n int) [][]core.KeyValue { | ||||
| 	r := make([][]core.KeyValue, n) | ||||
|  | ||||
| 	for i := 0; i < n; i++ { | ||||
| 		r[i] = makeLabels(1) | ||||
| 	} | ||||
|  | ||||
| 	return r | ||||
| } | ||||
|  | ||||
| func makeLabels(n int) []core.KeyValue { | ||||
| 	used := map[string]bool{} | ||||
| 	l := make([]core.KeyValue, n) | ||||
| @@ -117,6 +128,61 @@ func BenchmarkLabels_16(b *testing.B) { | ||||
| // Note: performance does not depend on label set size for the | ||||
| // benchmarks below. | ||||
|  | ||||
| func BenchmarkAcquireNewHandle(b *testing.B) { | ||||
| 	fix := newFixture(b) | ||||
| 	labelSets := makeLabelSets(b.N) | ||||
| 	cnt := fix.sdk.NewInt64Counter("int64.counter") | ||||
| 	labels := make([]metric.LabelSet, b.N) | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		labels[i] = fix.sdk.Labels(labelSets[i]...) | ||||
| 	} | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		cnt.AcquireHandle(labels[i]) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkAcquireExistingHandle(b *testing.B) { | ||||
| 	fix := newFixture(b) | ||||
| 	labelSets := makeLabelSets(b.N) | ||||
| 	cnt := fix.sdk.NewInt64Counter("int64.counter") | ||||
| 	labels := make([]metric.LabelSet, b.N) | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		labels[i] = fix.sdk.Labels(labelSets[i]...) | ||||
| 		cnt.AcquireHandle(labels[i]).Release() | ||||
| 	} | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		cnt.AcquireHandle(labels[i]) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkAcquireReleaseExistingHandle(b *testing.B) { | ||||
| 	fix := newFixture(b) | ||||
| 	labelSets := makeLabelSets(b.N) | ||||
| 	cnt := fix.sdk.NewInt64Counter("int64.counter") | ||||
| 	labels := make([]metric.LabelSet, b.N) | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		labels[i] = fix.sdk.Labels(labelSets[i]...) | ||||
| 		cnt.AcquireHandle(labels[i]).Release() | ||||
| 	} | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		cnt.AcquireHandle(labels[i]).Release() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Counters | ||||
|  | ||||
| func BenchmarkInt64CounterAdd(b *testing.B) { | ||||
| 	ctx := context.Background() | ||||
| 	fix := newFixture(b) | ||||
| @@ -130,19 +196,6 @@ func BenchmarkInt64CounterAdd(b *testing.B) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64CounterAcquireHandle(b *testing.B) { | ||||
| 	fix := newFixture(b) | ||||
| 	labs := fix.sdk.Labels(makeLabels(1)...) | ||||
| 	cnt := fix.sdk.NewInt64Counter("int64.counter") | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		handle := cnt.AcquireHandle(labs) | ||||
| 		handle.Release() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64CounterHandleAdd(b *testing.B) { | ||||
| 	ctx := context.Background() | ||||
| 	fix := newFixture(b) | ||||
| @@ -170,19 +223,6 @@ func BenchmarkFloat64CounterAdd(b *testing.B) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64CounterAcquireHandle(b *testing.B) { | ||||
| 	fix := newFixture(b) | ||||
| 	labs := fix.sdk.Labels(makeLabels(1)...) | ||||
| 	cnt := fix.sdk.NewFloat64Counter("float64.counter") | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		handle := cnt.AcquireHandle(labs) | ||||
| 		handle.Release() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64CounterHandleAdd(b *testing.B) { | ||||
| 	ctx := context.Background() | ||||
| 	fix := newFixture(b) | ||||
| @@ -212,19 +252,6 @@ func BenchmarkInt64GaugeAdd(b *testing.B) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64GaugeAcquireHandle(b *testing.B) { | ||||
| 	fix := newFixture(b) | ||||
| 	labs := fix.sdk.Labels(makeLabels(1)...) | ||||
| 	gau := fix.sdk.NewInt64Gauge("int64.gauge") | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		handle := gau.AcquireHandle(labs) | ||||
| 		handle.Release() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64GaugeHandleAdd(b *testing.B) { | ||||
| 	ctx := context.Background() | ||||
| 	fix := newFixture(b) | ||||
| @@ -252,19 +279,6 @@ func BenchmarkFloat64GaugeAdd(b *testing.B) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64GaugeAcquireHandle(b *testing.B) { | ||||
| 	fix := newFixture(b) | ||||
| 	labs := fix.sdk.Labels(makeLabels(1)...) | ||||
| 	gau := fix.sdk.NewFloat64Gauge("float64.gauge") | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		handle := gau.AcquireHandle(labs) | ||||
| 		handle.Release() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64GaugeHandleAdd(b *testing.B) { | ||||
| 	ctx := context.Background() | ||||
| 	fix := newFixture(b) | ||||
| @@ -294,19 +308,6 @@ func benchmarkInt64MeasureAdd(b *testing.B, name string) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func benchmarkInt64MeasureAcquireHandle(b *testing.B, name string) { | ||||
| 	fix := newFixture(b) | ||||
| 	labs := fix.sdk.Labels(makeLabels(1)...) | ||||
| 	mea := fix.sdk.NewInt64Measure(name) | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		handle := mea.AcquireHandle(labs) | ||||
| 		handle.Release() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) { | ||||
| 	ctx := context.Background() | ||||
| 	fix := newFixture(b) | ||||
| @@ -334,19 +335,6 @@ func benchmarkFloat64MeasureAdd(b *testing.B, name string) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func benchmarkFloat64MeasureAcquireHandle(b *testing.B, name string) { | ||||
| 	fix := newFixture(b) | ||||
| 	labs := fix.sdk.Labels(makeLabels(1)...) | ||||
| 	mea := fix.sdk.NewFloat64Measure(name) | ||||
|  | ||||
| 	b.ResetTimer() | ||||
|  | ||||
| 	for i := 0; i < b.N; i++ { | ||||
| 		handle := mea.AcquireHandle(labs) | ||||
| 		handle.Release() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) { | ||||
| 	ctx := context.Background() | ||||
| 	fix := newFixture(b) | ||||
| @@ -367,10 +355,6 @@ func BenchmarkInt64MaxSumCountAdd(b *testing.B) { | ||||
| 	benchmarkInt64MeasureAdd(b, "int64.maxsumcount") | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64MaxSumCountAcquireHandle(b *testing.B) { | ||||
| 	benchmarkInt64MeasureAcquireHandle(b, "int64.maxsumcount") | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64MaxSumCountHandleAdd(b *testing.B) { | ||||
| 	benchmarkInt64MeasureHandleAdd(b, "int64.maxsumcount") | ||||
| } | ||||
| @@ -379,10 +363,6 @@ func BenchmarkFloat64MaxSumCountAdd(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureAdd(b, "float64.maxsumcount") | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64MaxSumCountAcquireHandle(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureAcquireHandle(b, "float64.maxsumcount") | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64MaxSumCountHandleAdd(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureHandleAdd(b, "float64.maxsumcount") | ||||
| } | ||||
| @@ -393,10 +373,6 @@ func BenchmarkInt64DDSketchAdd(b *testing.B) { | ||||
| 	benchmarkInt64MeasureAdd(b, "int64.ddsketch") | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64DDSketchAcquireHandle(b *testing.B) { | ||||
| 	benchmarkInt64MeasureAcquireHandle(b, "int64.ddsketch") | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64DDSketchHandleAdd(b *testing.B) { | ||||
| 	benchmarkInt64MeasureHandleAdd(b, "int64.ddsketch") | ||||
| } | ||||
| @@ -405,10 +381,6 @@ func BenchmarkFloat64DDSketchAdd(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureAdd(b, "float64.ddsketch") | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64DDSketchAcquireHandle(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureAcquireHandle(b, "float64.ddsketch") | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64DDSketchHandleAdd(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureHandleAdd(b, "float64.ddsketch") | ||||
| } | ||||
| @@ -419,10 +391,6 @@ func BenchmarkInt64ArrayAdd(b *testing.B) { | ||||
| 	benchmarkInt64MeasureAdd(b, "int64.array") | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64ArrayAcquireHandle(b *testing.B) { | ||||
| 	benchmarkInt64MeasureAcquireHandle(b, "int64.array") | ||||
| } | ||||
|  | ||||
| func BenchmarkInt64ArrayHandleAdd(b *testing.B) { | ||||
| 	benchmarkInt64MeasureHandleAdd(b, "int64.array") | ||||
| } | ||||
| @@ -431,10 +399,6 @@ func BenchmarkFloat64ArrayAdd(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureAdd(b, "float64.array") | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64ArrayAcquireHandle(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureAcquireHandle(b, "float64.array") | ||||
| } | ||||
|  | ||||
| func BenchmarkFloat64ArrayHandleAdd(b *testing.B) { | ||||
| 	benchmarkFloat64MeasureHandleAdd(b, "float64.array") | ||||
| } | ||||
|   | ||||
| @@ -159,12 +159,19 @@ func (i *instrument) Meter() api.Meter { | ||||
| } | ||||
|  | ||||
| func (i *instrument) acquireHandle(ls *labels) *record { | ||||
| 	// Create lookup key for sync.Map | ||||
| 	// Create lookup key for sync.Map (one allocation) | ||||
| 	mk := mapkey{ | ||||
| 		descriptor: i.descriptor, | ||||
| 		encoded:    ls.encoded, | ||||
| 	} | ||||
|  | ||||
| 	if actual, ok := i.meter.current.Load(mk); ok { | ||||
| 		// Existing record case, only one allocation so far. | ||||
| 		rec := actual.(*record) | ||||
| 		atomic.AddInt64(&rec.refcount, 1) | ||||
| 		return rec | ||||
| 	} | ||||
|  | ||||
| 	// There's a memory allocation here. | ||||
| 	rec := &record{ | ||||
| 		labels:         ls, | ||||
| @@ -174,6 +181,8 @@ func (i *instrument) acquireHandle(ls *labels) *record { | ||||
| 		modifiedEpoch:  0, | ||||
| 	} | ||||
|  | ||||
| 	rec.recorder = i.meter.exporter.AggregatorFor(rec) | ||||
|  | ||||
| 	// Load/Store: there's a memory allocation to place `mk` into | ||||
| 	// an interface here. | ||||
| 	if actual, loaded := i.meter.current.LoadOrStore(mk, rec); loaded { | ||||
| @@ -182,7 +191,6 @@ func (i *instrument) acquireHandle(ls *labels) *record { | ||||
| 		atomic.AddInt64(&rec.refcount, 1) | ||||
| 		return rec | ||||
| 	} | ||||
| 	rec.recorder = i.meter.exporter.AggregatorFor(rec) | ||||
|  | ||||
| 	i.meter.addPrimary(rec) | ||||
| 	return rec | ||||
|   | ||||
		Reference in New Issue
	
	Block a user