You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-10-08 23:21:56 +02:00
Add exemplar reservoir parallel benchmarks (#7441)
This also fixes a bug introduced in https://github.com/open-telemetry/opentelemetry-go/pull/7423, where we were only locking around storage and not around other shared fields (e.g. count). Fixing the bug is required for benchmarks to run properly, but wasn't caught by concurrent safe tests because the SDK does not currently call exemplar methods concurrently. ``` goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric/exemplar cpu: Intel(R) Xeon(R) CPU @ 2.20GHz BenchmarkFixedSizeReservoirOffer-24 498955 248.4 ns/op 0 B/op 0 allocs/op BenchmarkHistogramReservoirOffer-24 478068 250.1 ns/op 0 B/op 0 allocs/op ```
This commit is contained in:
50
sdk/metric/exemplar/benchmark_test.go
Normal file
50
sdk/metric/exemplar/benchmark_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func BenchmarkFixedSizeReservoirOffer(b *testing.B) {
|
||||
ts := time.Now()
|
||||
val := NewValue[int64](25)
|
||||
ctx := b.Context()
|
||||
reservoir := NewFixedSizeReservoir(runtime.NumCPU())
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
i := 0
|
||||
for pb.Next() {
|
||||
reservoir.Offer(ctx, ts, val, nil)
|
||||
// Periodically trigger a reset, because the algorithm for fixed-size
|
||||
// reservoirs records exemplars very infrequently after a large
|
||||
// number of collect calls.
|
||||
if i%100 == 99 {
|
||||
reservoir.mu.Lock()
|
||||
reservoir.reset()
|
||||
reservoir.mu.Unlock()
|
||||
}
|
||||
i++
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkHistogramReservoirOffer(b *testing.B) {
|
||||
ts := time.Now()
|
||||
ctx := b.Context()
|
||||
buckets := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}
|
||||
values := make([]Value, len(buckets))
|
||||
for i, bucket := range buckets {
|
||||
values[i] = NewValue[float64](bucket + 1)
|
||||
}
|
||||
res := NewHistogramReservoir(buckets)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
i := 0
|
||||
for pb.Next() {
|
||||
res.Offer(ctx, ts, values[i%len(values)], nil)
|
||||
i++
|
||||
}
|
||||
})
|
||||
}
|
@@ -125,6 +125,8 @@ func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a
|
||||
// https://github.com/MrAlias/reservoir-sampling for a performance
|
||||
// comparison of reservoir sampling algorithms.
|
||||
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
if int(r.count) < cap(r.measurements) {
|
||||
r.store(int(r.count), newMeasurement(ctx, t, n, a))
|
||||
} else if r.count == r.next {
|
||||
|
@@ -68,5 +68,11 @@ func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a
|
||||
default:
|
||||
panic("unknown value type")
|
||||
}
|
||||
r.store(sort.SearchFloat64s(r.bounds, n), newMeasurement(ctx, t, v, a))
|
||||
|
||||
idx := sort.SearchFloat64s(r.bounds, n)
|
||||
m := newMeasurement(ctx, t, v, a)
|
||||
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.store(idx, m)
|
||||
}
|
||||
|
@@ -27,8 +27,6 @@ func newStorage(n int) *storage {
|
||||
}
|
||||
|
||||
func (r *storage) store(idx int, m measurement) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.measurements[idx] = m
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user