1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Use sync.Map and atomics for fixed bucket histograms (#7474)

Implement a lockless histogram using atomics, and use a sync.Map for
attribute access. This improves performance by ~2x.

The design is very similar to
https://github.com/open-telemetry/opentelemetry-go/pull/7427, but with
one additional change to make the histogram data point itself atomic:

* For cumulative histograms, which do not use a hot/cold limitedSyncMap,
we use a hot/cold data point. This way, we maintain the keys in the sync
map, but still ensure that collection gets a consistent view of
measure() calls.

Parallel benchmarks:
```
                                                                       │  main.txt   │              hist.txt              │
                                                                       │   sec/op    │   sec/op     vs base               │
SyncMeasure/NoView/ExemplarsDisabled/Int64Histogram/Attributes/10-24     274.5n ± 2%   125.2n ± 5%  -54.42% (p=0.002 n=6)
SyncMeasure/NoView/ExemplarsDisabled/Float64Histogram/Attributes/10-24   274.1n ± 2%   132.5n ± 2%  -51.65% (p=0.002 n=6)
geomean                                                                  274.3n        128.8n       -53.05%
```

zero memory allocations before and after this change for Measure().
Omitted for brevity

Benchmarks for collect:
```
                                                    │   main.txt    │               hist.txt               │
                                                    │    sec/op     │    sec/op     vs base                │
Collect/NoView/Int64Histogram/1/Attributes/0-24       1.799µ ±  60%    1.702µ ± 6%         ~ (p=1.000 n=6)
Collect/NoView/Int64Histogram/1/Attributes/1-24       973.7n ±  28%   1720.0n ± 5%   +76.65% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/10-24      881.0n ±  17%   1710.0n ± 5%   +94.09% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/0-24      996.1n ±  14%   1781.5n ± 4%   +78.85% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/1-24      1.029µ ±  67%    1.733µ ± 3%   +68.42% (p=0.009 n=6)
Collect/NoView/Int64Histogram/10/Attributes/10-24     1.533µ ±  18%    1.708µ ± 4%         ~ (p=0.240 n=6)
Collect/NoView/Float64Histogram/1/Attributes/0-24     1.222µ ± 120%    1.733µ ± 4%         ~ (p=0.065 n=6)
Collect/NoView/Float64Histogram/1/Attributes/1-24     893.3n ±   8%   1733.0n ± 4%   +94.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/10-24    860.7n ±   2%   1732.0n ± 5%  +101.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/0-24    852.5n ±   4%   1758.0n ± 3%  +106.22% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/1-24    853.8n ±   3%   1725.0n ± 3%  +102.04% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/10-24   843.4n ±   2%   1755.0n ± 4%  +108.10% (p=0.002 n=6)
geomean                                               1.028µ           1.732µ        +68.46%

                                                    │  main.txt  │               hist.txt               │
                                                    │    B/op    │     B/op      vs base                │
Collect/NoView/Int64Histogram/1/Attributes/0-24       336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/1-24       336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/10-24      336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/0-24      336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/1-24      336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/10-24     336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/0-24     336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/1-24     336.0 ± 0%    2130.5 ± 0%  +534.08% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/10-24    336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/0-24    336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/1-24    336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/10-24   336.0 ± 0%    2131.0 ± 0%  +534.23% (p=0.002 n=6)
geomean                                               336.0        2.081Ki       +534.21%

                                                    │  main.txt  │             hist.txt              │
                                                    │ allocs/op  │ allocs/op   vs base               │
Collect/NoView/Int64Histogram/1/Attributes/0-24       5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/1-24       5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/1/Attributes/10-24      5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/0-24      5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/1-24      5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Int64Histogram/10/Attributes/10-24     5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/0-24     5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/1-24     5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/1/Attributes/10-24    5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/0-24    5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/1-24    5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
Collect/NoView/Float64Histogram/10/Attributes/10-24   5.000 ± 0%   6.000 ± 0%  +20.00% (p=0.002 n=6)
geomean                                               5.000        6.000       +20.00%
```

Collect does get substantially worse, but Measure is expected to be
called significantly more often than collect.

---------

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
This commit is contained in:
David Ashpole
2025-12-11 11:56:03 -05:00
committed by GitHub
parent b4578c886a
commit f57bf14de2
7 changed files with 561 additions and 202 deletions
+1
View File
@@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- `Exporter` in `go.opentelemetry.io/otel/exporter/prometheus` ignores metrics with the scope `go.opentelemetry.io/contrib/bridges/prometheus`.
This prevents scrape failures when the Prometheus exporter is misconfigured to get data from the Prometheus bridge. (#7688)
- Improve performance of concurrent histogram measurements in `go.opentelemetry.io/otel/sdk/metric`. (#7474)
<!-- Released section -->
<!-- Don't change this section unless doing release -->
+4 -3
View File
@@ -126,12 +126,13 @@ func (b Builder[N]) ExplicitBucketHistogram(
boundaries []float64,
noMinMax, noSum bool,
) (Measure[N], ComputeAggregation) {
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
switch b.Temporality {
case metricdata.DeltaTemporality:
return b.filter(h.measure), h.delta
h := newDeltaHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
return b.filter(h.measure), h.collect
default:
return b.filter(h.measure), h.cumulative
h := newCumulativeHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
return b.filter(h.measure), h.collect
}
}
+91
View File
@@ -51,6 +51,97 @@ func (n *atomicCounter[N]) add(value N) {
}
}
// reset resets the internal state, and is not safe to call concurrently.
func (n *atomicCounter[N]) reset() {
n.nFloatBits.Store(0)
n.nInt.Store(0)
}
// atomicN is a generic atomic number value.
type atomicN[N int64 | float64] struct {
val atomic.Uint64
}
func (a *atomicN[N]) Load() (value N) {
v := a.val.Load()
switch any(value).(type) {
case int64:
value = N(v)
case float64:
value = N(math.Float64frombits(v))
default:
panic("unsupported type")
}
return value
}
func (a *atomicN[N]) Store(v N) {
var val uint64
switch any(v).(type) {
case int64:
val = uint64(v)
case float64:
val = math.Float64bits(float64(v))
default:
panic("unsupported type")
}
a.val.Store(val)
}
func (a *atomicN[N]) CompareAndSwap(oldN, newN N) bool {
var o, n uint64
switch any(oldN).(type) {
case int64:
o, n = uint64(oldN), uint64(newN)
case float64:
o, n = math.Float64bits(float64(oldN)), math.Float64bits(float64(newN))
default:
panic("unsupported type")
}
return a.val.CompareAndSwap(o, n)
}
type atomicMinMax[N int64 | float64] struct {
minimum, maximum atomicN[N]
set atomic.Bool
mu sync.Mutex
}
// init returns true if the value was used to initialize min and max.
func (s *atomicMinMax[N]) init(val N) bool {
s.mu.Lock()
defer s.mu.Unlock()
if !s.set.Load() {
defer s.set.Store(true)
s.minimum.Store(val)
s.maximum.Store(val)
return true
}
return false
}
func (s *atomicMinMax[N]) Update(val N) {
if !s.set.Load() && s.init(val) {
return
}
old := s.minimum.Load()
for val < old {
if s.minimum.CompareAndSwap(old, val) {
return
}
old = s.minimum.Load()
}
old = s.maximum.Load()
for old < val {
if s.maximum.CompareAndSwap(old, val) {
return
}
old = s.maximum.Load()
}
}
// hotColdWaitGroup is a synchronization primitive which enables lockless
// writes for concurrent writers and enables a reader to acquire exclusive
// access to a snapshot of state including only completed operations.
@@ -52,6 +52,33 @@ func TestAtomicSumAddIntConcurrentSafe(t *testing.T) {
assert.Equal(t, int64(15), aSum.load())
}
func BenchmarkAtomicCounter(b *testing.B) {
b.Run("Int64", benchmarkAtomicCounter[int64])
b.Run("Float64", benchmarkAtomicCounter[float64])
}
func benchmarkAtomicCounter[N int64 | float64](b *testing.B) {
b.Run("add", func(b *testing.B) {
var a atomicCounter[N]
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.add(2)
}
})
})
b.Run("load", func(b *testing.B) {
var a atomicCounter[N]
a.add(2)
var v N
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
v = a.load()
}
})
assert.Equal(b, N(2), v)
})
}
func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
var wg sync.WaitGroup
hcwg := &hotColdWaitGroup{}
@@ -76,3 +103,150 @@ func TestHotColdWaitGroupConcurrentSafe(t *testing.T) {
}
wg.Wait()
}
func TestAtomicN(t *testing.T) {
t.Run("Int64", testAtomicN[int64])
t.Run("Float64", testAtomicN[float64])
}
func testAtomicN[N int64 | float64](t *testing.T) {
var v atomicN[N]
assert.Equal(t, N(0), v.Load())
assert.True(t, v.CompareAndSwap(0, 6))
assert.Equal(t, N(6), v.Load())
assert.False(t, v.CompareAndSwap(0, 6))
v.Store(22)
assert.Equal(t, N(22), v.Load())
}
func TestAtomicNConcurrentSafe(t *testing.T) {
t.Run("Int64", testAtomicNConcurrentSafe[int64])
t.Run("Float64", testAtomicNConcurrentSafe[float64])
}
func testAtomicNConcurrentSafe[N int64 | float64](t *testing.T) {
var wg sync.WaitGroup
var v atomicN[N]
for range 2 {
wg.Add(1)
go func() {
defer wg.Done()
got := v.Load()
assert.Equal(t, int64(0), int64(got)%6)
}()
wg.Add(1)
go func() {
defer wg.Done()
v.Store(12)
}()
wg.Add(1)
go func() {
defer wg.Done()
v.CompareAndSwap(0, 6)
}()
}
wg.Wait()
}
func BenchmarkAtomicN(b *testing.B) {
b.Run("Int64", benchmarkAtomicN[int64])
b.Run("Float64", benchmarkAtomicN[float64])
}
func benchmarkAtomicN[N int64 | float64](b *testing.B) {
b.Run("Load", func(b *testing.B) {
var a atomicN[N]
a.Store(2)
var v N
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
v = a.Load()
}
})
assert.Equal(b, N(2), v)
})
b.Run("Store", func(b *testing.B) {
var a atomicN[N]
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Store(3)
}
})
})
b.Run("CompareAndSwap", func(b *testing.B) {
var a atomicN[N]
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
// Make sure we swap back and forth, in-case that matters.
if i%2 == 0 {
a.CompareAndSwap(0, 1)
} else {
a.CompareAndSwap(1, 0)
}
i++
}
})
})
}
func TestAtomicMinMaxConcurrentSafe(t *testing.T) {
t.Run("Int64", testAtomicMinMaxConcurrentSafe[int64])
t.Run("Float64", testAtomicMinMaxConcurrentSafe[float64])
}
func testAtomicMinMaxConcurrentSafe[N int64 | float64](t *testing.T) {
var wg sync.WaitGroup
var minMax atomicMinMax[N]
assert.False(t, minMax.set.Load())
for _, i := range []float64{2, 4, 6, 8, -3, 0, 8, 0} {
wg.Add(1)
go func() {
defer wg.Done()
minMax.Update(N(i))
}()
}
wg.Wait()
assert.True(t, minMax.set.Load())
assert.Equal(t, N(-3), minMax.minimum.Load())
assert.Equal(t, N(8), minMax.maximum.Load())
}
func BenchmarkAtomicMinMax(b *testing.B) {
b.Run("Int64", benchmarkAtomicMinMax[int64])
b.Run("Float64", benchmarkAtomicMinMax[float64])
}
func benchmarkAtomicMinMax[N int64 | float64](b *testing.B) {
b.Run("UpdateIncreasing", func(b *testing.B) {
var a atomicMinMax[N]
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
a.Update(N(i))
i++
}
})
})
b.Run("UpdateDecreasing", func(b *testing.B) {
var a atomicMinMax[N]
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
a.Update(N(i))
i--
}
})
})
b.Run("UpdateConstant", func(b *testing.B) {
var a atomicMinMax[N]
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
a.Update(N(5))
}
})
})
}
+261 -134
View File
@@ -7,151 +7,169 @@ import (
"context"
"slices"
"sort"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type buckets[N int64 | float64] struct {
// histogramPoint is a single histogram point, used in delta aggregations.
type histogramPoint[N int64 | float64] struct {
attrs attribute.Set
res FilteredExemplarReservoir[N]
counts []uint64
count uint64
total N
min, max N
histogramPointCounters[N]
}
// newBuckets returns buckets with n bins.
func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
// hotColdHistogramPoint a hot and cold histogram points, used in cumulative
// aggregations.
type hotColdHistogramPoint[N int64 | float64] struct {
hcwg hotColdWaitGroup
hotColdPoint [2]histogramPointCounters[N]
attrs attribute.Set
res FilteredExemplarReservoir[N]
}
func (b *buckets[N]) sum(value N) { b.total += value }
func (b *buckets[N]) bin(idx int) {
b.counts[idx]++
b.count++
// histogramPointCounters contains only the atomic counter data, and is used by
// both histogramPoint and hotColdHistogramPoint.
type histogramPointCounters[N int64 | float64] struct {
counts []atomic.Uint64
total atomicCounter[N]
minMax atomicMinMax[N]
}
func (b *buckets[N]) minMax(value N) {
if value < b.min {
b.min = value
} else if value > b.max {
b.max = value
func (b *histogramPointCounters[N]) loadCountsInto(into *[]uint64) uint64 {
// TODO (#3047): Making copies for counts incurs a large
// memory allocation footprint. Alternatives should be explored.
counts := reset(*into, len(b.counts), len(b.counts))
count := uint64(0)
for i := range b.counts {
c := b.counts[i].Load()
counts[i] = c
count += c
}
*into = counts
return count
}
// mergeIntoAndReset merges this set of histogram counter data into another,
// and resets the state of this set of counters. This is used by
// hotColdHistogramPoint to ensure that the cumulative counters continue to
// accumulate after being read.
func (b *histogramPointCounters[N]) mergeIntoAndReset( // nolint:revive // Intentional internal control flag
into *histogramPointCounters[N],
noMinMax, noSum bool,
) {
for i := range b.counts {
into.counts[i].Add(b.counts[i].Load())
b.counts[i].Store(0)
}
if !noMinMax {
// Do not reset min or max because cumulative min and max only ever grow
// smaller or larger respectively.
if b.minMax.set.Load() {
into.minMax.Update(b.minMax.minimum.Load())
into.minMax.Update(b.minMax.maximum.Load())
}
}
if !noSum {
into.total.add(b.total.load())
b.total.reset()
}
}
// histValues summarizes a set of measurements as an histValues with
// explicitly defined buckets.
type histValues[N int64 | float64] struct {
// deltaHistogram is a histogram whose internal storage is reset when it is
// collected.
//
// deltaHistogram's measure is implemented without locking, even when called
// concurrently with collect. This is done by maintaining two separate maps:
// one "hot" which is concurrently updated by measure(), and one "cold", which
// is read and reset by collect(). The [hotcoldWaitGroup] allows collect() to
// swap the hot and cold maps, and wait for updates to the cold map to complete
// prior to reading. deltaHistogram swaps ald clears complete maps so that
// unused attribute sets do not report in subsequent collect() calls.
type deltaHistogram[N int64 | float64] struct {
hcwg hotColdWaitGroup
hotColdValMap [2]limitedSyncMap
start time.Time
noMinMax bool
noSum bool
bounds []float64
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
func newHistValues[N int64 | float64](
bounds []float64,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
// complete control over the fix.
b := slices.Clone(bounds)
slices.Sort(b)
return &histValues[N]{
noMinMax: noMinMax,
noSum: noSum,
bounds: b,
newRes: r,
limit: newLimiter[buckets[N]](limit),
values: make(map[attribute.Distinct]*buckets[N]),
}
}
// Aggregate records the measurement value, scoped by attr, and aggregates it
// into a histogram.
func (s *histValues[N]) measure(
func (s *deltaHistogram[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
b, ok := s.values[fltrAttr.Equivalent()]
if !ok {
fltrAttr = s.limit.Attributes(fltrAttr, s.values)
// If we overflowed, make sure we add to the existing overflow series
// if it already exists.
b, ok = s.values[fltrAttr.Equivalent()]
if !ok {
hotIdx := s.hcwg.start()
defer s.hcwg.done(hotIdx)
h := s.hotColdValMap[hotIdx].LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
hPt := &histogramPoint[N]{
res: s.newRes(attr),
attrs: attr,
// N+1 buckets. For example:
//
// bounds = [0, 5, 10]
//
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets[N](fltrAttr, len(s.bounds)+1)
b.res = s.newRes(fltrAttr)
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = value, value
s.values[fltrAttr.Equivalent()] = b
// counts = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
histogramPointCounters: histogramPointCounters[N]{counts: make([]atomic.Uint64, len(s.bounds)+1)},
}
}
b.bin(idx)
return hPt
}).(*histogramPoint[N])
// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))
h.counts[idx].Add(1)
if !s.noMinMax {
b.minMax(value)
h.minMax.Update(value)
}
if !s.noSum {
b.sum(value)
h.total.add(value)
}
b.res.Offer(ctx, value, droppedAttr)
h.res.Offer(ctx, value, droppedAttr)
}
// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](
// newDeltaHistogram returns a histogram that is reset each time it is
// collected.
func newDeltaHistogram[N int64 | float64](
boundaries []float64,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noMinMax, noSum, limit, r),
start: now(),
) *deltaHistogram[N] {
// The responsibility of keeping all histogramPoint correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
// complete control over the fix.
b := slices.Clone(boundaries)
slices.Sort(b)
return &deltaHistogram[N]{
start: now(),
noMinMax: noMinMax,
noSum: noSum,
bounds: b,
newRes: r,
hotColdValMap: [2]limitedSyncMap{
{aggLimit: limit},
{aggLimit: limit},
},
}
}
// histogram summarizes a set of measurements as an histogram with explicitly
// defined buckets.
type histogram[N int64 | float64] struct {
*histValues[N]
start time.Time
}
func (s *histogram[N]) delta(
func (s *deltaHistogram[N]) collect(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
@@ -161,39 +179,46 @@ func (s *histogram[N]) delta(
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.DeltaTemporality
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
// delta always clears values on collection
readIdx := s.hcwg.swapHotAndWait()
// Do not allow modification of our copy of bounds.
bounds := slices.Clone(s.bounds)
n := len(s.values)
// The len will not change while we iterate over values, since we waited
// for all writes to finish to the cold values and len.
n := s.hotColdValMap[readIdx].Len()
hDPts := reset(h.DataPoints, n, n)
var i int
for _, val := range s.values {
s.hotColdValMap[readIdx].Range(func(_, value any) bool {
val := value.(*histogramPoint[N])
count := val.loadCountsInto(&hDPts[i].BucketCounts)
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Count = count
hDPts[i].Bounds = bounds
hDPts[i].BucketCounts = val.counts
if !s.noSum {
hDPts[i].Sum = val.total
hDPts[i].Sum = val.total.load()
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
if val.minMax.set.Load() {
hDPts[i].Min = metricdata.NewExtrema(val.minMax.minimum.Load())
hDPts[i].Max = metricdata.NewExtrema(val.minMax.maximum.Load())
}
}
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
i++
}
return true
})
// Unused attribute sets do not report.
clear(s.values)
s.hotColdValMap[readIdx].Clear()
// The delta collection cycle resets.
s.start = t
@@ -203,7 +228,101 @@ func (s *histogram[N]) delta(
return n
}
func (s *histogram[N]) cumulative(
// cumulativeHistogram summarizes a set of measurements as an histogram with explicitly
// defined histogramPoint.
//
// cumulativeHistogram's measure is implemented without locking, even when
// called concurrently with collect. This is done by maintaining two separate
// histogramPointCounters for each attribute set: one "hot" which is
// concurrently updated by measure(), and one "cold", which is read and reset
// by collect(). The [hotcoldWaitGroup] allows collect() to swap the hot and
// cold counters, and wait for updates to the cold counters to complete prior
// to reading. Unlike deltaHistogram, this maintains a single map so that the
// preserved attribute sets do not change when collect() is called.
type cumulativeHistogram[N int64 | float64] struct {
values limitedSyncMap
start time.Time
noMinMax bool
noSum bool
bounds []float64
newRes func(attribute.Set) FilteredExemplarReservoir[N]
}
// newCumulativeHistogram returns a histogram that accumulates measurements
// into a histogram data structure. It is never reset.
func newCumulativeHistogram[N int64 | float64](
boundaries []float64,
noMinMax, noSum bool,
limit int,
r func(attribute.Set) FilteredExemplarReservoir[N],
) *cumulativeHistogram[N] {
// The responsibility of keeping all histogramPoint correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
// complete control over the fix.
b := slices.Clone(boundaries)
slices.Sort(b)
return &cumulativeHistogram[N]{
start: now(),
noMinMax: noMinMax,
noSum: noSum,
bounds: b,
newRes: r,
values: limitedSyncMap{aggLimit: limit},
}
}
func (s *cumulativeHistogram[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
droppedAttr []attribute.KeyValue,
) {
h := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any {
hPt := &hotColdHistogramPoint[N]{
res: s.newRes(attr),
attrs: attr,
// N+1 buckets. For example:
//
// bounds = [0, 5, 10]
//
// Then,
//
// count = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
hotColdPoint: [2]histogramPointCounters[N]{
{
counts: make([]atomic.Uint64, len(s.bounds)+1),
},
{
counts: make([]atomic.Uint64, len(s.bounds)+1),
},
},
}
return hPt
}).(*hotColdHistogramPoint[N])
// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the histogramPoint in that the length of histogramPoint
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))
hotIdx := h.hcwg.start()
defer h.hcwg.done(hotIdx)
h.hotColdPoint[hotIdx].counts[idx].Add(1)
if !s.noMinMax {
h.hotColdPoint[hotIdx].minMax.Update(value)
}
if !s.noSum {
h.hotColdPoint[hotIdx].total.add(value)
}
h.res.Offer(ctx, value, droppedAttr)
}
func (s *cumulativeHistogram[N]) collect(
dest *metricdata.Aggregation, //nolint:gocritic // The pointer is needed for the ComputeAggregation interface
) int {
t := now()
@@ -213,50 +332,58 @@ func (s *histogram[N]) cumulative(
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.CumulativeTemporality
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
// Do not allow modification of our copy of bounds.
bounds := slices.Clone(s.bounds)
n := len(s.values)
hDPts := reset(h.DataPoints, n, n)
// Values are being concurrently written while we iterate, so only use the
// current length for capacity.
hDPts := reset(h.DataPoints, 0, s.values.Len())
var i int
for _, val := range s.values {
hDPts[i].Attributes = val.attrs
hDPts[i].StartTime = s.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Bounds = bounds
// The HistogramDataPoint field values returned need to be copies of
// the buckets value as we will keep updating them.
//
// TODO (#3047): Making copies for bounds and counts incurs a large
// memory allocation footprint. Alternatives should be explored.
hDPts[i].BucketCounts = slices.Clone(val.counts)
s.values.Range(func(_, value any) bool {
val := value.(*hotColdHistogramPoint[N])
// swap, observe, and clear the point
readIdx := val.hcwg.swapHotAndWait()
var bucketCounts []uint64
count := val.hotColdPoint[readIdx].loadCountsInto(&bucketCounts)
newPt := metricdata.HistogramDataPoint[N]{
Attributes: val.attrs,
StartTime: s.start,
Time: t,
Count: count,
Bounds: bounds,
// The HistogramDataPoint field values returned need to be copies of
// the histogramPoint value as we will keep updating them.
BucketCounts: bucketCounts,
}
if !s.noSum {
hDPts[i].Sum = val.total
newPt.Sum = val.hotColdPoint[readIdx].total.load()
}
if !s.noMinMax {
hDPts[i].Min = metricdata.NewExtrema(val.min)
hDPts[i].Max = metricdata.NewExtrema(val.max)
if val.hotColdPoint[readIdx].minMax.set.Load() {
newPt.Min = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.minimum.Load())
newPt.Max = metricdata.NewExtrema(val.hotColdPoint[readIdx].minMax.maximum.Load())
}
}
// Once we've read the point, merge it back into the hot histogram
// point since it is cumulative.
hotIdx := (readIdx + 1) % 2
val.hotColdPoint[readIdx].mergeIntoAndReset(&val.hotColdPoint[hotIdx], s.noMinMax, s.noSum)
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
collectExemplars(&newPt.Exemplars, val.res.Collect)
hDPts = append(hDPts, newPt)
i++
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
}
return true
})
h.DataPoints = hDPts
*dest = h
return n
return i
}
+24 -59
View File
@@ -330,60 +330,12 @@ func hPoint[N int64 | float64](
}
}
func TestBucketsBin(t *testing.T) {
t.Run("Int64", testBucketsBin[int64]())
t.Run("Float64", testBucketsBin[float64]())
}
func testBucketsBin[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
b := newBuckets[N](alice, 3)
assertB := func(counts []uint64, count uint64, mi, ma N) {
t.Helper()
assert.Equal(t, counts, b.counts)
assert.Equal(t, count, b.count)
assert.Equal(t, mi, b.min)
assert.Equal(t, ma, b.max)
}
assertB([]uint64{0, 0, 0}, 0, 0, 0)
b.bin(1)
b.minMax(2)
assertB([]uint64{0, 1, 0}, 1, 0, 2)
b.bin(0)
b.minMax(-1)
assertB([]uint64{1, 1, 0}, 2, -1, 2)
}
}
func TestBucketsSum(t *testing.T) {
t.Run("Int64", testBucketsSum[int64]())
t.Run("Float64", testBucketsSum[float64]())
}
func testBucketsSum[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
b := newBuckets[N](alice, 3)
var want N
assert.Equal(t, want, b.total)
b.sum(2)
want = 2
assert.Equal(t, want, b.total)
b.sum(-1)
want = 1
assert.Equal(t, want, b.total)
}
}
func TestHistogramImmutableBounds(t *testing.T) {
b := []float64{0, 1, 2}
cpB := make([]float64, len(b))
copy(cpB, b)
h := newHistogram[int64](b, false, false, 0, dropExemplars[int64])
h := newCumulativeHistogram[int64](b, false, false, 0, dropExemplars[int64])
require.Equal(t, cpB, h.bounds)
b[0] = 10
@@ -392,29 +344,42 @@ func TestHistogramImmutableBounds(t *testing.T) {
h.measure(t.Context(), 5, alice, nil)
var data metricdata.Aggregation = metricdata.Histogram[int64]{}
h.cumulative(&data)
h.collect(&data)
hdp := data.(metricdata.Histogram[int64]).DataPoints[0]
hdp.Bounds[1] = 10
assert.Equal(t, cpB, h.bounds, "modifying the Aggregation bounds should not change the bounds")
}
func TestCumulativeHistogramImmutableCounts(t *testing.T) {
h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64])
h := newCumulativeHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64])
h.measure(t.Context(), 5, alice, nil)
var data metricdata.Aggregation = metricdata.Histogram[int64]{}
h.cumulative(&data)
h.collect(&data)
hdp := data.(metricdata.Histogram[int64]).DataPoints[0]
require.Equal(t, hdp.BucketCounts, h.values[alice.Equivalent()].counts)
hPt, ok := h.values.Load(alice.Equivalent())
require.True(t, ok)
hcHistPt := hPt.(*hotColdHistogramPoint[int64])
readIdx := hcHistPt.hcwg.swapHotAndWait()
var bucketCounts []uint64
hcHistPt.hotColdPoint[readIdx].loadCountsInto(&bucketCounts)
require.Equal(t, hdp.BucketCounts, bucketCounts)
hotIdx := (readIdx + 1) % 2
hcHistPt.hotColdPoint[readIdx].mergeIntoAndReset(&hcHistPt.hotColdPoint[hotIdx], noMinMax, false)
cpCounts := make([]uint64, len(hdp.BucketCounts))
copy(cpCounts, hdp.BucketCounts)
hdp.BucketCounts[0] = 10
hPt, ok = h.values.Load(alice.Equivalent())
require.True(t, ok)
hcHistPt = hPt.(*hotColdHistogramPoint[int64])
readIdx = hcHistPt.hcwg.swapHotAndWait()
hcHistPt.hotColdPoint[readIdx].loadCountsInto(&bucketCounts)
assert.Equal(
t,
cpCounts,
h.values[alice.Equivalent()].counts,
bucketCounts,
"modifying the Aggregator bucket counts should not change the Aggregator",
)
}
@@ -424,28 +389,28 @@ func TestDeltaHistogramReset(t *testing.T) {
now = func() time.Time { return y2k }
t.Cleanup(func() { now = orig })
h := newHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64])
h := newDeltaHistogram[int64](bounds, noMinMax, false, 0, dropExemplars[int64])
var data metricdata.Aggregation = metricdata.Histogram[int64]{}
require.Equal(t, 0, h.delta(&data))
require.Equal(t, 0, h.collect(&data))
require.Empty(t, data.(metricdata.Histogram[int64]).DataPoints)
h.measure(t.Context(), 1, alice, nil)
expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality}
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1, now(), now())}
h.delta(&data)
h.collect(&data)
metricdatatest.AssertAggregationsEqual(t, expect, data)
// The attr set should be forgotten once Aggregations is called.
expect.DataPoints = nil
assert.Equal(t, 0, h.delta(&data))
assert.Equal(t, 0, h.collect(&data))
assert.Empty(t, data.(metricdata.Histogram[int64]).DataPoints)
// Aggregating another set should not affect the original (alice).
h.measure(t.Context(), 1, bob, nil)
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1, now(), now())}
h.delta(&data)
h.collect(&data)
metricdatatest.AssertAggregationsEqual(t, expect, data)
}
+6 -6
View File
@@ -17,12 +17,12 @@ type sumValue[N int64 | float64] struct {
attrs attribute.Set
}
type valueMap[N int64 | float64] struct {
type sumValueMap[N int64 | float64] struct {
values limitedSyncMap
newRes func(attribute.Set) FilteredExemplarReservoir[N]
}
func (s *valueMap[N]) measure(
func (s *sumValueMap[N]) measure(
ctx context.Context,
value N,
fltrAttr attribute.Set,
@@ -52,7 +52,7 @@ func newDeltaSum[N int64 | float64](
return &deltaSum[N]{
monotonic: monotonic,
start: now(),
hotColdValMap: [2]valueMap[N]{
hotColdValMap: [2]sumValueMap[N]{
{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
@@ -71,7 +71,7 @@ type deltaSum[N int64 | float64] struct {
start time.Time
hcwg hotColdWaitGroup
hotColdValMap [2]valueMap[N]
hotColdValMap [2]sumValueMap[N]
}
func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
@@ -130,7 +130,7 @@ func newCumulativeSum[N int64 | float64](
return &cumulativeSum[N]{
monotonic: monotonic,
start: now(),
valueMap: valueMap[N]{
sumValueMap: sumValueMap[N]{
values: limitedSyncMap{aggLimit: limit},
newRes: r,
},
@@ -142,7 +142,7 @@ type cumulativeSum[N int64 | float64] struct {
monotonic bool
start time.Time
valueMap[N]
sumValueMap[N]
}
func (s *cumulativeSum[N]) collect(