1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-04-15 11:36:44 +02:00

Simplify the histogram implementation (#4370)

This commit is contained in:
Tyler Yahn 2023-07-27 14:06:22 -07:00 committed by GitHub
parent 4f0d73cbc2
commit 859a87098c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 280 additions and 428 deletions

View File

@ -16,12 +16,17 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
import ( import (
"context" "context"
"time"
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata"
) )
// now is used to return the current local time while allowing tests to
// override the default time.Now function.
var now = time.Now
// Measure receives measurements to be aggregated. // Measure receives measurements to be aggregated.
type Measure[N int64 | float64] func(context.Context, N, attribute.Set) type Measure[N int64 | float64] func(context.Context, N, attribute.Set)
@ -53,19 +58,6 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
return f return f
} }
func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(_ context.Context, n N, a attribute.Set) {
fAttr, _ := a.Filter(fltr)
agg.Aggregate(n, fAttr)
}
}
return func(_ context.Context, n N, a attribute.Set) {
agg.Aggregate(n, a)
}
}
// LastValue returns a last-value aggregate function input and output. // LastValue returns a last-value aggregate function input and output.
// //
// The Builder.Temporality is ignored and delta is use always. // The Builder.Temporality is ignored and delta is use always.
@ -111,19 +103,12 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
// ExplicitBucketHistogram returns a histogram aggregate function input and // ExplicitBucketHistogram returns a histogram aggregate function input and
// output. // output.
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) { func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) {
var h aggregator[N] h := newHistogram[N](cfg, noSum)
switch b.Temporality { switch b.Temporality {
case metricdata.DeltaTemporality: case metricdata.DeltaTemporality:
h = newDeltaHistogram[N](cfg, noSum) return b.filter(h.measure), h.delta
default: default:
h = newCumulativeHistogram[N](cfg, noSum) return b.filter(h.measure), h.cumulative
}
return b.input(h), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = h.Aggregation()
hData, _ := (*dest).(metricdata.Histogram[N])
return len(hData.DataPoints)
} }
} }

View File

@ -55,21 +55,12 @@ var (
} }
) )
type inputTester[N int64 | float64] struct { func TestBuilderFilter(t *testing.T) {
aggregator[N] t.Run("Int64", testBuilderFilter[int64]())
t.Run("Float64", testBuilderFilter[float64]())
value N
attr attribute.Set
} }
func (it *inputTester[N]) Aggregate(v N, a attribute.Set) { it.value, it.attr = v, a } func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
func TestBuilderInput(t *testing.T) {
t.Run("Int64", testBuilderInput[int64]())
t.Run("Float64", testBuilderInput[float64]())
}
func testBuilderInput[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
t.Helper() t.Helper()
@ -78,12 +69,11 @@ func testBuilderInput[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
t.Helper() t.Helper()
it := &inputTester[N]{} meas := b.filter(func(_ context.Context, v N, a attribute.Set) {
meas := b.input(it) assert.Equal(t, value, v, "measured incorrect value")
assert.Equal(t, wantA, a, "measured incorrect attributes")
})
meas(context.Background(), value, attr) meas(context.Background(), value, attr)
assert.Equal(t, value, it.value, "measured incorrect value")
assert.Equal(t, wantA, it.attr, "measured incorrect attributes")
} }
} }

View File

@ -1,40 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// now is used to return the current local time while allowing tests to
// override the default time.Now function.
var now = time.Now
// aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK
// when it creates them for multiple views.
type aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Aggregate(measurement N, attr attribute.Set)
// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
Aggregation() metricdata.Aggregation
}

View File

