1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-24 03:47:19 +02:00

Add MetricAggregator.Merge() implementations (#253)

* Add MetricAggregator.Merge() implementations

* Update from feedback

* Type
This commit is contained in:
Joshua MacDonald 2019-10-30 22:15:27 -07:00 committed by GitHub
parent 563985f5d1
commit 915775e348
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 307 additions and 28 deletions

View File

@ -33,6 +33,9 @@ type MetricAggregator interface {
// called in a single-threaded context. Update()
// calls may arrive concurrently.
Collect(context.Context, MetricRecord, MetricBatcher)
// Merge combines state from two aggregators into one.
Merge(MetricAggregator, *Descriptor)
}
// MetricRecord is the unit of export, pairing a metric

View File

@ -61,3 +61,12 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.Me
c.current.AddNumberAtomic(kind, number)
}
func (c *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
o, _ := oa.(*Aggregator)
if o == nil {
// TODO warn
return
}
c.checkpoint.AddNumber(desc.NumberKind(), o.checkpoint)
}

View File

@ -91,3 +91,31 @@ func TestCounterNonMonotonic(t *testing.T) {
require.Equal(t, sum, agg.AsNumber(), "Same sum - monotonic")
})
}
func TestCounterMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg1 := New()
agg2 := New()
batcher, record := test.NewAggregatorTest(export.CounterMetricKind, profile.NumberKind, false)
sum := core.Number(0)
for i := 0; i < count; i++ {
x := profile.Random(+1)
sum.AddNumber(profile.NumberKind, x)
agg1.Update(ctx, x, record)
agg2.Update(ctx, x, record)
}
agg1.Collect(ctx, record, batcher)
agg2.Collect(ctx, record, batcher)
agg1.Merge(agg2, record.Descriptor())
sum.AddNumber(record.Descriptor().NumberKind(), sum)
require.Equal(t, sum, agg1.AsNumber(), "Same sum - monotonic")
})
}

View File

@ -101,3 +101,13 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.Me
defer c.lock.Unlock()
c.current.Add(number.CoerceToFloat64(kind))
}
func (c *Aggregator) Merge(oa export.MetricAggregator, d *export.Descriptor) {
o, _ := oa.(*Aggregator)
if o == nil {
// TODO warn
return
}
c.checkpoint.Merge(o.checkpoint)
}

View File

@ -57,11 +57,57 @@ func TestDDSketchAbsolute(t *testing.T) {
all[len(all)-1].CoerceToFloat64(profile.NumberKind),
agg.Max(),
"Same max - absolute")
// Median
require.InEpsilon(t,
all[len(all)/2].CoerceToFloat64(profile.NumberKind),
all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
agg.Quantile(0.5),
0.1,
"Same median - absolute")
})
}
func TestDDSketchMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false)
agg1 := New(NewDefaultConfig(), record.Descriptor())
agg2 := New(NewDefaultConfig(), record.Descriptor())
var all test.Numbers
for i := 0; i < count; i++ {
x := profile.Random(+1)
all = append(all, x)
agg1.Update(ctx, x, record)
}
for i := 0; i < count; i++ {
x := profile.Random(+1)
all = append(all, x)
agg2.Update(ctx, x, record)
}
agg1.Collect(ctx, record, batcher)
agg2.Collect(ctx, record, batcher)
agg1.Merge(agg2, record.Descriptor())
all.Sort()
require.InEpsilon(t,
all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
agg1.Sum(),
0.0000001,
"Same sum - absolute")
require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
require.Equal(t,
all[len(all)-1].CoerceToFloat64(profile.NumberKind),
agg1.Max(),
"Same max - absolute")
require.InEpsilon(t,
all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
agg1.Quantile(0.5),
0.1,
"Same median - absolute")
})
}

View File

