Mirror of https://github.com/open-telemetry/opentelemetry-go.git (synced 2025-01-03 22:52:30 +02:00)
Metric histogram aggregator: Swap in SynchronizedMove to avoid allocations (#1435)
* Move emptyState() allocations outside lock
* Add more testing
* Re-comment; add CHANGELOG
* Add CHANGELOG PR number
* Update CHANGELOG.md

Co-authored-by: Sam Xie <xsambundy@gmail.com>
Co-authored-by: Bogdan Drutu <lazy@splunk.com>
Co-authored-by: Anthony Mirabella <a9@aneurysm9.com>
commit 207587b6ab
parent c29c6fd1ad
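The gist of the change: `SynchronizedMove` previously allocated a fresh `state` (via `emptyState`) inside the critical section on every checkpoint; it now clears the destination's pre-allocated state outside the lock and swaps the two state pointers under the lock. A minimal, self-contained sketch of that swap pattern follows; the names here are illustrative, not the SDK's API:

```go
package main

import (
	"fmt"
	"sync"
)

// accumulator stands in for the histogram Aggregator: a mutex guarding a
// reusable state pointer, so checkpointing never allocates.
type accumulator struct {
	lock  sync.Mutex
	state *[]uint64 // stand-in for the histogram's bucket counts
}

// synchronizedMove clears dst's state outside the lock, then swaps the two
// state pointers inside it; the critical section performs no allocation.
func (a *accumulator) synchronizedMove(dst *accumulator) {
	for i := range *dst.state { // zeroing in place reuses the existing slice
		(*dst.state)[i] = 0
	}

	a.lock.Lock()
	a.state, dst.state = dst.state, a.state
	a.lock.Unlock()
}

func main() {
	live := &accumulator{state: &[]uint64{1, 2, 3}}
	ckpt := &accumulator{state: &[]uint64{9, 9, 9}}

	live.synchronizedMove(ckpt)
	fmt.Println(*ckpt.state, *live.state) // [1 2 3] [0 0 0]
}
```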
CHANGELOG.md

@@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 - `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369)
 - Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369)
 - Unify endpoint API that related to OTel exporter. (#1401)
+- Optimize metric histogram aggregator to re-use its slice of buckets. (#1435)
 - Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430)
 - `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432)
 - Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420)
sdk/metric/aggregator/histogram/histogram.go

@@ -38,7 +38,7 @@ type (
 		lock       sync.Mutex
 		boundaries []float64
 		kind       number.Kind
-		state      state
+		state      *state
 	}
 
 	// state represents the state of a histogram, consisting of
@@ -78,8 +78,8 @@ func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator {
 		aggs[i] = Aggregator{
 			kind:       desc.NumberKind(),
 			boundaries: sortedBoundaries,
-			state:      emptyState(sortedBoundaries),
 		}
+		aggs[i].state = aggs[i].newState()
 	}
 	return aggs
 }
@@ -123,22 +123,44 @@ func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descriptor) error {
 		return aggregator.NewInconsistentAggregatorError(c, oa)
 	}
 
+	if o != nil {
+		// Swap case: This is the ordinary case for a
+		// synchronous instrument, where the SDK allocates two
+		// Aggregators and lock contention is anticipated.
+		// Reset the target state before swapping it under the
+		// lock below.
+		o.clearState()
+	}
+
 	c.lock.Lock()
 	if o != nil {
-		o.state = c.state
+		c.state, o.state = o.state, c.state
+	} else {
+		// No swap case: This is the ordinary case for an
+		// asynchronous instrument, where the SDK allocates a
+		// single Aggregator and there is no anticipated lock
+		// contention.
+		c.clearState()
 	}
-	c.state = emptyState(c.boundaries)
 	c.lock.Unlock()
 
 	return nil
 }
 
-func emptyState(boundaries []float64) state {
-	return state{
-		bucketCounts: make([]uint64, len(boundaries)+1),
+func (c *Aggregator) newState() *state {
+	return &state{
+		bucketCounts: make([]uint64, len(c.boundaries)+1),
 	}
 }
 
+func (c *Aggregator) clearState() {
+	for i := range c.state.bucketCounts {
+		c.state.bucketCounts[i] = 0
+	}
+	c.state.sum = 0
+	c.state.count = 0
+}
+
 // Update adds the recorded measurement to the current data set.
 func (c *Aggregator) Update(_ context.Context, number number.Number, desc *metric.Descriptor) error {
 	kind := desc.NumberKind()
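With `newState` allocating once in `New` and `clearState` zeroing in place, the steady-state swap path in the diff above should allocate nothing. A sketch of how that could be verified with `testing.AllocsPerRun`; this test is illustrative and not part of the commit, and the descriptor construction and import paths are assumptions based on the SDK's API of that era:

```go
package histogram_test

import (
	"testing"

	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/number"
	"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
)

// TestSynchronizedMoveAllocs is an illustrative check (not part of this
// commit) that the swap path of SynchronizedMove is allocation-free.
func TestSynchronizedMoveAllocs(t *testing.T) {
	descriptor := metric.NewDescriptor(
		"example", metric.ValueRecorderInstrumentKind, number.Float64Kind)

	// New(2, ...) pre-allocates both states, matching the diff above;
	// the boundaries are arbitrary.
	aggs := histogram.New(2, &descriptor, []float64{25, 50, 75})
	agg, ckpt := &aggs[0], &aggs[1]

	allocs := testing.AllocsPerRun(100, func() {
		_ = agg.SynchronizedMove(ckpt, &descriptor)
	})
	if allocs != 0 {
		t.Errorf("expected 0 allocations per SynchronizedMove, got %v", allocs)
	}
}
```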
sdk/metric/aggregator/histogram/histogram_test.go

@@ -115,42 +115,23 @@ func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy) {
 
 	agg, ckpt := new2(descriptor)
 
-	all := aggregatortest.NewNumbers(profile.NumberKind)
+	// This needs to repeat at least 3 times to uncover a failure to reset
+	// for the overall sum and count fields, since the third time through
+	// is the first time a `histogram.state` object is reused.
+	for repeat := 0; repeat < 3; repeat++ {
+		all := aggregatortest.NewNumbers(profile.NumberKind)
 
-	for i := 0; i < count; i++ {
-		x := profile.Random(policy.sign())
-		all.Append(x)
-		aggregatortest.CheckedUpdate(t, agg, x, descriptor)
-	}
+		for i := 0; i < count; i++ {
+			x := profile.Random(policy.sign())
+			all.Append(x)
+			aggregatortest.CheckedUpdate(t, agg, x, descriptor)
+		}
 
-	require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
+		require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
 
-	checkZero(t, agg, descriptor)
+		checkZero(t, agg, descriptor)
 
-	all.Sort()
-
-	asum, err := ckpt.Sum()
-	sum := all.Sum()
-	require.InEpsilon(t,
-		sum.CoerceToFloat64(profile.NumberKind),
-		asum.CoerceToFloat64(profile.NumberKind),
-		0.000000001,
-		"Same sum - "+policy.name)
-	require.NoError(t, err)
-
-	count, err := ckpt.Count()
-	require.Equal(t, all.Count(), count, "Same count -"+policy.name)
-	require.NoError(t, err)
-
-	buckets, err := ckpt.Histogram()
-	require.NoError(t, err)
-
-	require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
-
-	counts := calcBuckets(all.Points(), profile)
-	for i, v := range counts {
-		bCount := uint64(buckets.Counts[i])
-		require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
+		checkHistogram(t, all, profile, ckpt)
 	}
 }
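The three-iteration loop matters because the two pre-allocated `state` objects ping-pong between the live aggregator and the checkpoint: round 1 fills one state, round 2 fills the other, and only round 3 accumulates into the first state again, exercising a reused (and hopefully fully reset) state. A toy trace of that ownership, with illustrative labels:

```go
package main

import "fmt"

func main() {
	// Two pre-allocated states ping-pong between live aggregator and
	// checkpoint; "A" and "B" are illustrative labels.
	live, ckpt := "A", "B"
	for round := 1; round <= 3; round++ {
		fmt.Printf("round %d: accumulate into state %s\n", round, live)
		// SynchronizedMove clears the checkpoint's state, then swaps.
		live, ckpt = ckpt, live
	}
	// Output:
	// round 1: accumulate into state A
	// round 2: accumulate into state B
	// round 3: accumulate into state A  <- first reuse of a state
}
```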
@@ -191,31 +172,7 @@ func TestHistogramMerge(t *testing.T) {
 
 		aggregatortest.CheckedMerge(t, ckpt1, ckpt2, descriptor)
 
-		all.Sort()
-
-		asum, err := ckpt1.Sum()
-		sum := all.Sum()
-		require.InEpsilon(t,
-			sum.CoerceToFloat64(profile.NumberKind),
-			asum.CoerceToFloat64(profile.NumberKind),
-			0.000000001,
-			"Same sum - absolute")
-		require.NoError(t, err)
-
-		count, err := ckpt1.Count()
-		require.Equal(t, all.Count(), count, "Same count - absolute")
-		require.NoError(t, err)
-
-		buckets, err := ckpt1.Histogram()
-		require.NoError(t, err)
-
-		require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
-
-		counts := calcBuckets(all.Points(), profile)
-		for i, v := range counts {
-			bCount := uint64(buckets.Counts[i])
-			require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
-		}
+		checkHistogram(t, all, profile, ckpt1)
 	})
 }
@@ -233,22 +190,49 @@ func TestHistogramNotSet(t *testing.T) {
 	})
 }
 
-func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint64 {
-	sortedBoundaries := make([]float64, len(boundaries))
+// checkHistogram ensures the correct aggregated state between `all`
+// (test aggregator) and `agg` (code under test).
+func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregatortest.Profile, agg *histogram.Aggregator) {
+
+	all.Sort()
+
+	asum, err := agg.Sum()
+	require.NoError(t, err)
+
+	sum := all.Sum()
+	require.InEpsilon(t,
+		sum.CoerceToFloat64(profile.NumberKind),
+		asum.CoerceToFloat64(profile.NumberKind),
+		0.000000001)
+
+	count, err := agg.Count()
+	require.NoError(t, err)
+	require.Equal(t, all.Count(), count)
+
+	buckets, err := agg.Histogram()
+	require.NoError(t, err)
+
+	require.Equal(t, len(buckets.Counts), len(boundaries)+1,
+		"There should be b + 1 counts, where b is the number of boundaries")
+
+	sortedBoundaries := make([]float64, len(boundaries))
 	copy(sortedBoundaries, boundaries)
 	sort.Float64s(sortedBoundaries)
 
+	require.EqualValues(t, sortedBoundaries, buckets.Boundaries)
+
 	counts := make([]uint64, len(sortedBoundaries)+1)
 	idx := 0
-	for _, p := range points {
+	for _, p := range all.Points() {
 		for idx < len(sortedBoundaries) && p.CoerceToFloat64(profile.NumberKind) >= sortedBoundaries[idx] {
 			idx++
 		}
 		counts[idx]++
 	}
 
-	return counts
+	for i, v := range counts {
+		bCount := uint64(buckets.Counts[i])
+		require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
+	}
 }
+
+func TestSynchronizedMoveReset(t *testing.T) {
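One detail worth noting in `checkHistogram`: the bucket-counting loop never resets `idx`, which is safe only because `all.Sort()` has already ordered the points, so the boundary cursor advances monotonically and the whole count runs in O(n + b) rather than O(n * b). A standalone illustration of the same loop with concrete values, chosen arbitrarily:

```go
package main

import (
	"fmt"
	"sort"
)

// calcBuckets mirrors the counting loop in checkHistogram: with points
// sorted ascending, the boundary cursor never moves backwards, so each
// point and each boundary is visited at most once.
func calcBuckets(points []float64, boundaries []float64) []uint64 {
	sort.Float64s(points)

	counts := make([]uint64, len(boundaries)+1)
	idx := 0
	for _, p := range points {
		for idx < len(boundaries) && p >= boundaries[idx] {
			idx++
		}
		counts[idx]++
	}
	return counts
}

func main() {
	// Boundaries 25/50/75 define four buckets: [<25) [25,50) [50,75) [>=75].
	fmt.Println(calcBuckets([]float64{10, 30, 30, 60, 90}, []float64{25, 50, 75}))
	// Output: [1 2 1 1]
}
```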