@ -1,148 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
const (
defaultGoroutines = 5
defaultMeasurements = 30
defaultCycles = 3
)
var carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))
func monoIncr[N int64 | float64]() setMap[N] {
return setMap[N]{alice: 1, bob: 10, carol: 2}
}
// setMap maps attribute sets to a number.
type setMap[N int64 | float64] map[attribute.Set]N
// expectFunc is a function that returns an Aggregation of expected values for
// a cycle that contains m measurements (total across all goroutines). Each
// call advances the cycle.
type expectFunc func(m int) metricdata.Aggregation
// aggregatorTester runs an acceptance test on an Aggregator. It will ask an
// Aggregator to aggregate a set of values as if they were real measurements
// made MeasurementN number of times. This will be done in GoroutineN number
// of different goroutines. After the Aggregator has been asked to aggregate
// all these measurements, it is validated using a passed expecterFunc. This
// set of operation is a single cycle, and the the aggregatorTester will run
// CycleN number of cycles.
type aggregatorTester[N int64 | float64] struct {
// GoroutineN is the number of goroutines aggregatorTester will use to run
// the test with.
GoroutineN int
// MeasurementN is the number of measurements that are made each cycle a
// goroutine runs the test.
MeasurementN int
// CycleN is the number of times a goroutine will make a set of
// measurements.
CycleN int
}
func (at *aggregatorTester[N]) Run(a aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
t.Run("Comparable", func(t *testing.T) {
assert.NotPanics(t, func() {
_ = map[aggregator[N]]struct{}{a: {}}
})
})
t.Run("CorrectnessConcurrentSafe", func(t *testing.T) {
for i := 0; i < at.CycleN; i++ {
var wg sync.WaitGroup
wg.Add(at.GoroutineN)
for j := 0; j < at.GoroutineN; j++ {
go func() {
defer wg.Done()
for k := 0; k < at.MeasurementN; k++ {
for attrs, n := range incr {
a.Aggregate(N(n), attrs)
}
}
}()
}
wg.Wait()
metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
}
})
}
}
var bmarkResults metricdata.Aggregation
func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() aggregator[N], count int) {
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}
b.Run("Aggregate", func(b *testing.B) {
agg := factory()
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, attr := range attrs {
agg.Aggregate(1, attr)
}
}
bmarkResults = agg.Aggregation()
})
b.Run("Aggregations", func(b *testing.B) {
aggs := make([]aggregator[N], b.N)
for n := range aggs {
a := factory()
for _, attr := range attrs {
a.Aggregate(1, attr)
}
aggs[n] = a
}
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
bmarkResults = aggs[n].Aggregation()
}
})
}
func benchmarkAggregator[N int64 | float64](factory func() aggregator[N]) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkAggregatorN(b, factory, n)
})
}
}
}

View File

