1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Simplify the last-value aggregate (#4343)

Instead of treating the returned *lastValue as an aggregator from
newLastValue, just use the type directly to construct the Measure and
ComputeAggregation functions returned from the Builder.

Accept a destination type for the underlying computeAggregation. This
allows memory reuse for collections which adds a considerable
optimization.

Simplify the integration testing of the last-value aggregate.

Update benchmarking.
This commit is contained in:
Tyler Yahn
2023-07-20 23:30:11 -07:00
committed by GitHub
parent a37cb0504a
commit cbc5890d9c
5 changed files with 216 additions and 106 deletions

View File

@@ -42,6 +42,17 @@ type Builder[N int64 | float64] struct {
Filter attribute.Filter
}
func (b Builder[N]) filter(f Measure[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
return func(ctx context.Context, n N, a attribute.Set) {
fAttr, _ := a.Filter(fltr)
f(ctx, n, fAttr)
}
}
return f
}
func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
if b.Filter != nil {
fltr := b.Filter // Copy to make it immutable after assignment.
@@ -63,11 +74,13 @@ func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// a last-value aggregate.
lv := newLastValue[N]()
return b.input(lv), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = lv.Aggregation()
return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
// Ignore if dest is not a metricdata.Gauge. The chance for memory
// reuse of the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
lv.computeAggregation(&gData.DataPoints)
*dest = gData
return len(gData.DataPoints)
}
}
@@ -129,3 +142,12 @@ func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistog
return len(hData.DataPoints)
}
}
// reset ensures s has capacity and sets it length. If the capacity of s too
// small, a new slice is returned with the specified capacity and length.
func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}

View File

@@ -16,25 +16,43 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
import (
"context"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
var (
keyUser = "user"
userAlice = attribute.String(keyUser, "Alice")
adminTrue = attribute.Bool("admin", true)
keyUser = "user"
userAlice = attribute.String(keyUser, "Alice")
userBob = attribute.String(keyUser, "Bob")
adminTrue = attribute.Bool("admin", true)
adminFalse = attribute.Bool("admin", false)
alice = attribute.NewSet(userAlice, adminTrue)
bob = attribute.NewSet(userBob, adminFalse)
// Filtered.
attrFltr = func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key(keyUser)
}
fltrAlice = attribute.NewSet(userAlice)
fltrBob = attribute.NewSet(userBob)
// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
// Pass to t.Cleanup to override the now function with staticNowFunc and
// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
mockTime = func(orig func() time.Time) (cleanup func()) {
now = staticNowFunc
return func() { now = orig }
}
)
type inputTester[N int64 | float64] struct {
@@ -73,3 +91,92 @@ func testBuilderInput[N int64 | float64]() func(t *testing.T) {
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
}
}
type arg[N int64 | float64] struct {
ctx context.Context
value N
attr attribute.Set
}
type output struct {
n int
agg metricdata.Aggregation
}
type teststep[N int64 | float64] struct {
input []arg[N]
expect output
}
func test[N int64 | float64](meas Measure[N], comp ComputeAggregation, steps []teststep[N]) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
got := new(metricdata.Aggregation)
for i, step := range steps {
for _, args := range step.input {
meas(args.ctx, args.value, args.attr)
}
t.Logf("step: %d", i)
assert.Equal(t, step.expect.n, comp(got), "incorrect data size")
metricdatatest.AssertAggregationsEqual(t, step.expect.agg, *got)
}
}
}
func benchmarkAggregate[N int64 | float64](factory func() (Measure[N], ComputeAggregation)) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
b.Run(strconv.Itoa(n), func(b *testing.B) {
benchmarkAggregateN(b, factory, n)
})
}
}
}
var bmarkRes metricdata.Aggregation
func benchmarkAggregateN[N int64 | float64](b *testing.B, factory func() (Measure[N], ComputeAggregation), count int) {
ctx := context.Background()
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
}
b.Run("Measure", func(b *testing.B) {
got := &bmarkRes
meas, comp := factory()
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, attr := range attrs {
meas(ctx, 1, attr)
}
}
comp(got)
})
b.Run("ComputeAggregation", func(b *testing.B) {
comps := make([]ComputeAggregation, b.N)
for n := range comps {
meas, comp := factory()
for _, attr := range attrs {
meas(ctx, 1, attr)
}
comps[n] = comp
}
got := &bmarkRes
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
comps[n](got)
}
})
}

View File

