mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-26 21:05:00 +02:00
Move global random number generator to randRes
field (#5819)
Instead of using a global random number generator for all `randRes`, have each value use its own. This removes the need for locking and managing concurrent safe access to the global. Also, the field, given the `Reservoir` type is not concurrent safe and the metric pipeline guards this, does not need a `sync.Mutex` to guard it. Supersedes https://github.com/open-telemetry/opentelemetry-go/pull/5815 Fix #5814 ### Performance Analysis This change has approximately equivalent performance as the existing code based on existing benchmarks. ```terminal goos: linux goarch: amd64 pkg: go.opentelemetry.io/otel/sdk/metric cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz │ old.txt │ new.txt │ │ sec/op │ sec/op vs base │ Exemplars/Int64Counter/8-8 14.00µ ± 3% 13.44µ ± 4% -3.98% (p=0.001 n=10) │ old.txt │ new.txt │ │ B/op │ B/op vs base │ Exemplars/Int64Counter/8-8 3.791Ki ± 0% 3.791Ki ± 0% ~ (p=1.000 n=10) ¹ ¹ all samples are equal │ old.txt │ new.txt │ │ allocs/op │ allocs/op vs base │ Exemplars/Int64Counter/8-8 84.00 ± 0% 84.00 ± 0% ~ (p=1.000 n=10) ¹ ¹ all samples are equal ```
This commit is contained in:
parent
a7e83aace9
commit
42fd8fe325
@ -14,6 +14,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- `Logger.Enabled` in `go.opentelemetry.io/otel/log` now accepts a newly introduced `EnabledParameters` type instead of `Record`. (#5791)
|
||||
- `FilterProcessor.Enabled` in `go.opentelemetry.io/otel/sdk/log/internal/x` now accepts `EnabledParameters` instead of `Record`. (#5791)
|
||||
|
||||
### Fixed
|
||||
|
||||
- The race condition for multiple `FixedSize` exemplar reservoirs identified in #5814 is resolved. (#5819)
|
||||
|
||||
<!-- Released section -->
|
||||
<!-- Don't change this section unless doing release -->
|
||||
|
||||
|
63
sdk/metric/exemplar_test.go
Normal file
63
sdk/metric/exemplar_test.go
Normal file
@ -0,0 +1,63 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
func TestFixedSizeExemplarConcurrentSafe(t *testing.T) {
|
||||
// Tests https://github.com/open-telemetry/opentelemetry-go/issues/5814
|
||||
|
||||
t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", "always_on")
|
||||
|
||||
r := NewManualReader()
|
||||
m := NewMeterProvider(WithReader(r)).Meter("exemplar-concurrency")
|
||||
// Use two instruments to get concurrent access to any shared globals.
|
||||
i0, err := m.Int64Counter("counter.0")
|
||||
require.NoError(t, err)
|
||||
i1, err := m.Int64Counter("counter.1")
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
add := func() {
|
||||
i0.Add(ctx, 1)
|
||||
i1.Add(ctx, 2)
|
||||
}
|
||||
|
||||
goRoutines := max(10, runtime.NumCPU())
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for n := 0; n < goRoutines; n++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
require.NotPanics(t, add)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
const collections = 100
|
||||
var rm metricdata.ResourceMetrics
|
||||
for c := 0; c < collections; c++ {
|
||||
require.NotPanics(t, func() { _ = r.Collect(ctx, &rm) })
|
||||
}
|
||||
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
@ -7,25 +7,50 @@ import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
)
|
||||
|
||||
var (
|
||||
// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
|
||||
// are k or less measurements made, the Reservoir will sample each one. If
|
||||
// there are more than k, the Reservoir will then randomly sample all
|
||||
// additional measurement with a decreasing probability.
|
||||
func FixedSize(k int) Reservoir {
|
||||
return newRandRes(newStorage(k))
|
||||
}
|
||||
|
||||
type randRes struct {
|
||||
*storage
|
||||
|
||||
// count is the number of measurement seen.
|
||||
count int64
|
||||
// next is the next count that will store a measurement at a random index
|
||||
// once the reservoir has been filled.
|
||||
next int64
|
||||
// w is the largest random number in a distribution that is used to compute
|
||||
// the next next.
|
||||
w float64
|
||||
|
||||
// rng is used to make sampling decisions.
|
||||
//
|
||||
// Do not use crypto/rand. There is no reason for the decrease in performance
|
||||
// given this is not a security sensitive decision.
|
||||
rng = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
// Ensure concurrent safe access to rng and its underlying source.
|
||||
rngMu sync.Mutex
|
||||
)
|
||||
rng *rand.Rand
|
||||
}
|
||||
|
||||
// random returns, as a float64, a uniform pseudo-random number in the open
|
||||
// interval (0.0,1.0).
|
||||
func random() float64 {
|
||||
func newRandRes(s *storage) *randRes {
|
||||
r := &randRes{
|
||||
storage: s,
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
r.reset()
|
||||
return r
|
||||
}
|
||||
|
||||
// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
|
||||
// open interval (0.0,1.0).
|
||||
func (r *randRes) randomFloat64() float64 {
|
||||
// TODO: This does not return a uniform number. rng.Float64 returns a
|
||||
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
|
||||
// returns multiples of 2^-53, and not all floating point numbers between 0
|
||||
@ -43,39 +68,13 @@ func random() float64 {
|
||||
//
|
||||
// There are likely many other methods to explore here as well.
|
||||
|
||||
rngMu.Lock()
|
||||
defer rngMu.Unlock()
|
||||
|
||||
f := rng.Float64()
|
||||
f := r.rng.Float64()
|
||||
for f == 0 {
|
||||
f = rng.Float64()
|
||||
f = r.rng.Float64()
|
||||
}
|
||||
return f
|
||||
}
|
||||
|
||||
// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
|
||||
// are k or less measurements made, the Reservoir will sample each one. If
|
||||
// there are more than k, the Reservoir will then randomly sample all
|
||||
// additional measurement with a decreasing probability.
|
||||
func FixedSize(k int) Reservoir {
|
||||
r := &randRes{storage: newStorage(k)}
|
||||
r.reset()
|
||||
return r
|
||||
}
|
||||
|
||||
type randRes struct {
|
||||
*storage
|
||||
|
||||
// count is the number of measurement seen.
|
||||
count int64
|
||||
// next is the next count that will store a measurement at a random index
|
||||
// once the reservoir has been filled.
|
||||
next int64
|
||||
// w is the largest random number in a distribution that is used to compute
|
||||
// the next next.
|
||||
w float64
|
||||
}
|
||||
|
||||
func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
|
||||
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
|
||||
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
|
||||
@ -123,7 +122,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
|
||||
} else {
|
||||
if r.count == r.next {
|
||||
// Overwrite a random existing measurement with the one offered.
|
||||
idx := int(rng.Int63n(int64(cap(r.store))))
|
||||
idx := int(r.rng.Int63n(int64(cap(r.store))))
|
||||
r.store[idx] = newMeasurement(ctx, t, n, a)
|
||||
r.advance()
|
||||
}
|
||||
@ -147,7 +146,7 @@ func (r *randRes) reset() {
|
||||
// This maps the uniform random number in (0,1) to a geometric distribution
|
||||
// over the same interval. The mean of the distribution is inversely
|
||||
// proportional to the storage capacity.
|
||||
r.w = math.Exp(math.Log(random()) / float64(cap(r.store)))
|
||||
r.w = math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
|
||||
|
||||
r.advance()
|
||||
}
|
||||
@ -167,7 +166,7 @@ func (r *randRes) advance() {
|
||||
// therefore the next r.w will be based on the same distribution (i.e.
|
||||
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
|
||||
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
|
||||
r.w *= math.Exp(math.Log(random()) / float64(cap(r.store)))
|
||||
r.w *= math.Exp(math.Log(r.randomFloat64()) / float64(cap(r.store)))
|
||||
// Use the new random number in the series to calculate the count of the
|
||||
// next measurement that will be stored.
|
||||
//
|
||||
@ -178,7 +177,7 @@ func (r *randRes) advance() {
|
||||
//
|
||||
// Important to note, the new r.next will always be at least 1 more than
|
||||
// the last r.next.
|
||||
r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1
|
||||
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
|
||||
}
|
||||
|
||||
func (r *randRes) Collect(dest *[]Exemplar) {
|
||||
|
@ -6,9 +6,10 @@ package exemplar
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@ -27,10 +28,12 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
|
||||
intensity := 0.1
|
||||
sampleSize := 1000
|
||||
|
||||
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
data := make([]float64, sampleSize*1000)
|
||||
for i := range data {
|
||||
// Generate exponentially distributed data.
|
||||
data[i] = (-1.0 / intensity) * math.Log(random())
|
||||
data[i] = (-1.0 / intensity) * math.Log(rng.Float64())
|
||||
}
|
||||
// Sort to test position bias.
|
||||
slices.Sort(data)
|
||||
@ -50,18 +53,3 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
|
||||
// ensuring no bias in our random sampling algorithm.
|
||||
assert.InDelta(t, 1/mean, intensity, 0.02) // Within 5σ.
|
||||
}
|
||||
|
||||
func TestRandomConcurrentSafe(t *testing.T) {
|
||||
const goRoutines = 10
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for n := 0; n < goRoutines; n++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = random()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user