@ -15,6 +15,7 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import ( import (
"context"
"sort" "sort"
"sync" "sync"
"time" "time"
@ -75,7 +76,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[
// Aggregate records the measurement value, scoped by attr, and aggregates it // Aggregate records the measurement value, scoped by attr, and aggregates it
// into a histogram. // into a histogram.
func (s *histValues[N]) Aggregate(value N, attr attribute.Set) { func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
// This search will return an index in the range [0, len(s.bounds)], where // This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element // it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets // of s.bounds. This aligns with the buckets in that the length of buckets
@ -106,111 +107,93 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
} }
} }
// newDeltaHistogram returns an Aggregator that summarizes a set of // newHistogram returns an Aggregator that summarizes a set of measurements as
// measurements as an histogram. Each histogram is scoped by attributes and // an histogram.
// the aggregation cycle the measurements were made in. func newHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) *histogram[N] {
// return &histogram[N]{
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregations method is called it will reset all histogram
// counts to zero.
func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &deltaHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries, noSum), histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax, noMinMax: cfg.NoMinMax,
start: now(), start: now(),
} }
} }
// deltaHistogram summarizes a set of measurements made in a single // histogram summarizes a set of measurements as an histogram with explicitly
// aggregation cycle as an histogram with explicitly defined buckets. // defined buckets.
type deltaHistogram[N int64 | float64] struct { type histogram[N int64 | float64] struct {
*histValues[N] *histValues[N]
noMinMax bool noMinMax bool
start time.Time start time.Time
} }
func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation { func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// If *dest is not a metricdata.Histogram, memory reuse is missed. In that
// case, use the zero-value h and hope for better alignment next cycle.
h, _ := (*dest).(metricdata.Histogram[N])
h.Temporality = metricdata.DeltaTemporality
s.valuesMu.Lock() s.valuesMu.Lock()
defer s.valuesMu.Unlock() defer s.valuesMu.Unlock()
if len(s.values) == 0 {
return nil
}
t := now()
// Do not allow modification of our copy of bounds. // Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds)) bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds) copy(bounds, s.bounds)
h := metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality, n := len(s.values)
DataPoints: make([]metricdata.HistogramDataPoint[N], 0, len(s.values)), hDPts := reset(h.DataPoints, n, n)
}
var i int
for a, b := range s.values { for a, b := range s.values {
hdp := metricdata.HistogramDataPoint[N]{ hDPts[i].Attributes = a
Attributes: a, hDPts[i].StartTime = s.start
StartTime: s.start, hDPts[i].Time = t
Time: t, hDPts[i].Count = b.count
Count: b.count, hDPts[i].Bounds = bounds
Bounds: bounds, hDPts[i].BucketCounts = b.counts
BucketCounts: b.counts,
}
if !s.noSum { if !s.noSum {
hdp.Sum = b.total hDPts[i].Sum = b.total
} }
if !s.noMinMax { if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min) hDPts[i].Min = metricdata.NewExtrema(b.min)
hdp.Max = metricdata.NewExtrema(b.max) hDPts[i].Max = metricdata.NewExtrema(b.max)
} }
h.DataPoints = append(h.DataPoints, hdp)
// Unused attribute sets do not report. // Unused attribute sets do not report.
delete(s.values, a) delete(s.values, a)
i++
} }
// The delta collection cycle resets. // The delta collection cycle resets.
s.start = t s.start = t
return h
h.DataPoints = hDPts
*dest = h
return n
} }
// newCumulativeHistogram returns an Aggregator that summarizes a set of func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
// measurements as an histogram. Each histogram is scoped by attributes. t := now()
//
// Each aggregation cycle builds from the previous, the histogram counts are
// the bucketed counts of all values aggregated since the returned Aggregator
// was created.
func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &cumulativeHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax,
start: now(),
}
}
// cumulativeHistogram summarizes a set of measurements made over all // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
// aggregation cycles as an histogram with explicitly defined buckets. // case, use the zero-value h and hope for better alignment next cycle.
type cumulativeHistogram[N int64 | float64] struct { h, _ := (*dest).(metricdata.Histogram[N])
*histValues[N] h.Temporality = metricdata.CumulativeTemporality
noMinMax bool
start time.Time
}
func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
s.valuesMu.Lock() s.valuesMu.Lock()
defer s.valuesMu.Unlock() defer s.valuesMu.Unlock()
if len(s.values) == 0 {
return nil
}
t := now()
// Do not allow modification of our copy of bounds. // Do not allow modification of our copy of bounds.
bounds := make([]float64, len(s.bounds)) bounds := make([]float64, len(s.bounds))
copy(bounds, s.bounds) copy(bounds, s.bounds)
h := metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality, n := len(s.values)
DataPoints: make([]metricdata.HistogramDataPoint[N], 0, len(s.values)), hDPts := reset(h.DataPoints, n, n)
}
var i int
for a, b := range s.values { for a, b := range s.values {
// The HistogramDataPoint field values returned need to be copies of // The HistogramDataPoint field values returned need to be copies of
// the buckets value as we will keep updating them. // the buckets value as we will keep updating them.
@ -220,26 +203,30 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
counts := make([]uint64, len(b.counts)) counts := make([]uint64, len(b.counts))
copy(counts, b.counts) copy(counts, b.counts)
hdp := metricdata.HistogramDataPoint[N]{ hDPts[i].Attributes = a
Attributes: a, hDPts[i].StartTime = s.start
StartTime: s.start, hDPts[i].Time = t
Time: t, hDPts[i].Count = b.count
Count: b.count, hDPts[i].Bounds = bounds
Bounds: bounds, hDPts[i].BucketCounts = counts
BucketCounts: counts,
}
if !s.noSum { if !s.noSum {
hdp.Sum = b.total hDPts[i].Sum = b.total
} }
if !s.noMinMax { if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min) hDPts[i].Min = metricdata.NewExtrema(b.min)
hdp.Max = metricdata.NewExtrema(b.max) hDPts[i].Max = metricdata.NewExtrema(b.max)
} }
h.DataPoints = append(h.DataPoints, hdp) i++
// TODO (#3006): This will use an unbounded amount of memory if there // TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute // are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not // sets that become "stale" need to be forgotten so this will not
// overload the system. // overload the system.
} }
return h
h.DataPoints = hDPts
*dest = h
return n
} }

