1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-26 03:52:03 +02:00

Merge pull request #758 from jmacd/jmacd/hist_search

Use []float64 for histogram boundaries, not []metric.Number
This commit is contained in:
Tyler Yahn 2020-05-21 11:54:05 -07:00 committed by GitHub
commit 51e57199b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 220 additions and 92 deletions

View File

@ -54,7 +54,7 @@ type Exporter struct {
onError func(error)
defaultSummaryQuantiles []float64
defaultHistogramBoundaries []metric.Number
defaultHistogramBoundaries []float64
}
var _ http.Handler = &Exporter{}
@ -85,7 +85,7 @@ type Config struct {
// DefaultHistogramBoundaries defines the default histogram bucket
// boundaries.
DefaultHistogramBoundaries []metric.Number
DefaultHistogramBoundaries []float64
// OnError is a function that handle errors that may occur while exporting metrics.
// TODO: This should be refactored or even removed once we have a better error handling mechanism.
@ -330,12 +330,12 @@ func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregator
// The bucket with upper-bound +inf is not included.
counts := make(map[float64]uint64, len(buckets.Boundaries))
for i := range buckets.Boundaries {
boundary := buckets.Boundaries[i].CoerceToFloat64(kind)
totalCount += buckets.Counts[i].AsUint64()
boundary := buckets.Boundaries[i]
totalCount += uint64(buckets.Counts[i])
counts[boundary] = totalCount
}
// Include the +inf bucket in the total count.
totalCount += buckets.Counts[len(buckets.Counts)-1].AsUint64()
totalCount += uint64(buckets.Counts[len(buckets.Counts)-1])
m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, labels...)
if err != nil {

View File

@ -34,7 +34,7 @@ import (
func TestPrometheusExporter(t *testing.T) {
exporter, err := prometheus.NewExportPipeline(prometheus.Config{
DefaultHistogramBoundaries: []metric.Number{metric.NewFloat64Number(-0.5), metric.NewFloat64Number(1)},
DefaultHistogramBoundaries: []float64{-0.5, 1},
})
require.NoError(t, err)

View File

@ -97,7 +97,7 @@ func (p *CheckpointSet) AddValueRecorder(desc *metric.Descriptor, v float64, lab
p.updateAggregator(desc, array.New(), v, labels...)
}
func (p *CheckpointSet) AddHistogramValueRecorder(desc *metric.Descriptor, boundaries []metric.Number, v float64, labels ...kv.KeyValue) {
func (p *CheckpointSet) AddHistogramValueRecorder(desc *metric.Descriptor, boundaries []float64, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, histogram.New(desc, boundaries), v, labels...)
}

View File

@ -68,8 +68,14 @@ type (
// For a Histogram with N defined boundaries, e.g, [x, y, z].
// There are N+1 counts: [-inf, x), [x, y), [y, z), [z, +inf]
Buckets struct {
Boundaries []metric.Number
Counts []metric.Number
// Boundaries are floating point numbers, even when
// aggregating integers.
Boundaries []float64
// Counts are floating point numbers to account for
// the possibility of sampling which allows for
// non-integer count values.
Counts []float64
}
// Histogram returns the count of events in pre-determined buckets.

View File

@ -0,0 +1,129 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package histogram_test
import (
"context"
"math/rand"
"testing"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
)
const inputRange = 1e6
func benchmarkHistogramSearchFloat64(b *testing.B, size int) {
boundaries := make([]float64, size)
for i := range boundaries {
boundaries[i] = rand.Float64() * inputRange
}
values := make([]float64, b.N)
for i := range values {
values[i] = rand.Float64() * inputRange
}
desc := test.NewAggregatorTest(metric.ValueRecorderKind, metric.Float64NumberKind)
agg := histogram.New(desc, boundaries)
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = agg.Update(ctx, metric.NewFloat64Number(values[i]), desc)
}
}
func BenchmarkHistogramSearchFloat64_1(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 1)
}
func BenchmarkHistogramSearchFloat64_8(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 8)
}
func BenchmarkHistogramSearchFloat64_16(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 16)
}
func BenchmarkHistogramSearchFloat64_32(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 32)
}
func BenchmarkHistogramSearchFloat64_64(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 64)
}
func BenchmarkHistogramSearchFloat64_128(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 128)
}
func BenchmarkHistogramSearchFloat64_256(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 256)
}
func BenchmarkHistogramSearchFloat64_512(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 512)
}
func BenchmarkHistogramSearchFloat64_1024(b *testing.B) {
benchmarkHistogramSearchFloat64(b, 1024)
}
func benchmarkHistogramSearchInt64(b *testing.B, size int) {
boundaries := make([]float64, size)
for i := range boundaries {
boundaries[i] = rand.Float64() * inputRange
}
values := make([]int64, b.N)
for i := range values {
values[i] = int64(rand.Float64() * inputRange)
}
desc := test.NewAggregatorTest(metric.ValueRecorderKind, metric.Int64NumberKind)
agg := histogram.New(desc, boundaries)
ctx := context.Background()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = agg.Update(ctx, metric.NewInt64Number(values[i]), desc)
}
}
func BenchmarkHistogramSearchInt64_1(b *testing.B) {
benchmarkHistogramSearchInt64(b, 1)
}
func BenchmarkHistogramSearchInt64_8(b *testing.B) {
benchmarkHistogramSearchInt64(b, 8)
}
func BenchmarkHistogramSearchInt64_16(b *testing.B) {
benchmarkHistogramSearchInt64(b, 16)
}
func BenchmarkHistogramSearchInt64_32(b *testing.B) {
benchmarkHistogramSearchInt64(b, 32)
}
func BenchmarkHistogramSearchInt64_64(b *testing.B) {
benchmarkHistogramSearchInt64(b, 64)
}
func BenchmarkHistogramSearchInt64_128(b *testing.B) {
benchmarkHistogramSearchInt64(b, 128)
}
func BenchmarkHistogramSearchInt64_256(b *testing.B) {
benchmarkHistogramSearchInt64(b, 256)
}
func BenchmarkHistogramSearchInt64_512(b *testing.B) {
benchmarkHistogramSearchInt64(b, 512)
}
func BenchmarkHistogramSearchInt64_1024(b *testing.B) {
benchmarkHistogramSearchInt64(b, 1024)
}

