From 9a2484c373d7e3c25383545bc4514eb83cabf41a Mon Sep 17 00:00:00 2001 From: ET Date: Mon, 25 Nov 2019 09:51:49 -0800 Subject: [PATCH] Implement support for NonAbsolute Measurement MaxSumCount (#335) * Add tests for nonabsolute and varying sign values * Implement support for NonAbsolute Measurement MaxSumCount Previously, the MaxSumCount aggregator failed to work correctly with negative numbers (e.g. MeasureKind Alternate()==true). * Pass NumberKind to MaxSumCount New() function Allows it to set the initial state (current.max) to the correct value based on the NumberKind. * Revert extraneous local change * Pass full descriptor to msc New() This is analagous to the DDSketch New() constructor * Remember to run make precommit first * Add tests for empty checkpoint of MaxSumCount aggregator An empty checkpoint should have Sum() == 0, Count() == 0 and Max() still equal to the numberKind.Minimum() * Return ErrEmptyDataSet if no value set by the aggregator Remove TODO from stdout exporter to ensure that if a maxsumcount or ddsketch aggregator returns ErrEmptyDataSet from Max(), then the entire record will be skipped by the exporter. Added tests to ensure the exporter doesn't send any updates for EmptyDataSet checkpoints - for both ddsketch and maxsumcount. * Relayout Aggreggator struct to ensure int64s are 8-byte aligned On 32-bit architectures, Go only guarantees that primitive values are aligned to a 4 byte boundary. Atomic operations on 32-bit machines require 8-byte alignment. See https://github.com/golang/go/issues/599 * Addressing PR comments The use of Minimum() for the default uninitialized Maximum value means that in the unlikely condition that every recorded value for a measure is equal to the same NumberKind.Minimum(), then the aggregator's Max() will return ErrEmptyDataSet * Fix PR merge issue --- api/core/number.go | 15 ++ exporter/metric/stdout/stdout.go | 12 +- exporter/metric/stdout/stdout_test.go | 35 +++-- sdk/metric/aggregator/ddsketch/ddsketch.go | 2 +- sdk/metric/aggregator/maxsumcount/msc.go | 25 ++- sdk/metric/aggregator/maxsumcount/msc_test.go | 148 +++++++++++++----- sdk/metric/benchmark_test.go | 2 +- sdk/metric/selector/simple/simple.go | 2 +- 8 files changed, 176 insertions(+), 65 deletions(-) diff --git a/api/core/number.go b/api/core/number.go index 9881fb6f0..ce5d18069 100644 --- a/api/core/number.go +++ b/api/core/number.go @@ -34,6 +34,21 @@ const ( Uint64NumberKind ) +// Minimum returns the minimum representable value +// for a given NumberKind +func (k NumberKind) Minimum() Number { + switch k { + case Int64NumberKind: + return NewInt64Number(math.MinInt64) + case Float64NumberKind: + return NewFloat64Number(-1. * math.MaxFloat64) + case Uint64NumberKind: + return NewUint64Number(0) + default: + return Number(0) + } +} + // Number represents either an integral or a floating point value. It // needs to be accompanied with a source of NumberKind that describes // the actual type of the value stored within Number. diff --git a/exporter/metric/stdout/stdout.go b/exporter/metric/stdout/stdout.go index 7a91959e2..622b3cd41 100644 --- a/exporter/metric/stdout/stdout.go +++ b/exporter/metric/stdout/stdout.go @@ -130,13 +130,13 @@ func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) expose.Count = count } - // TODO: Should tolerate ErrEmptyDataSet here, - // just like ErrNoLastValue below, since - // there's a race condition between creating - // the Aggregator and updating the first - // value. - if max, err := msc.Max(); err != nil { + if err == aggregator.ErrEmptyDataSet { + // This is a special case, indicates an aggregator that + // was checkpointed before its first value was set. + return + } + aggError = err expose.Max = "NaN" } else { diff --git a/exporter/metric/stdout/stdout_test.go b/exporter/metric/stdout/stdout_test.go index 3865bbaef..803091ba9 100644 --- a/exporter/metric/stdout/stdout_test.go +++ b/exporter/metric/stdout/stdout_test.go @@ -162,7 +162,7 @@ func TestStdoutMaxSumCount(t *testing.T) { checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false) - magg := maxsumcount.New() + magg := maxsumcount.New(desc) aggtest.CheckedUpdate(fix.t, magg, core.NewFloat64Number(123.456), desc) aggtest.CheckedUpdate(fix.t, magg, core.NewFloat64Number(876.543), desc) magg.Checkpoint(fix.ctx, desc) @@ -220,23 +220,30 @@ func TestStdoutMeasureFormat(t *testing.T) { }`, fix.Output()) } -func TestStdoutAggError(t *testing.T) { - fix := newFixture(t, stdout.Options{}) - - checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) - +func TestStdoutEmptyDataSet(t *testing.T) { desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false) - magg := ddsketch.New(ddsketch.NewDefaultConfig(), desc) - magg.Checkpoint(fix.ctx, desc) + for name, tc := range map[string]export.Aggregator{ + "ddsketch": ddsketch.New(ddsketch.NewDefaultConfig(), desc), + "maxsumcount": maxsumcount.New(desc), + } { + tc := tc + t.Run(name, func(t *testing.T) { + t.Parallel() - checkpointSet.Add(desc, magg) + fix := newFixture(t, stdout.Options{}) - err := fix.exporter.Export(fix.ctx, checkpointSet) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) - // An error is returned and NaN values are printed. - require.Error(t, err) - require.Equal(t, aggregator.ErrEmptyDataSet, err) - require.Equal(t, `{"updates":[{"name":"test.name","max":"NaN","sum":0,"count":0,"quantiles":[{"q":0.5,"v":"NaN"},{"q":0.9,"v":"NaN"},{"q":0.99,"v":"NaN"}]}]}`, fix.Output()) + magg := tc + magg.Checkpoint(fix.ctx, desc) + + checkpointSet.Add(desc, magg) + + fix.Export(checkpointSet) + + require.Equal(t, `{"updates":null}`, fix.Output()) + }) + } } func TestStdoutGaugeNotSet(t *testing.T) { diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index b07c0fd13..910c9f34d 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -75,7 +75,7 @@ func (c *Aggregator) Max() (core.Number, error) { return c.Quantile(1) } -// Min returns the mininum value in the checkpoint. +// Min returns the minimum value in the checkpoint. func (c *Aggregator) Min() (core.Number, error) { return c.Quantile(0) } diff --git a/sdk/metric/aggregator/maxsumcount/msc.go b/sdk/metric/aggregator/maxsumcount/msc.go index 464feb0fb..ce32e9a01 100644 --- a/sdk/metric/aggregator/maxsumcount/msc.go +++ b/sdk/metric/aggregator/maxsumcount/msc.go @@ -28,6 +28,7 @@ type ( Aggregator struct { current state checkpoint state + kind core.NumberKind } state struct { @@ -50,8 +51,15 @@ var _ aggregator.MaxSumCount = &Aggregator{} // atomic operations, which introduces the possibility that // checkpoints are inconsistent. For greater consistency and lower // performance, consider using Array or DDSketch aggregators. -func New() *Aggregator { - return &Aggregator{} +func New(desc *export.Descriptor) *Aggregator { + return &Aggregator{ + kind: desc.NumberKind(), + current: unsetMaxSumCount(desc.NumberKind()), + } +} + +func unsetMaxSumCount(kind core.NumberKind) state { + return state{max: kind.Minimum()} } // Sum returns the sum of values in the checkpoint. @@ -65,7 +73,16 @@ func (c *Aggregator) Count() (int64, error) { } // Max returns the maximum value in the checkpoint. +// The error value aggregator.ErrEmptyDataSet will be returned if +// (due to a race condition) the checkpoint was set prior to the +// current.max being computed in Update(). +// +// Note: If a measure's recorded values for a given checkpoint are +// all equal to NumberKind.Minimum(), Max() will return ErrEmptyDataSet func (c *Aggregator) Max() (core.Number, error) { + if c.checkpoint.max == c.kind.Minimum() { + return core.Number(0), aggregator.ErrEmptyDataSet + } return c.checkpoint.max, nil } @@ -73,7 +90,7 @@ func (c *Aggregator) Max() (core.Number, error) { // the empty set. Since no locks are taken, there is a chance that // the independent Max, Sum, and Count are not consistent with each // other. -func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) { +func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { // N.B. There is no atomic operation that can update all three // values at once without a memory allocation. // @@ -86,7 +103,7 @@ func (c *Aggregator) Checkpoint(ctx context.Context, _ *export.Descriptor) { c.checkpoint.count.SetUint64(c.current.count.SwapUint64Atomic(0)) c.checkpoint.sum = c.current.sum.SwapNumberAtomic(core.Number(0)) - c.checkpoint.max = c.current.max.SwapNumberAtomic(core.Number(0)) + c.checkpoint.max = c.current.max.SwapNumberAtomic(c.kind.Minimum()) } // Update adds the recorded measurement to the current data set. diff --git a/sdk/metric/aggregator/maxsumcount/msc_test.go b/sdk/metric/aggregator/maxsumcount/msc_test.go index 461b7f783..a9d43a826 100644 --- a/sdk/metric/aggregator/maxsumcount/msc_test.go +++ b/sdk/metric/aggregator/maxsumcount/msc_test.go @@ -16,65 +16,114 @@ package maxsumcount import ( "context" + "math" + "math/rand" "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/api/core" export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" "go.opentelemetry.io/otel/sdk/metric/aggregator/test" ) const count = 100 +type policy struct { + name string + absolute bool + sign func() int +} + +var ( + positiveOnly = policy{ + name: "absolute", + absolute: true, + sign: func() int { return +1 }, + } + negativeOnly = policy{ + name: "negative", + absolute: false, + sign: func() int { return -1 }, + } + positiveAndNegative = policy{ + name: "positiveAndNegative", + absolute: false, + sign: func() int { + if rand.Uint32() > math.MaxUint32/2 { + return -1 + } + return 1 + }, + } +) + func TestMaxSumCountAbsolute(t *testing.T) { - ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { - record := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) - - agg := New() - - all := test.NewNumbers(profile.NumberKind) - - for i := 0; i < count; i++ { - x := profile.Random(+1) - all.Append(x) - test.CheckedUpdate(t, agg, x, record) - } - - agg.Checkpoint(ctx, record) - - all.Sort() - - asum, err := agg.Sum() - require.InEpsilon(t, - all.Sum().CoerceToFloat64(profile.NumberKind), - asum.CoerceToFloat64(profile.NumberKind), - 0.000000001, - "Same sum - absolute") - require.Nil(t, err) - - count, err := agg.Count() - require.Equal(t, all.Count(), count, "Same count - absolute") - require.Nil(t, err) - - max, err := agg.Max() - require.Nil(t, err) - require.Equal(t, - all.Max(), - max, - "Same max - absolute") + maxSumCount(t, profile, positiveOnly) }) } +func TestMaxSumCountNegativeOnly(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + maxSumCount(t, profile, negativeOnly) + }) +} + +func TestMaxSumCountPositiveAndNegative(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + maxSumCount(t, profile, positiveAndNegative) + }) +} + +// Validates max, sum and count for a given profile and policy +func maxSumCount(t *testing.T, profile test.Profile, policy policy) { + ctx := context.Background() + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, !policy.absolute) + + agg := New(descriptor) + + all := test.NewNumbers(profile.NumberKind) + + for i := 0; i < count; i++ { + x := profile.Random(policy.sign()) + all.Append(x) + test.CheckedUpdate(t, agg, x, descriptor) + } + + agg.Checkpoint(ctx, descriptor) + + all.Sort() + + asum, err := agg.Sum() + require.InEpsilon(t, + all.Sum().CoerceToFloat64(profile.NumberKind), + asum.CoerceToFloat64(profile.NumberKind), + 0.000000001, + "Same sum - "+policy.name) + require.Nil(t, err) + + count, err := agg.Count() + require.Equal(t, all.Count(), count, "Same count -"+policy.name) + require.Nil(t, err) + + max, err := agg.Max() + require.Nil(t, err) + require.Equal(t, + all.Max(), + max, + "Same max -"+policy.name) +} + func TestMaxSumCountMerge(t *testing.T) { ctx := context.Background() test.RunProfiles(t, func(t *testing.T, profile test.Profile) { descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) - agg1 := New() - agg2 := New() + agg1 := New(descriptor) + agg2 := New(descriptor) all := test.NewNumbers(profile.NumberKind) @@ -116,3 +165,26 @@ func TestMaxSumCountMerge(t *testing.T) { "Same max - absolute") }) } + +func TestMaxSumCountNotSet(t *testing.T) { + ctx := context.Background() + + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + descriptor := test.NewAggregatorTest(export.MeasureKind, profile.NumberKind, false) + + agg := New(descriptor) + agg.Checkpoint(ctx, descriptor) + + asum, err := agg.Sum() + require.Equal(t, core.Number(0), asum, "Empty checkpoint sum = 0") + require.Nil(t, err) + + count, err := agg.Count() + require.Equal(t, int64(0), count, "Empty checkpoint count = 0") + require.Nil(t, err) + + max, err := agg.Max() + require.Equal(t, aggregator.ErrEmptyDataSet, err) + require.Equal(t, core.Number(0), max) + }) +} diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 71d4e7552..7cace1c2e 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -54,7 +54,7 @@ func (*benchFixture) AggregatorFor(descriptor *export.Descriptor) export.Aggrega return gauge.New() case export.MeasureKind: if strings.HasSuffix(descriptor.Name(), "maxsumcount") { - return maxsumcount.New() + return maxsumcount.New(descriptor) } else if strings.HasSuffix(descriptor.Name(), "ddsketch") { return ddsketch.New(ddsketch.NewDefaultConfig(), descriptor) } else if strings.HasSuffix(descriptor.Name(), "array") { diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index 2bf9fa67b..2276b2a05 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -71,7 +71,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *export.Descriptor) export.A case export.GaugeKind: return gauge.New() case export.MeasureKind: - return maxsumcount.New() + return maxsumcount.New(descriptor) default: return counter.New() }