1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2026-06-03 18:35:08 +02:00

Use atomics for exponential histogram buckets (#8057)

Follows https://github.com/open-telemetry/opentelemetry-go/pull/8025

This is the second PR towards a lockless fast-path for the exponential
histogram aggregation. It replaces use of uint64 with atomic.Uint64. It
does not make buckets concurrent-safe. That will come in future PRs.
This is a refactor to make future PRs easier to review since it has a
large diff, but is relatively simple.

The record and measure calls are still guarded by a lock at this point.

You can see the full set of planned changes in
https://github.com/open-telemetry/opentelemetry-go/compare/main...dashpole:opentelemetry-go:lockless_exphist_ai.
The implementation is largely based on
https://github.com/open-telemetry/opentelemetry-go/pull/7535 (which I
implemented by hand), but with help from an AI to break it down into
smaller PRs, and simplify aspects of the design.

Part of https://github.com/open-telemetry/opentelemetry-go/issues/7796
This commit is contained in:
David Ashpole
2026-03-17 10:45:24 -04:00
committed by GitHub
parent d5f403cab5
commit 5576bc22e7
2 changed files with 239 additions and 308 deletions
@@ -181,14 +181,15 @@ func (p *expoHistogramDataPoint[N]) count() uint64 {
// expoBuckets is a set of buckets in an exponential histogram.
type expoBuckets struct {
startBin int32
counts []uint64
counts []atomic.Uint64
}
// record increments the count for the given bin, and expands the buckets if needed.
// Size changes must be done before calling this function.
func (b *expoBuckets) record(bin int32) {
if len(b.counts) == 0 {
b.counts = []uint64{1}
b.counts = make([]atomic.Uint64, 1)
b.counts[0].Store(1)
b.startBin = bin
return
}
@@ -197,7 +198,7 @@ func (b *expoBuckets) record(bin int32) {
// if the new bin is inside the current range
if bin >= b.startBin && int(bin) <= endBin {
b.counts[bin-b.startBin]++
b.counts[bin-b.startBin].Add(1)
return
}
// if the new bin is before the current start add spaces to the counts
@@ -207,16 +208,22 @@ func (b *expoBuckets) record(bin int32) {
shift := b.startBin - bin
if newLength > cap(b.counts) {
b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
b.counts = append(b.counts, make([]atomic.Uint64, newLength-len(b.counts))...)
}
copy(b.counts[shift:origLen+int(shift)], b.counts)
b.counts = b.counts[:newLength]
// Shift existing elements to the right. Go's copy() doesn't work for
// structs like atomic.Uint64.
for i := origLen - 1; i >= 0; i-- {
b.counts[i+int(shift)].Store(b.counts[i].Load())
}
for i := 1; i < int(shift); i++ {
b.counts[i] = 0
b.counts[i].Store(0)
}
b.startBin = bin
b.counts[0] = 1
b.counts[0].Store(1)
return
}
// if the new is after the end add spaces to the end
@@ -224,15 +231,15 @@ func (b *expoBuckets) record(bin int32) {
if int(bin-b.startBin) < cap(b.counts) {
b.counts = b.counts[:bin-b.startBin+1]
for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
b.counts[i] = 0
b.counts[i].Store(0)
}
b.counts[bin-b.startBin] = 1
b.counts[bin-b.startBin].Store(1)
return
}
end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
end := make([]atomic.Uint64, int(bin-b.startBin)-len(b.counts)+1)
b.counts = append(b.counts, end...)
b.counts[bin-b.startBin] = 1
b.counts[bin-b.startBin].Store(1)
}
}
@@ -259,10 +266,10 @@ func (b *expoBuckets) downscale(delta int32) {
for i := 1; i < len(b.counts); i++ {
idx := i + int(offset)
if idx%int(steps) == 0 {
b.counts[idx/int(steps)] = b.counts[i]
b.counts[idx/int(steps)].Store(b.counts[i].Load())
continue
}
b.counts[idx/int(steps)] += b.counts[i]
b.counts[idx/int(steps)].Add(b.counts[i].Load())
}
lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
@@ -272,8 +279,8 @@ func (b *expoBuckets) downscale(delta int32) {
func (b *expoBuckets) count() uint64 {
var total uint64
for _, count := range b.counts {
total += count
for i := range b.counts {
total += b.counts[i].Load()
}
return total
}
@@ -380,7 +387,9 @@ func (e *expoHistogram[N]) delta(
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
for j := range val.posBuckets.counts {
hDPts[i].PositiveBucket.Counts[j] = val.posBuckets.counts[j].Load()
}
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
@@ -388,7 +397,9 @@ func (e *expoHistogram[N]) delta(
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
for j := range val.negBuckets.counts {
hDPts[i].NegativeBucket.Counts[j] = val.negBuckets.counts[j].Load()
}
if !e.noSum {
hDPts[i].Sum = val.sum.load()
@@ -445,7 +456,9 @@ func (e *expoHistogram[N]) cumulative(
len(val.posBuckets.counts),
len(val.posBuckets.counts),
)
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
for j := range val.posBuckets.counts {
hDPts[i].PositiveBucket.Counts[j] = val.posBuckets.counts[j].Load()
}
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(
@@ -453,7 +466,9 @@ func (e *expoHistogram[N]) cumulative(
len(val.negBuckets.counts),
len(val.negBuckets.counts),
)
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
for j := range val.negBuckets.counts {
hDPts[i].NegativeBucket.Counts[j] = val.negBuckets.counts[j].Load()
}
if !e.noSum {
hDPts[i].Sum = val.sum.load()
@@ -8,6 +8,7 @@ import (
"fmt"
"math"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/assert"
@@ -44,86 +45,64 @@ func testExpoHistogramDataPointRecord[N int64 | float64](t *testing.T) {
testCases := []struct {
maxSize int
values []N
expectedBuckets expoBuckets
expectedStart int32
expectedCounts []uint64
expectedScale int32
}{
{
maxSize: 4,
values: []N{2, 4, 1},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 1, 1},
},
expectedStart: -1,
expectedCounts: []uint64{1, 1, 1},
expectedScale: 0,
},
{
maxSize: 4,
values: []N{4, 4, 4, 2, 16, 1},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 4, 1},
},
expectedStart: -1,
expectedCounts: []uint64{1, 4, 1},
expectedScale: -1,
},
{
maxSize: 2,
values: []N{1, 2, 4},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 2},
},
expectedStart: -1,
expectedCounts: []uint64{1, 2},
expectedScale: -1,
},
{
maxSize: 2,
values: []N{1, 4, 2},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 2},
},
expectedStart: -1,
expectedCounts: []uint64{1, 2},
expectedScale: -1,
},
{
maxSize: 2,
values: []N{2, 4, 1},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 2},
},
expectedStart: -1,
expectedCounts: []uint64{1, 2},
expectedScale: -1,
},
{
maxSize: 2,
values: []N{2, 1, 4},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 2},
},
expectedStart: -1,
expectedCounts: []uint64{1, 2},
expectedScale: -1,
},
{
maxSize: 2,
values: []N{4, 1, 2},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 2},
},
expectedStart: -1,
expectedCounts: []uint64{1, 2},
expectedScale: -1,
},
{
maxSize: 2,
values: []N{4, 2, 1},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{1, 2},
},
expectedStart: -1,
expectedCounts: []uint64{1, 2},
expectedScale: -1,
},
}
@@ -138,8 +117,8 @@ func testExpoHistogramDataPointRecord[N int64 | float64](t *testing.T) {
dp.record(-v)
}
assert.Equal(t, tt.expectedBuckets, dp.posBuckets, "positive buckets")
assert.Equal(t, tt.expectedBuckets, dp.negBuckets, "negative buckets")
assertBuckets(t, tt.expectedStart, tt.expectedCounts, dp.posBuckets, "positive buckets")
assertBuckets(t, tt.expectedStart, tt.expectedCounts, dp.negBuckets, "negative buckets")
assert.Equal(t, tt.expectedScale, dp.scale.Load(), "scale")
})
}
@@ -233,7 +212,8 @@ func testExpoHistogramDataPointRecordFloat64(t *testing.T) {
type TestCase struct {
maxSize int
values []float64
expectedBuckets expoBuckets
expectedStart int32
expectedCounts []uint64
expectedScale int32
}
@@ -241,64 +221,50 @@ func testExpoHistogramDataPointRecordFloat64(t *testing.T) {
{
maxSize: 4,
values: []float64{2, 2, 2, 1, 8, 0.5},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{2, 3, 1},
},
expectedStart: -1,
expectedCounts: []uint64{2, 3, 1},
expectedScale: -1,
},
{
maxSize: 2,
values: []float64{1, 0.5, 2},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{2, 1},
},
expectedStart: -1,
expectedCounts: []uint64{2, 1},
expectedScale: -1,
},
{
maxSize: 2,
values: []float64{1, 2, 0.5},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{2, 1},
},
expectedStart: -1,
expectedCounts: []uint64{2, 1},
expectedScale: -1,
},
{
maxSize: 2,
values: []float64{2, 0.5, 1},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{2, 1},
},
expectedStart: -1,
expectedCounts: []uint64{2, 1},
expectedScale: -1,
},
{
maxSize: 2,
values: []float64{2, 1, 0.5},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{2, 1},
},
expectedStart: -1,
expectedCounts: []uint64{2, 1},
expectedScale: -1,
},
{
maxSize: 2,
values: []float64{0.5, 1, 2},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{2, 1},
},
expectedStart: -1,
expectedCounts: []uint64{2, 1},
expectedScale: -1,
},
{
maxSize: 2,
values: []float64{0.5, 2, 1},
expectedBuckets: expoBuckets{
startBin: -1,
counts: []uint64{2, 1},
},
expectedStart: -1,
expectedCounts: []uint64{2, 1},
expectedScale: -1,
},
}
@@ -313,8 +279,9 @@ func testExpoHistogramDataPointRecordFloat64(t *testing.T) {
dp.record(-v)
}
assert.Equal(t, tt.expectedBuckets, dp.posBuckets)
assert.Equal(t, tt.expectedBuckets, dp.negBuckets)
assertBuckets(t, tt.expectedStart, tt.expectedCounts, dp.posBuckets, "positive buckets")
assertBuckets(t, tt.expectedStart, tt.expectedCounts, dp.negBuckets, "negative buckets")
assert.Equal(t, tt.expectedScale, dp.scale.Load(), "scale")
assert.Equal(t, tt.expectedScale, dp.scale.Load())
})
}
@@ -346,157 +313,125 @@ func TestExponentialHistogramDataPointRecordLimits(t *testing.T) {
}
}
func newBucket(startBin int32, counts []uint64) *expoBuckets {
b := &expoBuckets{startBin: startBin, counts: make([]atomic.Uint64, len(counts))}
for i, v := range counts {
b.counts[i].Store(v)
}
return b
}
func assertBuckets(t *testing.T, expectedStart int32, expectedCounts []uint64, actual expoBuckets, msg string) {
t.Helper()
assert.Equal(t, expectedStart, actual.startBin, "%s: startBin", msg)
var actualCounts []uint64
if len(actual.counts) > 0 {
actualCounts = make([]uint64, len(actual.counts))
for i := range actual.counts {
actualCounts[i] = actual.counts[i].Load()
}
}
assert.Equal(t, expectedCounts, actualCounts, "%s: counts", msg)
}
func TestExpoBucketDownscale(t *testing.T) {
tests := []struct {
name string
bucket *expoBuckets
scale int32
want *expoBuckets
wantStart int32
wantCounts []uint64
}{
{
name: "Empty bucket",
bucket: &expoBuckets{},
bucket: newBucket(0, nil),
scale: 3,
want: &expoBuckets{},
wantStart: 0,
wantCounts: nil,
},
{
name: "1 size bucket",
bucket: &expoBuckets{
startBin: 50,
counts: []uint64{7},
},
bucket: newBucket(50, []uint64{7}),
scale: 4,
want: &expoBuckets{
startBin: 3,
counts: []uint64{7},
},
wantStart: 3,
wantCounts: []uint64{7},
},
{
name: "zero scale",
bucket: &expoBuckets{
startBin: 50,
counts: []uint64{7, 5},
},
bucket: newBucket(50, []uint64{7, 5}),
scale: 0,
want: &expoBuckets{
startBin: 50,
counts: []uint64{7, 5},
},
wantStart: 50,
wantCounts: []uint64{7, 5},
},
{
name: "aligned bucket scale 1",
bucket: &expoBuckets{
startBin: 0,
counts: []uint64{1, 2, 3, 4, 5, 6},
},
bucket: newBucket(0, []uint64{1, 2, 3, 4, 5, 6}),
scale: 1,
want: &expoBuckets{
startBin: 0,
counts: []uint64{3, 7, 11},
},
wantStart: 0,
wantCounts: []uint64{3, 7, 11},
},
{
name: "aligned bucket scale 2",
bucket: &expoBuckets{
startBin: 0,
counts: []uint64{1, 2, 3, 4, 5, 6},
},
bucket: newBucket(0, []uint64{1, 2, 3, 4, 5, 6}),
scale: 2,
want: &expoBuckets{
startBin: 0,
counts: []uint64{10, 11},
},
wantStart: 0,
wantCounts: []uint64{10, 11},
},
{
name: "aligned bucket scale 3",
bucket: &expoBuckets{
startBin: 0,
counts: []uint64{1, 2, 3, 4, 5, 6},
},
bucket: newBucket(0, []uint64{1, 2, 3, 4, 5, 6}),
scale: 3,
want: &expoBuckets{
startBin: 0,
counts: []uint64{21},
},
wantStart: 0,
wantCounts: []uint64{21},
},
{
name: "unaligned bucket scale 1",
bucket: &expoBuckets{
startBin: 5,
counts: []uint64{1, 2, 3, 4, 5, 6},
}, // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6]
bucket: newBucket(5, []uint64{1, 2, 3, 4, 5, 6}), // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6]
scale: 1,
want: &expoBuckets{
startBin: 2,
counts: []uint64{1, 5, 9, 6},
}, // This is equivalent to [0,0,1,5,9,6]
wantStart: 2,
wantCounts: []uint64{1, 5, 9, 6}, // This is equivalent to [0,0,1,5,9,6]
},
{
name: "unaligned bucket scale 2",
bucket: &expoBuckets{
startBin: 7,
counts: []uint64{1, 2, 3, 4, 5, 6},
}, // This is equivalent to [0,0,0,0,0,0,0,1,2,3,4,5,6]
bucket: newBucket(7, []uint64{1, 2, 3, 4, 5, 6}), // This is equivalent to [0,0,0,0,0,0,0,1,2,3,4,5,6]
scale: 2,
want: &expoBuckets{
startBin: 1,
counts: []uint64{1, 14, 6},
}, // This is equivalent to [0,1,14,6]
wantStart: 1,
wantCounts: []uint64{1, 14, 6}, // This is equivalent to [0,1,14,6]
},
{
name: "unaligned bucket scale 3",
bucket: &expoBuckets{
startBin: 3,
counts: []uint64{1, 2, 3, 4, 5, 6},
}, // This is equivalent to [0,0,0,1,2,3,4,5,6]
bucket: newBucket(3, []uint64{1, 2, 3, 4, 5, 6}), // This is equivalent to [0,0,0,1,2,3,4,5,6]
scale: 3,
want: &expoBuckets{
startBin: 0,
counts: []uint64{15, 6},
}, // This is equivalent to [0,15,6]
wantStart: 0,
wantCounts: []uint64{15, 6}, // This is equivalent to [0,15,6]
},
{
name: "unaligned bucket scale 1",
bucket: &expoBuckets{
startBin: 1,
counts: []uint64{1, 0, 1},
},
bucket: newBucket(1, []uint64{1, 0, 1}),
scale: 1,
want: &expoBuckets{
startBin: 0,
counts: []uint64{1, 1},
},
wantStart: 0,
wantCounts: []uint64{1, 1},
},
{
name: "negative startBin",
bucket: &expoBuckets{
startBin: -1,
counts: []uint64{1, 0, 3},
},
bucket: newBucket(-1, []uint64{1, 0, 3}),
scale: 1,
want: &expoBuckets{
startBin: -1,
counts: []uint64{1, 3},
},
wantStart: -1,
wantCounts: []uint64{1, 3},
},
{
name: "negative startBin 2",
bucket: &expoBuckets{
startBin: -4,
counts: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
},
bucket: newBucket(-4, []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}),
scale: 1,
want: &expoBuckets{
startBin: -2,
counts: []uint64{3, 7, 11, 15, 19},
},
wantStart: -2,
wantCounts: []uint64{3, 7, 11, 15, 19},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.bucket.downscale(tt.scale)
assert.Equal(t, tt.want, tt.bucket)
assertBuckets(t, tt.wantStart, tt.wantCounts, *tt.bucket, tt.name)
})
}
}
@@ -506,59 +441,43 @@ func TestExpoBucketRecord(t *testing.T) {
name string
bucket *expoBuckets
bin int32
want *expoBuckets
wantStart int32
wantCounts []uint64
}{
{
name: "Empty Bucket creates first count",
bucket: &expoBuckets{},
bucket: newBucket(0, nil),
bin: -5,
want: &expoBuckets{
startBin: -5,
counts: []uint64{1},
},
wantStart: -5,
wantCounts: []uint64{1},
},
{
name: "Bin is in the bucket",
bucket: &expoBuckets{
startBin: 3,
counts: []uint64{1, 2, 3, 4, 5, 6},
},
bucket: newBucket(3, []uint64{1, 2, 3, 4, 5, 6}),
bin: 5,
want: &expoBuckets{
startBin: 3,
counts: []uint64{1, 2, 4, 4, 5, 6},
},
wantStart: 3,
wantCounts: []uint64{1, 2, 4, 4, 5, 6},
},
{
name: "Bin is before the start of the bucket",
bucket: &expoBuckets{
startBin: 1,
counts: []uint64{1, 2, 3, 4, 5, 6},
},
bucket: newBucket(1, []uint64{1, 2, 3, 4, 5, 6}),
bin: -2,
want: &expoBuckets{
startBin: -2,
counts: []uint64{1, 0, 0, 1, 2, 3, 4, 5, 6},
},
wantStart: -2,
wantCounts: []uint64{1, 0, 0, 1, 2, 3, 4, 5, 6},
},
{
name: "Bin is after the end of the bucket",
bucket: &expoBuckets{
startBin: -2,
counts: []uint64{1, 2, 3, 4, 5, 6},
},
bucket: newBucket(-2, []uint64{1, 2, 3, 4, 5, 6}),
bin: 4,
want: &expoBuckets{
startBin: -2,
counts: []uint64{1, 2, 3, 4, 5, 6, 1},
},
wantStart: -2,
wantCounts: []uint64{1, 2, 3, 4, 5, 6, 1},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.bucket.record(tt.bin)
assert.Equal(t, tt.want, tt.bucket)
assertBuckets(t, tt.wantStart, tt.wantCounts, *tt.bucket, tt.name)
})
}
}
@@ -712,10 +631,7 @@ func TestSubNormal(t *testing.T) {
want.minMax.Update(math.SmallestNonzeroFloat64)
want.sum.add(3 * math.SmallestNonzeroFloat64)
want.scale.Store(20)
want.posBuckets = expoBuckets{
startBin: -1126170625,
counts: []uint64{3},
}
want.posBuckets = *newBucket(-1126170625, []uint64{3})
ehdp := newExpoHistogramDataPoint[float64](alice, 4, 20, false, false)
ehdp.record(math.SmallestNonzeroFloat64)