View File

@ -36,7 +36,7 @@ type (
lock sync.Mutex
current state
checkpoint state
boundaries []metric.Number
boundaries []float64
kind metric.NumberKind
}
@ -44,7 +44,7 @@ type (
// the sum and counts for all observed values and
// the less than equal bucket count for the pre-determined boundaries.
state struct {
bucketCounts []metric.Number
bucketCounts []float64
count metric.Number
sum metric.Number
}
@ -63,23 +63,19 @@ var _ aggregator.Histogram = &Aggregator{}
// Note that this aggregator maintains each value using independent
// atomic operations, which introduces the possibility that
// checkpoints are inconsistent.
func New(desc *metric.Descriptor, boundaries []metric.Number) *Aggregator {
func New(desc *metric.Descriptor, boundaries []float64) *Aggregator {
// Boundaries MUST be ordered otherwise the histogram could not
// be properly computed.
sortedBoundaries := numbers{
numbers: make([]metric.Number, len(boundaries)),
kind: desc.NumberKind(),
}
sortedBoundaries := make([]float64, len(boundaries))
copy(sortedBoundaries.numbers, boundaries)
sort.Sort(&sortedBoundaries)
boundaries = sortedBoundaries.numbers
copy(sortedBoundaries, boundaries)
sort.Float64s(sortedBoundaries)
return &Aggregator{
kind: desc.NumberKind(),
boundaries: boundaries,
current: emptyState(boundaries),
checkpoint: emptyState(boundaries),
boundaries: sortedBoundaries,
current: emptyState(sortedBoundaries),
checkpoint: emptyState(sortedBoundaries),
}
}
@ -117,30 +113,42 @@ func (c *Aggregator) Checkpoint(ctx context.Context, desc *metric.Descriptor) {
c.lock.Unlock()
}
func emptyState(boundaries []metric.Number) state {
func emptyState(boundaries []float64) state {
return state{
bucketCounts: make([]metric.Number, len(boundaries)+1),
bucketCounts: make([]float64, len(boundaries)+1),
}
}
// Update adds the recorded measurement to the current data set.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
kind := desc.NumberKind()
asFloat := number.CoerceToFloat64(kind)
bucketID := len(c.boundaries)
for i, boundary := range c.boundaries {
if number.CompareNumber(kind, boundary) < 0 {
if asFloat < boundary {
bucketID = i
break
}
}
// Note: Binary-search was compared using the benchmarks. The following
// code is equivalent to the linear search above:
//
// bucketID := sort.Search(len(c.boundaries), func(i int) bool {
// return asFloat < c.boundaries[i]
// })
//
// The binary search wins for very large boundary sets, but
// the linear search performs better up through arrays between
// 256 and 512 elements, which is a relatively large histogram, so we
// continue to prefer linear search.
c.lock.Lock()
defer c.lock.Unlock()
c.current.count.AddInt64(1)
c.current.sum.AddNumber(kind, number)
c.current.bucketCounts[bucketID].AddUint64(1)
c.current.bucketCounts[bucketID]++
return nil
}
@ -156,27 +164,7 @@ func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error
c.checkpoint.count.AddNumber(metric.Uint64NumberKind, o.checkpoint.count)
for i := 0; i < len(c.checkpoint.bucketCounts); i++ {
c.checkpoint.bucketCounts[i].AddNumber(metric.Uint64NumberKind, o.checkpoint.bucketCounts[i])
c.checkpoint.bucketCounts[i] += o.checkpoint.bucketCounts[i]
}
return nil
}
// numbers is an auxiliary struct to order histogram bucket boundaries (slice of kv.Number)
type numbers struct {
numbers []metric.Number
kind metric.NumberKind
}
var _ sort.Interface = (*numbers)(nil)
func (n *numbers) Len() int {
return len(n.numbers)
}
func (n *numbers) Less(i, j int) bool {
return -1 == n.numbers[i].CompareNumber(n.kind, n.numbers[j])
}
func (n *numbers) Swap(i, j int) {
n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i]
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package histogram
package histogram_test
import (
"context"
@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
)
@ -57,36 +58,33 @@ var (
},
}
boundaries = map[metric.NumberKind][]metric.Number{
metric.Float64NumberKind: {metric.NewFloat64Number(500), metric.NewFloat64Number(250), metric.NewFloat64Number(750)},
metric.Int64NumberKind: {metric.NewInt64Number(500), metric.NewInt64Number(250), metric.NewInt64Number(750)},
}
boundaries = []float64{500, 250, 750}
)
func TestHistogramAbsolute(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
histogram(t, profile, positiveOnly)
testHistogram(t, profile, positiveOnly)
})
}
func TestHistogramNegativeOnly(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
histogram(t, profile, negativeOnly)
testHistogram(t, profile, negativeOnly)
})
}
func TestHistogramPositiveAndNegative(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
histogram(t, profile, positiveAndNegative)
testHistogram(t, profile, positiveAndNegative)
})
}
// Validates count, sum and buckets for a given profile and policy
func histogram(t *testing.T, profile test.Profile, policy policy) {
func testHistogram(t *testing.T, profile test.Profile, policy policy) {
ctx := context.Background()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor, boundaries[profile.NumberKind])
agg := histogram.New(descriptor, boundaries)
all := test.NewNumbers(profile.NumberKind)
@ -107,18 +105,21 @@ func histogram(t *testing.T, profile test.Profile, policy policy) {
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - "+policy.name)
require.Nil(t, err)
require.NoError(t, err)
count, err := agg.Count()
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
buckets, err := agg.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg.checkpoint.bucketCounts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg.checkpoint.bucketCounts)
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
}
}
@ -126,12 +127,12 @@ func TestHistogramInitial(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor, boundaries[profile.NumberKind])
agg := histogram.New(descriptor, boundaries)
buckets, err := agg.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries[profile.NumberKind])+1)
require.Equal(t, len(buckets.Boundaries), len(boundaries[profile.NumberKind]))
require.Equal(t, len(buckets.Counts), len(boundaries)+1)
require.Equal(t, len(buckets.Boundaries), len(boundaries))
})
}
@ -141,8 +142,8 @@ func TestHistogramMerge(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg1 := New(descriptor, boundaries[profile.NumberKind])
agg2 := New(descriptor, boundaries[profile.NumberKind])
agg1 := histogram.New(descriptor, boundaries)
agg2 := histogram.New(descriptor, boundaries)
all := test.NewNumbers(profile.NumberKind)
@ -171,18 +172,21 @@ func TestHistogramMerge(t *testing.T) {
asum.CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - absolute")
require.Nil(t, err)
require.NoError(t, err)
count, err := agg1.Count()
require.Equal(t, all.Count(), count, "Same count - absolute")
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, len(agg1.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
buckets, err := agg1.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
counts := calcBuckets(all.Points(), profile)
for i, v := range counts {
bCount := agg1.checkpoint.bucketCounts[i].AsUint64()
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, agg1.checkpoint.bucketCounts)
bCount := uint64(buckets.Counts[i])
require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts)
}
})
}
@ -193,38 +197,37 @@ func TestHistogramNotSet(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor, boundaries[profile.NumberKind])
agg := histogram.New(descriptor, boundaries)
agg.Checkpoint(ctx, descriptor)
asum, err := agg.Sum()
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
require.Nil(t, err)
require.NoError(t, err)
count, err := agg.Count()
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.Nil(t, err)
require.NoError(t, err)
require.Equal(t, len(agg.checkpoint.bucketCounts), len(boundaries[profile.NumberKind])+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range agg.checkpoint.bucketCounts {
require.Equal(t, uint64(0), bCount.AsUint64(), "Bucket #%d must have 0 observed values", i)
buckets, err := agg.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range buckets.Counts {
require.Equal(t, uint64(0), uint64(bCount), "Bucket #%d must have 0 observed values", i)
}
})
}
func calcBuckets(points []metric.Number, profile test.Profile) []uint64 {
sortedBoundaries := numbers{
numbers: make([]metric.Number, len(boundaries[profile.NumberKind])),
kind: profile.NumberKind,
}
sortedBoundaries := make([]float64, len(boundaries))
copy(sortedBoundaries.numbers, boundaries[profile.NumberKind])
sort.Sort(&sortedBoundaries)
boundaries := sortedBoundaries.numbers
copy(sortedBoundaries, boundaries)
sort.Float64s(sortedBoundaries)
counts := make([]uint64, len(boundaries)+1)
counts := make([]uint64, len(sortedBoundaries)+1)
idx := 0
for _, p := range points {
for idx < len(boundaries) && p.CompareNumber(profile.NumberKind, boundaries[idx]) != -1 {
for idx < len(sortedBoundaries) && p.CoerceToFloat64(profile.NumberKind) >= sortedBoundaries[idx] {
idx++
}
counts[idx]++

View File

@ -81,6 +81,8 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
// TODO: Expose Numbers in api/metric for sorting support
type Numbers struct {
// numbers has to be aligned for 64-bit atomic operations.
numbers []metric.Number

View File

@ -26,7 +26,7 @@ import (
func TestStressInt64Histogram(t *testing.T) {
desc := metric.NewDescriptor("some_metric", metric.ValueRecorderKind, metric.Int64NumberKind)
h := histogram.New(&desc, []metric.Number{metric.NewInt64Number(25), metric.NewInt64Number(50), metric.NewInt64Number(75)})
h := histogram.New(&desc, []float64{25, 50, 75})
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
@ -51,7 +51,7 @@ func TestStressInt64Histogram(t *testing.T) {
var realCount int64
for _, c := range b.Counts {
v := c.AsInt64()
v := int64(c)
realCount += v
}

View File

@ -31,7 +31,7 @@ type (
config *ddsketch.Config
}
selectorHistogram struct {
boundaries []metric.Number
boundaries []float64
}
)
@ -75,7 +75,7 @@ func NewWithExactDistribution() export.AggregationSelector {
// histogram, and histogram aggregators for the three kinds of metric. This
// selector uses more memory than the NewWithInexpensiveDistribution because it
// uses a counter per bucket.
func NewWithHistogramDistribution(boundaries []metric.Number) export.AggregationSelector {
func NewWithHistogramDistribution(boundaries []float64) export.AggregationSelector {
return selectorHistogram{boundaries: boundaries}
}

View File

@ -56,7 +56,7 @@ func TestExactDistribution(t *testing.T) {
}
func TestHistogramDistribution(t *testing.T) {
ex := simple.NewWithHistogramDistribution([]metric.Number{})
ex := simple.NewWithHistogramDistribution(nil)
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueRecorderDesc).(*histogram.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueObserverDesc).(*histogram.Aggregator) })