@ -33,12 +33,10 @@ type (
// Aggregator aggregates gauge events.
Aggregator struct {
// data is an atomic pointer to *gaugeData. It is set
// to `nil` if the gauge has not been set since the
// last collection.
// current is an atomic pointer to *gaugeData. It is never nil.
current unsafe.Pointer
// N.B. Export is not called when checkpoint is nil
// checkpoint is a copy of the current value taken in Collect()
checkpoint unsafe.Pointer
}
@ -125,3 +123,34 @@ func (g *Aggregator) updateMonotonic(number core.Number, desc *export.Descriptor
}
}
}
func (g *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
o, _ := oa.(*Aggregator)
if o == nil {
// TODO warn
return
}
ggd := (*gaugeData)(atomic.LoadPointer(&g.checkpoint))
ogd := (*gaugeData)(atomic.LoadPointer(&o.checkpoint))
if desc.Alternate() {
// Monotonic: use the greater value
cmp := ggd.value.CompareNumber(desc.NumberKind(), ogd.value)
if cmp > 0 {
return
}
if cmp < 0 {
g.checkpoint = unsafe.Pointer(ogd)
return
}
}
// Non-monotonic gauge or equal values
if ggd.timestamp.After(ogd.timestamp) {
return
}
g.checkpoint = unsafe.Pointer(ogd)
}

View File

@ -94,3 +94,59 @@ func TestGaugeMonotonicDescending(t *testing.T) {
require.Equal(t, first, agg.AsNumber(), "Same last value - monotonic")
})
}
func TestGaugeNormalMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg1 := New()
agg2 := New()
batcher, record := test.NewAggregatorTest(export.GaugeMetricKind, profile.NumberKind, false)
first1 := profile.Random(+1)
first2 := profile.Random(+1)
first1.AddNumber(profile.NumberKind, first2)
agg1.Update(ctx, first1, record)
agg2.Update(ctx, first2, record)
agg1.Collect(ctx, record, batcher)
agg2.Collect(ctx, record, batcher)
t1 := agg1.Timestamp()
t2 := agg2.Timestamp()
require.True(t, t1.Before(t2))
agg1.Merge(agg2, record.Descriptor())
require.Equal(t, t2, agg1.Timestamp(), "Merged timestamp - non-monotonic")
require.Equal(t, first2, agg1.AsNumber(), "Merged value - non-monotonic")
})
}
func TestGaugeMonotonicMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg1 := New()
agg2 := New()
batcher, record := test.NewAggregatorTest(export.GaugeMetricKind, profile.NumberKind, true)
first1 := profile.Random(+1)
agg1.Update(ctx, first1, record)
first2 := profile.Random(+1)
first2.AddNumber(profile.NumberKind, first1)
agg2.Update(ctx, first2, record)
agg1.Collect(ctx, record, batcher)
agg2.Collect(ctx, record, batcher)
agg1.Merge(agg2, record.Descriptor())
require.Equal(t, first2, agg1.AsNumber(), "Merged value - monotonic")
require.Equal(t, agg2.Timestamp(), agg1.Timestamp(), "Merged timestamp - monotonic")
})
}

View File

@ -25,8 +25,8 @@ type (
// Aggregator aggregates measure events, keeping only the max,
// sum, and count.
Aggregator struct {
live state
save state
current state
checkpoint state
}
state struct {
@ -45,30 +45,34 @@ func New() *Aggregator {
// Sum returns the accumulated sum as a Number.
func (c *Aggregator) Sum() core.Number {
return c.save.sum
return c.checkpoint.sum
}
// Count returns the accumulated count.
func (c *Aggregator) Count() int64 {
return int64(c.save.count.AsUint64())
return int64(c.checkpoint.count.AsUint64())
}
// Max returns the accumulated max as a Number.
func (c *Aggregator) Max() core.Number {
return c.save.max
return c.checkpoint.max
}
// Collect saves the current value (atomically) and exports it.
// Collect checkpoints the current value (atomically) and exports it.
func (c *Aggregator) Collect(ctx context.Context, rec export.MetricRecord, exp export.MetricBatcher) {
// N.B. There is no atomic operation that can update all three
// values at once, so there are races between Update() and
// Collect(). Therefore, atomically swap fields independently,
// knowing that individually the three parts of this aggregation
// could be spread across multiple collections in rare cases.
// values at once without a memory allocation.
//
// This aggregator is intended to trade this correctness for
// speed.
//
// Therefore, atomically swap fields independently, knowing
// that individually the three parts of this aggregation could
// be spread across multiple collections in rare cases.
c.save.count.SetUint64(c.live.count.SwapUint64Atomic(0))
c.save.sum = c.live.sum.SwapNumberAtomic(core.Number(0))
c.save.max = c.live.max.SwapNumberAtomic(core.Number(0))
c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0))
c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0))
c.checkpoint.max = c.current.max.SwapNumberAtomic(core.Number(0))
exp.Export(ctx, rec, c)
}
@ -83,17 +87,32 @@ func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.Me
return
}
c.live.count.AddUint64Atomic(1)
c.live.sum.AddNumberAtomic(kind, number)
c.current.count.AddUint64Atomic(1)
c.current.sum.AddNumberAtomic(kind, number)
for {
current := c.live.max.AsNumberAtomic()
current := c.current.max.AsNumberAtomic()
if number.CompareNumber(kind, current) <= 0 {
break
}
if c.live.max.CompareAndSwapNumber(current, number) {
if c.current.max.CompareAndSwapNumber(current, number) {
break
}
}
}
func (c *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
o, _ := oa.(*Aggregator)
if o == nil {
// TODO warn
return
}
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(core.Uint64NumberKind, o.checkpoint.count)
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
c.checkpoint.max.SetNumber(o.checkpoint.max)
}
}