@@ -18,7 +18,6 @@ import (
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
@@ -33,20 +32,7 @@ const (
defaultCycles = 3
)
var (
bob = attribute.NewSet(attribute.String(keyUser, "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String(keyUser, "carol"), attribute.Bool("admin", false))
// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
// Pass to t.Cleanup to override the now function with staticNowFunc and
// revert once the test completes. E.g. t.Cleanup(mockTime(now)).
mockTime = func(orig func() time.Time) (cleanup func()) {
now = staticNowFunc
return func() { now = orig }
}
)
var carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))
func monoIncr[N int64 | float64]() setMap[N] {
return setMap[N]{alice: 1, bob: 10, carol: 2}

View File

@@ -15,6 +15,7 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"sync"
"time"
@@ -28,6 +29,10 @@ type datapoint[N int64 | float64] struct {
value N
}
func newLastValue[N int64 | float64]() *lastValue[N] {
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
}
// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
sync.Mutex
@@ -35,40 +40,29 @@ type lastValue[N int64 | float64] struct {
values map[attribute.Set]datapoint[N]
}
// newLastValue returns an Aggregator that summarizes a set of measurements as
// the last one made.
func newLastValue[N int64 | float64]() aggregator[N] {
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
}
func (s *lastValue[N]) Aggregate(value N, attr attribute.Set) {
func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
d := datapoint[N]{timestamp: now(), value: value}
s.Lock()
s.values[attr] = d
s.Unlock()
}
func (s *lastValue[N]) Aggregation() metricdata.Aggregation {
func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
s.Lock()
defer s.Unlock()
if len(s.values) == 0 {
return nil
}
n := len(s.values)
*dest = reset(*dest, n, n)
gauge := metricdata.Gauge[N]{
DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)),
}
var i int
for a, v := range s.values {
gauge.DataPoints = append(gauge.DataPoints, metricdata.DataPoint[N]{
Attributes: a,
// The event time is the only meaningful timestamp, StartTime is
// ignored.
Time: v.timestamp,
Value: v.value,
})
(*dest)[i].Attributes = a
// The event time is the only meaningful timestamp, StartTime is
// ignored.
(*dest)[i].Time = v.timestamp
(*dest)[i].Value = v.value
// Do not report stale values.
delete(s.values, a)
i++
}
return gauge
}

View File

@@ -15,12 +15,10 @@
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
func TestLastValue(t *testing.T) {
@@ -31,66 +29,69 @@ func TestLastValue(t *testing.T) {
}
func testLastValue[N int64 | float64]() func(*testing.T) {
tester := &aggregatorTester[N]{
GoroutineN: defaultGoroutines,
MeasurementN: defaultMeasurements,
CycleN: defaultCycles,
}
eFunc := func(increments setMap[N]) expectFunc {
data := make([]metricdata.DataPoint[N], 0, len(increments))
for a, v := range increments {
point := metricdata.DataPoint[N]{Attributes: a, Time: now(), Value: N(v)}
data = append(data, point)
}
gauge := metricdata.Gauge[N]{DataPoints: data}
return func(int) metricdata.Aggregation { return gauge }
}
incr := monoIncr[N]()
return tester.Run(newLastValue[N](), incr, eFunc(incr))
}
func testLastValueReset[N int64 | float64](t *testing.T) {
t.Cleanup(mockTime(now))
a := newLastValue[N]()
assert.Nil(t, a.Aggregation())
a.Aggregate(1, alice)
expect := metricdata.Gauge[N]{
DataPoints: []metricdata.DataPoint[N]{{
Attributes: alice,
Time: now(),
Value: 1,
}},
}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
// The attr set should be forgotten once Aggregations is called.
expect.DataPoints = nil
assert.Nil(t, a.Aggregation())
// Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob)
expect.DataPoints = []metricdata.DataPoint[N]{{
Attributes: bob,
Time: now(),
Value: 1,
}}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
}
func TestLastValueReset(t *testing.T) {
t.Run("Int64", testLastValueReset[int64])
t.Run("Float64", testLastValueReset[float64])
}
func TestEmptyLastValueNilAggregation(t *testing.T) {
assert.Nil(t, newLastValue[int64]().Aggregation())
assert.Nil(t, newLastValue[float64]().Aggregation())
in, out := Builder[N]{Filter: attrFltr}.LastValue()
ctx := context.Background()
return test[N](in, out, []teststep[N]{
{
// Empty output if nothing is measured.
input: []arg[N]{},
expect: output{n: 0, agg: metricdata.Gauge[N]{}},
}, {
input: []arg[N]{
{ctx, 1, alice},
{ctx, -1, bob},
{ctx, 1, fltrAlice},
{ctx, 2, alice},
{ctx, -10, bob},
},
expect: output{
n: 2,
agg: metricdata.Gauge[N]{
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
Time: staticTime,
Value: 2,
},
{
Attributes: fltrBob,
Time: staticTime,
Value: -10,
},
},
},
},
}, {
// Everything resets, do not report old measurements.
input: []arg[N]{},
expect: output{n: 0, agg: metricdata.Gauge[N]{}},
}, {
input: []arg[N]{
{ctx, 10, alice},
{ctx, 3, bob},
},
expect: output{
n: 2,
agg: metricdata.Gauge[N]{
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
Time: staticTime,
Value: 10,
},
{
Attributes: fltrBob,
Time: staticTime,
Value: 3,
},
},
},
},
},
})
}
func BenchmarkLastValue(b *testing.B) {
b.Run("Int64", benchmarkAggregator(newLastValue[int64]))
b.Run("Float64", benchmarkAggregator(newLastValue[float64]))
b.Run("Int64", benchmarkAggregate(Builder[int64]{}.LastValue))
b.Run("Float64", benchmarkAggregate(Builder[float64]{}.LastValue))
}