View File

@ -15,6 +15,7 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import ( import (
"context"
"sort" "sort"
"testing" "testing"
@ -37,74 +38,155 @@ var (
func TestHistogram(t *testing.T) { func TestHistogram(t *testing.T) {
t.Cleanup(mockTime(now)) t.Cleanup(mockTime(now))
t.Run("Int64", testHistogram[int64])
t.Run("Float64", testHistogram[float64]) t.Run("Int64/Delta/Sum", testDeltaHist[int64](conf[int64]{hPt: hPointSummed[int64]}))
t.Run("Int64/Delta/NoSum", testDeltaHist[int64](conf[int64]{noSum: true, hPt: hPoint[int64]}))
t.Run("Float64/Delta/Sum", testDeltaHist[float64](conf[float64]{hPt: hPointSummed[float64]}))
t.Run("Float64/Delta/NoSum", testDeltaHist[float64](conf[float64]{noSum: true, hPt: hPoint[float64]}))
t.Run("Int64/Cumulative/Sum", testCumulativeHist[int64](conf[int64]{hPt: hPointSummed[int64]}))
t.Run("Int64/Cumulative/NoSum", testCumulativeHist[int64](conf[int64]{noSum: true, hPt: hPoint[int64]}))
t.Run("Float64/Cumulative/Sum", testCumulativeHist[float64](conf[float64]{hPt: hPointSummed[float64]}))
t.Run("Float64/Cumulative/NoSum", testCumulativeHist[float64](conf[float64]{noSum: true, hPt: hPoint[float64]}))
} }
func testHistogram[N int64 | float64](t *testing.T) { type conf[N int64 | float64] struct {
tester := &aggregatorTester[N]{ noSum bool
GoroutineN: defaultGoroutines, hPt func(attribute.Set, N, uint64) metricdata.HistogramDataPoint[N]
MeasurementN: defaultMeasurements,
CycleN: defaultCycles,
}
incr := monoIncr[N]()
eFunc := deltaHistSummedExpecter[N](incr)
t.Run("Delta/Summed", tester.Run(newDeltaHistogram[N](histConf, false), incr, eFunc))
eFunc = deltaHistExpecter[N](incr)
t.Run("Delta/NoSum", tester.Run(newDeltaHistogram[N](histConf, true), incr, eFunc))
eFunc = cumuHistSummedExpecter[N](incr)
t.Run("Cumulative/Summed", tester.Run(newCumulativeHistogram[N](histConf, false), incr, eFunc))
eFunc = cumuHistExpecter[N](incr)
t.Run("Cumulative/NoSum", tester.Run(newCumulativeHistogram[N](histConf, true), incr, eFunc))
} }
func deltaHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality} in, out := Builder[N]{
return func(m int) metricdata.Aggregation { Temporality: metricdata.DeltaTemporality,
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) Filter: attrFltr,
for a, v := range incr { }.ExplicitBucketHistogram(histConf, c.noSum)
h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(m))) ctx := context.Background()
} return test[N](in, out, []teststep[N]{
return h {
} input: []arg[N]{},
expect: output{
n: 0,
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{},
},
},
},
{
input: []arg[N]{
{ctx, 2, alice},
{ctx, 10, bob},
{ctx, 2, alice},
{ctx, 2, alice},
{ctx, 10, bob},
},
expect: output{
n: 2,
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 3),
c.hPt(fltrBob, 10, 2),
},
},
},
},
{
input: []arg[N]{
{ctx, 10, alice},
{ctx, 3, bob},
},
expect: output{
n: 2,
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 10, 1),
c.hPt(fltrBob, 3, 1),
},
},
},
},
{
input: []arg[N]{},
// Delta histograms are expected to reset.
expect: output{
n: 0,
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{},
},
},
},
})
} }
func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality} in, out := Builder[N]{
return func(m int) metricdata.Aggregation { Temporality: metricdata.CumulativeTemporality,
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) Filter: attrFltr,
for a, v := range incr { }.ExplicitBucketHistogram(histConf, c.noSum)
h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(m))) ctx := context.Background()
} return test[N](in, out, []teststep[N]{
return h {
} input: []arg[N]{},
} expect: output{
n: 0,
func cumuHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { agg: metricdata.Histogram[N]{
var cycle int Temporality: metricdata.CumulativeTemporality,
h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} DataPoints: []metricdata.HistogramDataPoint[N]{},
return func(m int) metricdata.Aggregation { },
cycle++ },
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) },
for a, v := range incr { {
h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(cycle*m))) input: []arg[N]{
} {ctx, 2, alice},
return h {ctx, 10, bob},
} {ctx, 2, alice},
} {ctx, 2, alice},
{ctx, 10, bob},
func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { },
var cycle int expect: output{
h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} n: 2,
return func(m int) metricdata.Aggregation { agg: metricdata.Histogram[N]{
cycle++ Temporality: metricdata.CumulativeTemporality,
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) DataPoints: []metricdata.HistogramDataPoint[N]{
for a, v := range incr { c.hPt(fltrAlice, 2, 3),
h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(cycle*m))) c.hPt(fltrBob, 10, 2),
} },
return h },
} },
},
{
input: []arg[N]{
{ctx, 2, alice},
{ctx, 10, bob},
},
expect: output{
n: 2,
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4),
c.hPt(fltrBob, 10, 3),
},
},
},
},
{
input: []arg[N]{},
expect: output{
n: 2,
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4),
c.hPt(fltrBob, 10, 3),
},
},
},
},
})
} }
// hPointSummed returns an HistogramDataPoint that started and ended now with // hPointSummed returns an HistogramDataPoint that started and ended now with
@ -190,93 +272,89 @@ func testBucketsSum[N int64 | float64]() func(t *testing.T) {
} }
} }
func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram, bool) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) { func TestHistogramImmutableBounds(t *testing.T) {
b := []float64{0, 1, 2} b := []float64{0, 1, 2}
cpB := make([]float64, len(b)) cpB := make([]float64, len(b))
copy(cpB, b) copy(cpB, b)
a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}, false) h := newHistogram[int64](aggregation.ExplicitBucketHistogram{Boundaries: b}, false)
return func(t *testing.T) { require.Equal(t, cpB, h.bounds)
require.Equal(t, cpB, getBounds(a))
b[0] = 10 b[0] = 10
assert.Equal(t, cpB, getBounds(a), "modifying the bounds argument should not change the bounds") assert.Equal(t, cpB, h.bounds, "modifying the bounds argument should not change the bounds")
a.Aggregate(5, alice) h.measure(context.Background(), 5, alice)
hdp := a.Aggregation().(metricdata.Histogram[N]).DataPoints[0]
hdp.Bounds[1] = 10
assert.Equal(t, cpB, getBounds(a), "modifying the Aggregation bounds should not change the bounds")
}
}
func TestHistogramImmutableBounds(t *testing.T) { var data metricdata.Aggregation = metricdata.Histogram[int64]{}
t.Run("Delta", testHistImmutableBounds( h.cumulative(&data)
newDeltaHistogram[int64], hdp := data.(metricdata.Histogram[int64]).DataPoints[0]
func(a aggregator[int64]) []float64 { hdp.Bounds[1] = 10
deltaH := a.(*deltaHistogram[int64]) assert.Equal(t, cpB, h.bounds, "modifying the Aggregation bounds should not change the bounds")
return deltaH.bounds
},
))
t.Run("Cumulative", testHistImmutableBounds(
newCumulativeHistogram[int64],
func(a aggregator[int64]) []float64 {
cumuH := a.(*cumulativeHistogram[int64])
return cumuH.bounds
},
))
} }
func TestCumulativeHistogramImutableCounts(t *testing.T) { func TestCumulativeHistogramImutableCounts(t *testing.T) {
a := newCumulativeHistogram[int64](histConf, false) h := newHistogram[int64](histConf, false)
a.Aggregate(5, alice) h.measure(context.Background(), 5, alice)
hdp := a.Aggregation().(metricdata.Histogram[int64]).DataPoints[0]
cumuH := a.(*cumulativeHistogram[int64]) var data metricdata.Aggregation = metricdata.Histogram[int64]{}
require.Equal(t, hdp.BucketCounts, cumuH.values[alice].counts) h.cumulative(&data)
hdp := data.(metricdata.Histogram[int64]).DataPoints[0]
require.Equal(t, hdp.BucketCounts, h.values[alice].counts)
cpCounts := make([]uint64, len(hdp.BucketCounts)) cpCounts := make([]uint64, len(hdp.BucketCounts))
copy(cpCounts, hdp.BucketCounts) copy(cpCounts, hdp.BucketCounts)
hdp.BucketCounts[0] = 10 hdp.BucketCounts[0] = 10
assert.Equal(t, cpCounts, cumuH.values[alice].counts, "modifying the Aggregator bucket counts should not change the Aggregator") assert.Equal(t, cpCounts, h.values[alice].counts, "modifying the Aggregator bucket counts should not change the Aggregator")
} }
func TestDeltaHistogramReset(t *testing.T) { func TestDeltaHistogramReset(t *testing.T) {
t.Cleanup(mockTime(now)) t.Cleanup(mockTime(now))
a := newDeltaHistogram[int64](histConf, false) h := newHistogram[int64](histConf, false)
assert.Nil(t, a.Aggregation())
var data metricdata.Aggregation = metricdata.Histogram[int64]{}
require.Equal(t, 0, h.delta(&data))
require.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)
h.measure(context.Background(), 1, alice)
a.Aggregate(1, alice)
expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality}
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)} expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) h.delta(&data)
metricdatatest.AssertAggregationsEqual(t, expect, data)
// The attr set should be forgotten once Aggregations is called. // The attr set should be forgotten once Aggregations is called.
expect.DataPoints = nil expect.DataPoints = nil
assert.Nil(t, a.Aggregation()) assert.Equal(t, 0, h.delta(&data))
assert.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)
// Aggregating another set should not affect the original (alice). // Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob) h.measure(context.Background(), 1, bob)
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)} expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) h.delta(&data)
} metricdatatest.AssertAggregationsEqual(t, expect, data)
func TestEmptyHistogramNilAggregation(t *testing.T) {
assert.Nil(t, newCumulativeHistogram[int64](histConf, false).Aggregation())
assert.Nil(t, newCumulativeHistogram[float64](histConf, false).Aggregation())
assert.Nil(t, newDeltaHistogram[int64](histConf, false).Aggregation())
assert.Nil(t, newDeltaHistogram[float64](histConf, false).Aggregation())
} }
func BenchmarkHistogram(b *testing.B) { func BenchmarkHistogram(b *testing.B) {
b.Run("Int64", benchmarkHistogram[int64]) b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
b.Run("Float64", benchmarkHistogram[float64]) return Builder[int64]{
} Temporality: metricdata.CumulativeTemporality,
}.ExplicitBucketHistogram(histConf, false)
func benchmarkHistogram[N int64 | float64](b *testing.B) { }))
factory := func() aggregator[N] { return newDeltaHistogram[N](histConf, false) } b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
b.Run("Delta", benchmarkAggregator(factory)) return Builder[int64]{
factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf, false) } Temporality: metricdata.DeltaTemporality,
b.Run("Cumulative", benchmarkAggregator(factory)) }.ExplicitBucketHistogram(histConf, false)
}))
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.CumulativeTemporality,
}.ExplicitBucketHistogram(histConf, false)
}))
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.DeltaTemporality,
}.ExplicitBucketHistogram(histConf, false)
}))
} }