View File

@ -50,10 +50,52 @@ func TestMaxSumCountAbsolute(t *testing.T) {
agg.Sum().CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - absolute")
require.Equal(t, all.Count(), agg.Count(), "Same sum - absolute")
require.Equal(t, all.Count(), agg.Count(), "Same count - absolute")
require.Equal(t,
all[len(all)-1],
agg.Max(),
"Same sum - absolute")
"Same max - absolute")
})
}
func TestMaxSumCountMerge(t *testing.T) {
ctx := context.Background()
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false)
agg1 := New()
agg2 := New()
var all test.Numbers
for i := 0; i < count; i++ {
x := profile.Random(+1)
all = append(all, x)
agg1.Update(ctx, x, record)
}
for i := 0; i < count; i++ {
x := profile.Random(+1)
all = append(all, x)
agg2.Update(ctx, x, record)
}
agg1.Collect(ctx, record, batcher)
agg2.Collect(ctx, record, batcher)
agg1.Merge(agg2, record.Descriptor())
all.Sort()
require.InEpsilon(t,
all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
agg1.Sum().CoerceToFloat64(profile.NumberKind),
0.000000001,
"Same sum - absolute")
require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
require.Equal(t,
all[len(all)-1],
agg1.Max(),
"Same max - absolute")
})
}

View File

@ -85,9 +85,19 @@ func RunProfiles(t *testing.T, f func(*testing.T, Profile)) {
type Numbers []core.Number
func (n *Numbers) Sort() {
sort.Slice(*n, func(i, j int) bool {
return (*n)[i] < (*n)[j]
})
sort.Sort(n)
}
func (n *Numbers) Less(i, j int) bool {
return (*n)[i] < (*n)[j]
}
func (n *Numbers) Len() int {
return len(*n)
}
func (n *Numbers) Swap(i, j int) {
(*n)[i], (*n)[j] = (*n)[j], (*n)[i]
}
func (n *Numbers) Sum(kind core.NumberKind) core.Number {
@ -101,3 +111,30 @@ func (n *Numbers) Sum(kind core.NumberKind) core.Number {
func (n *Numbers) Count() int64 {
return int64(len(*n))
}
func (n *Numbers) Median(kind core.NumberKind) core.Number {
if !sort.IsSorted(n) {
panic("Sort these numbers before calling Median")
}
l := len(*n)
if l%2 == 1 {
return (*n)[l/2]
}
lower := (*n)[l/2-1]
upper := (*n)[l/2]
sum := lower
sum.AddNumber(kind, upper)
switch kind {
case core.Uint64NumberKind:
return core.NewUint64Number(sum.AsUint64() / 2)
case core.Int64NumberKind:
return core.NewInt64Number(sum.AsInt64() / 2)
case core.Float64NumberKind:
return core.NewFloat64Number(sum.AsFloat64() / 2)
}
panic("unknown number kind")
}