diff --git a/api/core/number.go b/api/core/number.go index f4af83046..c68fa6b51 100644 --- a/api/core/number.go +++ b/api/core/number.go @@ -507,6 +507,8 @@ func (n Number) CompareInt64(i int64) int { // returns the typical result of the compare function: -1 if the value // is less than the other, 0 if both are equal, 1 if the value is // greater than the other. +// +// Do not compare NaN values. func (n Number) CompareFloat64(f float64) int { this := n.AsFloat64() if this < f { diff --git a/sdk/metric/aggregator/array/array.go b/sdk/metric/aggregator/array/array.go new file mode 100644 index 000000000..ce5ceddc9 --- /dev/null +++ b/sdk/metric/aggregator/array/array.go @@ -0,0 +1,191 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package array // import "go.opentelemetry.io/otel/sdk/metric/aggregator/array" + +import ( + "context" + "math" + "sort" + "sync" + "unsafe" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/sdk/export" + "go.opentelemetry.io/otel/sdk/metric/aggregator" +) + +type ( + Aggregator struct { + lock sync.Mutex + current Points + checkpoint Points + ckptSum core.Number + } + + Points []core.Number +) + +var _ export.MetricAggregator = &Aggregator{} + +func New() *Aggregator { + return &Aggregator{} +} + +// Sum returns the sum of the checkpoint. +func (c *Aggregator) Sum() core.Number { + return c.ckptSum +} + +// Count returns the count of the checkpoint. +func (c *Aggregator) Count() int64 { + return int64(len(c.checkpoint)) +} + +// Max returns the max of the checkpoint. +func (c *Aggregator) Max() (core.Number, error) { + return c.checkpoint.Quantile(1) +} + +// Min returns the min of the checkpoint. +func (c *Aggregator) Min() (core.Number, error) { + return c.checkpoint.Quantile(0) +} + +// Quantile returns the estimated quantile of the checkpoint. +func (c *Aggregator) Quantile(q float64) (core.Number, error) { + return c.checkpoint.Quantile(q) +} + +func (c *Aggregator) Collect(ctx context.Context, rec export.MetricRecord, exp export.MetricBatcher) { + c.lock.Lock() + c.checkpoint, c.current = c.current, nil + c.lock.Unlock() + + desc := rec.Descriptor() + kind := desc.NumberKind() + + c.sort(kind) + + c.ckptSum = core.Number(0) + + for _, v := range c.checkpoint { + c.ckptSum.AddNumber(kind, v) + } + + exp.Export(ctx, rec, c) +} + +func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.MetricRecord) { + desc := rec.Descriptor() + kind := desc.NumberKind() + + if kind == core.Float64NumberKind && math.IsNaN(number.AsFloat64()) { + // TODO warn + // NOTE: add this to the specification. 
+		return
+	}
+
+	if !desc.Alternate() && number.IsNegative(kind) {
+		// TODO warn
+		return
+	}
+
+	c.lock.Lock()
+	c.current = append(c.current, number)
+	c.lock.Unlock()
+}
+
+func (c *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
+	o, _ := oa.(*Aggregator)
+	if o == nil {
+		// TODO warn
+		return
+	}
+
+	c.ckptSum.AddNumber(desc.NumberKind(), o.ckptSum)
+	c.checkpoint = combine(c.checkpoint, o.checkpoint, desc.NumberKind())
+}
+
+func (c *Aggregator) sort(kind core.NumberKind) {
+	switch kind {
+	case core.Float64NumberKind:
+		sort.Float64s(*(*[]float64)(unsafe.Pointer(&c.checkpoint)))
+
+	case core.Int64NumberKind:
+		sort.Sort(&c.checkpoint)
+
+	default:
+		// NOTE: This can't happen because the SDK doesn't
+		// support uint64-kind metric instruments.
+		panic("Impossible case")
+	}
+}
+
+func combine(a, b Points, kind core.NumberKind) Points {
+	result := make(Points, 0, len(a)+len(b))
+
+	for len(a) != 0 && len(b) != 0 {
+		if a[0].CompareNumber(kind, b[0]) < 0 {
+			result = append(result, a[0])
+			a = a[1:]
+		} else {
+			result = append(result, b[0])
+			b = b[1:]
+		}
+	}
+	result = append(result, a...)
+	result = append(result, b...)
+	return result
+}
+
+func (p *Points) Len() int {
+	return len(*p)
+}
+
+func (p *Points) Less(i, j int) bool {
+	// Note this is specialized for int64, because float64 is
+	// handled by `sort.Float64s` and uint64 numbers never appear
+	// in this data.
+	return int64((*p)[i]) < int64((*p)[j])
+}
+
+func (p *Points) Swap(i, j int) {
+	(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
+}
+
+// Quantile returns the least X such that Pr(x<=X)>=q, where X is an
+// element of the data set.
+func (p *Points) Quantile(q float64) (core.Number, error) {
+	if len(*p) == 0 {
+		return core.Number(0), aggregator.ErrEmptyDataSet
+	}
+
+	if q < 0 || q > 1 {
+		return core.Number(0), aggregator.ErrInvalidQuantile
+	}
+
+	if q == 0 || len(*p) == 1 {
+		return (*p)[0], nil
+	} else if q == 1 {
+		return (*p)[len(*p)-1], nil
+	}
+
+	// Note: There's no interpolation being done here. There are
+	// many definitions for "quantile", some interpolate, some do
+	// not. What is expected?
+	position := float64(len(*p)-1) * q
+	ceil := int(math.Ceil(position))
+	return (*p)[ceil], nil
+}
diff --git a/sdk/metric/aggregator/array/array_test.go b/sdk/metric/aggregator/array/array_test.go
new file mode 100644
index 000000000..555f0a114
--- /dev/null
+++ b/sdk/metric/aggregator/array/array_test.go
@@ -0,0 +1,297 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
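For illustration only (not part of the change): the nearest-rank rule implemented by Points.Quantile above can be reproduced as a standalone sketch. The helper name nearestRank is hypothetical, and the sketch omits the empty-set, out-of-range, q == 0, and q == 1 cases that the aggregator handles explicitly.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// nearestRank returns the smallest element of the sorted slice whose
// cumulative probability is at least q. No interpolation is done,
// matching the aggregator's position/ceil computation.
func nearestRank(sorted []float64, q float64) float64 {
	position := float64(len(sorted)-1) * q
	return sorted[int(math.Ceil(position))]
}

func main() {
	data := []float64{5, 1, 4, 2, 3}
	sort.Float64s(data)
	fmt.Println(nearestRank(data, 0.5)) // 3: exact median of an odd-length set
	fmt.Println(nearestRank(data, 0.9)) // 5: the element at the ceiling rank, not an interpolated 4.6
}
```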
+
+package array
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/otel/api/core"
+	"go.opentelemetry.io/otel/sdk/export"
+	"go.opentelemetry.io/otel/sdk/metric/aggregator"
+	"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
+)
+
+type updateTest struct {
+	count    int
+	absolute bool
+}
+
+func (ut *updateTest) run(t *testing.T, profile test.Profile) {
+	ctx := context.Background()
+
+	batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !ut.absolute)
+
+	agg := New()
+
+	all := test.NewNumbers(profile.NumberKind)
+
+	for i := 0; i < ut.count; i++ {
+		x := profile.Random(+1)
+		all.Append(x)
+		agg.Update(ctx, x, record)
+
+		if !ut.absolute {
+			y := profile.Random(-1)
+			all.Append(y)
+			agg.Update(ctx, y, record)
+		}
+	}
+
+	agg.Collect(ctx, record, batcher)
+
+	all.Sort()
+
+	require.InEpsilon(t,
+		all.Sum().CoerceToFloat64(profile.NumberKind),
+		agg.Sum().CoerceToFloat64(profile.NumberKind),
+		0.0000001,
+		"Same sum - absolute")
+	require.Equal(t, all.Count(), agg.Count(), "Same count - absolute")
+
+	min, err := agg.Min()
+	require.Nil(t, err)
+	require.Equal(t, all.Min(), min, "Same min - absolute")
+
+	max, err := agg.Max()
+	require.Nil(t, err)
+	require.Equal(t, all.Max(), max, "Same max - absolute")
+
+	qx, err := agg.Quantile(0.5)
+	require.Nil(t, err)
+	require.Equal(t, all.Median(), qx, "Same median - absolute")
+}
+
+func TestArrayUpdate(t *testing.T) {
+	// Test with an odd and an even number of measurements
+	for count := 999; count <= 1000; count++ {
+		t.Run(fmt.Sprint("Odd=", count%2 == 1), func(t *testing.T) {
+			// Test absolute and non-absolute
+			for _, absolute := range []bool{false, true} {
+				t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) {
+					ut := updateTest{
+						count:    count,
+						absolute: absolute,
+					}
+
+					// Test integer and floating point
+					test.RunProfiles(t, ut.run)
+				})
+			}
+		})
+	}
+}
+
+type mergeTest struct {
+	count    int
+	absolute bool
+}
+
+func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
+	ctx := context.Background()
+
+	batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !mt.absolute)
+
+	agg1 := New()
+	agg2 := New()
+
+	all := test.NewNumbers(profile.NumberKind)
+
+	for i := 0; i < mt.count; i++ {
+		x1 := profile.Random(+1)
+		all.Append(x1)
+		agg1.Update(ctx, x1, record)
+
+		x2 := profile.Random(+1)
+		all.Append(x2)
+		agg2.Update(ctx, x2, record)
+
+		if !mt.absolute {
+			y1 := profile.Random(-1)
+			all.Append(y1)
+			agg1.Update(ctx, y1, record)
+
+			y2 := profile.Random(-1)
+			all.Append(y2)
+			agg2.Update(ctx, y2, record)
+		}
+	}
+
+	agg1.Collect(ctx, record, batcher)
+	agg2.Collect(ctx, record, batcher)
+
+	agg1.Merge(agg2, record.Descriptor())
+
+	all.Sort()
+
+	require.InEpsilon(t,
+		all.Sum().CoerceToFloat64(profile.NumberKind),
+		agg1.Sum().CoerceToFloat64(profile.NumberKind),
+		0.0000001,
+		"Same sum - absolute")
+	require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
+
+	min, err := agg1.Min()
+	require.Nil(t, err)
+	require.Equal(t, all.Min(), min, "Same min - absolute")
+
+	max, err := agg1.Max()
+	require.Nil(t, err)
+	require.Equal(t, all.Max(), max, "Same max - absolute")
+
+	qx, err := agg1.Quantile(0.5)
+	require.Nil(t, err)
+	require.Equal(t, all.Median(), qx, "Same median - absolute")
+}
+
+func TestArrayMerge(t *testing.T) {
+	// Test with an odd and an even number of measurements
+	for count := 999; count <= 1000; count++ {
+		t.Run(fmt.Sprint("Odd=", count%2 == 1), func(t
*testing.T) { + // Test absolute and non-absolute + for _, absolute := range []bool{false, true} { + t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) { + mt := mergeTest{ + count: count, + absolute: absolute, + } + + // Test integer and floating point + test.RunProfiles(t, mt.run) + }) + } + }) + } +} + +func TestArrayErrors(t *testing.T) { + test.RunProfiles(t, func(t *testing.T, profile test.Profile) { + agg := New() + + _, err := agg.Max() + require.Error(t, err) + require.Equal(t, err, aggregator.ErrEmptyDataSet) + + _, err = agg.Min() + require.Error(t, err) + require.Equal(t, err, aggregator.ErrEmptyDataSet) + + _, err = agg.Quantile(0.1) + require.Error(t, err) + require.Equal(t, err, aggregator.ErrEmptyDataSet) + + ctx := context.Background() + + batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false) + + agg.Update(ctx, core.Number(0), record) + + if profile.NumberKind == core.Float64NumberKind { + agg.Update(ctx, core.NewFloat64Number(math.NaN()), record) + } + agg.Collect(ctx, record, batcher) + + require.Equal(t, int64(1), agg.Count(), "NaN value was not counted") + + num, err := agg.Quantile(0) + require.Nil(t, err) + require.Equal(t, num, core.Number(0)) + + _, err = agg.Quantile(-0.0001) + require.Error(t, err) + require.Equal(t, err, aggregator.ErrInvalidQuantile) + + _, err = agg.Quantile(1.0001) + require.Error(t, err) + require.Equal(t, err, aggregator.ErrInvalidQuantile) + }) +} + +func TestArrayFloat64(t *testing.T) { + for _, absolute := range []bool{false, true} { + t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) { + batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, core.Float64NumberKind, !absolute) + + fpsf := func(sign int) []float64 { + // Check behavior of a bunch of odd floating + // points except for NaN, which is invalid. + return []float64{ + 0, + math.Inf(sign), + 1 / math.Inf(sign), + 1, + 2, + 1e100, + math.MaxFloat64, + math.SmallestNonzeroFloat64, + math.MaxFloat32, + math.SmallestNonzeroFloat32, + math.E, + math.Pi, + math.Phi, + math.Sqrt2, + math.SqrtE, + math.SqrtPi, + math.SqrtPhi, + math.Ln2, + math.Log2E, + math.Ln10, + math.Log10E, + } + } + + all := test.NewNumbers(core.Float64NumberKind) + + ctx := context.Background() + agg := New() + + for _, f := range fpsf(1) { + all.Append(core.NewFloat64Number(f)) + agg.Update(ctx, core.NewFloat64Number(f), record) + } + + if !absolute { + for _, f := range fpsf(-1) { + all.Append(core.NewFloat64Number(f)) + agg.Update(ctx, core.NewFloat64Number(f), record) + } + } + + agg.Collect(ctx, record, batcher) + + all.Sort() + + require.InEpsilon(t, all.Sum().AsFloat64(), agg.Sum().AsFloat64(), 0.0000001, "Same sum") + + require.Equal(t, all.Count(), agg.Count(), "Same count") + + min, err := agg.Min() + require.Nil(t, err) + require.Equal(t, all.Min(), min, "Same min") + + max, err := agg.Max() + require.Nil(t, err) + require.Equal(t, all.Max(), max, "Same max") + + qx, err := agg.Quantile(0.5) + require.Nil(t, err) + require.Equal(t, all.Median(), qx, "Same median") + }) + } +} diff --git a/sdk/metric/aggregator/counter/counter.go b/sdk/metric/aggregator/counter/counter.go index 9254fe1cd..d58a2ca90 100644 --- a/sdk/metric/aggregator/counter/counter.go +++ b/sdk/metric/aggregator/counter/counter.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
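For illustration only: the reason Update rejects NaN measurements (and TestArrayErrors asserts "NaN value was not counted" above) is that a single NaN would make every derived statistic meaningless. A minimal standalone sketch, independent of the SDK:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// One bad measurement is enough to ruin the aggregate: the sum
	// becomes NaN, and every ordering comparison against NaN is false,
	// so Min, Max, and Quantile would be meaningless as well.
	vals := []float64{3, 1, math.NaN(), 2}

	sum := 0.0
	for _, v := range vals {
		sum += v
	}
	fmt.Println(sum)                            // NaN
	fmt.Println(math.NaN() < 1, 1 < math.NaN()) // false false
}
```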
-package counter +package counter // import "go.opentelemetry.io/otel/sdk/metric/aggregator/counter" import ( "context" diff --git a/sdk/metric/aggregator/ddsketch/ddsketch.go b/sdk/metric/aggregator/ddsketch/ddsketch.go index 9bf40fe83..cc4f55956 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch.go @@ -11,16 +11,18 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package ddsketch +package ddsketch // import "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch" import ( "context" + "math" "sync" sdk "github.com/DataDog/sketches-go/ddsketch" "go.opentelemetry.io/otel/api/core" "go.opentelemetry.io/otel/sdk/export" + "go.opentelemetry.io/otel/sdk/metric/aggregator" ) // Aggregator aggregates measure events. @@ -44,13 +46,17 @@ func New(cfg *sdk.Config, desc *export.Descriptor) *Aggregator { } // NewDefaultConfig returns a new, default DDSketch config. +// +// TODO: The Config constructor should probably set minValue to -Inf +// to aggregate metrics with absolute=false. This requires providing values +// for alpha and maxNumBins func NewDefaultConfig() *sdk.Config { return sdk.NewDefaultConfig() } // Sum returns the sum of the checkpoint. -func (c *Aggregator) Sum() float64 { - return c.checkpoint.Sum() +func (c *Aggregator) Sum() core.Number { + return c.toNumber(c.checkpoint.Sum()) } // Count returns the count of the checkpoint. @@ -59,18 +65,29 @@ func (c *Aggregator) Count() int64 { } // Max returns the max of the checkpoint. -func (c *Aggregator) Max() float64 { - return c.checkpoint.Quantile(1) +func (c *Aggregator) Max() (core.Number, error) { + return c.Quantile(1) } // Min returns the min of the checkpoint. -func (c *Aggregator) Min() float64 { - return c.checkpoint.Quantile(0) +func (c *Aggregator) Min() (core.Number, error) { + return c.Quantile(0) } // Quantile returns the estimated quantile of the checkpoint. -func (c *Aggregator) Quantile(q float64) float64 { - return c.checkpoint.Quantile(q) +func (c *Aggregator) Quantile(q float64) (core.Number, error) { + f := c.checkpoint.Quantile(q) + if math.IsNaN(f) { + return core.Number(0), aggregator.ErrInvalidQuantile + } + return c.toNumber(f), nil +} + +func (c *Aggregator) toNumber(f float64) core.Number { + if c.kind == core.Float64NumberKind { + return core.NewFloat64Number(f) + } + return core.NewInt64Number(int64(f)) } // Collect checkpoints the current value (atomically) and exports it. diff --git a/sdk/metric/aggregator/ddsketch/ddsketch_test.go b/sdk/metric/aggregator/ddsketch/ddsketch_test.go index 993482cfd..a56e45af6 100644 --- a/sdk/metric/aggregator/ddsketch/ddsketch_test.go +++ b/sdk/metric/aggregator/ddsketch/ddsketch_test.go @@ -16,6 +16,7 @@ package ddsketch import ( "context" + "fmt" "testing" "github.com/stretchr/testify/require" @@ -24,90 +25,147 @@ import ( "go.opentelemetry.io/otel/sdk/metric/aggregator/test" ) -const count = 100 +const count = 1000 -// N.B. 
DDSketch only supports absolute measures +type updateTest struct { + absolute bool +} -func TestDDSketchAbsolute(t *testing.T) { +func (ut *updateTest) run(t *testing.T, profile test.Profile) { + ctx := context.Background() + batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !ut.absolute) + + agg := New(NewDefaultConfig(), record.Descriptor()) + + all := test.NewNumbers(profile.NumberKind) + for i := 0; i < count; i++ { + x := profile.Random(+1) + all.Append(x) + agg.Update(ctx, x, record) + + if !ut.absolute { + y := profile.Random(-1) + all.Append(y) + agg.Update(ctx, y, record) + } + } + + agg.Collect(ctx, record, batcher) + + all.Sort() + + require.InDelta(t, + all.Sum().CoerceToFloat64(profile.NumberKind), + agg.Sum().CoerceToFloat64(profile.NumberKind), + 1, + "Same sum - absolute") + require.Equal(t, all.Count(), agg.Count(), "Same count - absolute") + + max, err := agg.Max() + require.Nil(t, err) + require.Equal(t, + all.Max(), + max, + "Same max - absolute") + + median, err := agg.Quantile(0.5) + require.Nil(t, err) + require.InDelta(t, + all.Median().CoerceToFloat64(profile.NumberKind), + median.CoerceToFloat64(profile.NumberKind), + 10, + "Same median - absolute") +} + +func TestDDSketchUpdate(t *testing.T) { + // Test absolute and non-absolute + for _, absolute := range []bool{false, true} { + t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) { + ut := updateTest{ + absolute: absolute, + } + // Test integer and floating point + test.RunProfiles(t, ut.run) + }) + } +} + +type mergeTest struct { + absolute bool +} + +func (mt *mergeTest) run(t *testing.T, profile test.Profile) { ctx := context.Background() - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { - batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false) + batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !mt.absolute) - agg := New(NewDefaultConfig(), record.Descriptor()) + agg1 := New(NewDefaultConfig(), record.Descriptor()) + agg2 := New(NewDefaultConfig(), record.Descriptor()) - var all test.Numbers - for i := 0; i < count; i++ { - x := profile.Random(+1) - all = append(all, x) - agg.Update(ctx, x, record) + all := test.NewNumbers(profile.NumberKind) + for i := 0; i < count; i++ { + x := profile.Random(+1) + all.Append(x) + agg1.Update(ctx, x, record) + + if !mt.absolute { + y := profile.Random(-1) + all.Append(y) + agg1.Update(ctx, y, record) } + } - agg.Collect(ctx, record, batcher) + for i := 0; i < count; i++ { + x := profile.Random(+1) + all.Append(x) + agg2.Update(ctx, x, record) - all.Sort() + if !mt.absolute { + y := profile.Random(-1) + all.Append(y) + agg2.Update(ctx, y, record) + } + } - require.InEpsilon(t, - all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind), - agg.Sum(), - 0.0000001, - "Same sum - absolute") - require.Equal(t, all.Count(), agg.Count(), "Same count - absolute") - require.Equal(t, - all[len(all)-1].CoerceToFloat64(profile.NumberKind), - agg.Max(), - "Same max - absolute") - require.InEpsilon(t, - all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind), - agg.Quantile(0.5), - 0.1, - "Same median - absolute") - }) + agg1.Collect(ctx, record, batcher) + agg2.Collect(ctx, record, batcher) + + agg1.Merge(agg2, record.Descriptor()) + + all.Sort() + + require.InDelta(t, + all.Sum().CoerceToFloat64(profile.NumberKind), + agg1.Sum().CoerceToFloat64(profile.NumberKind), + 1, + "Same sum - absolute") + require.Equal(t, all.Count(), 
agg1.Count(), "Same count - absolute") + + max, err := agg1.Max() + require.Nil(t, err) + require.Equal(t, + all.Max(), + max, + "Same max - absolute") + + median, err := agg1.Quantile(0.5) + require.Nil(t, err) + require.InDelta(t, + all.Median().CoerceToFloat64(profile.NumberKind), + median.CoerceToFloat64(profile.NumberKind), + 10, + "Same median - absolute") } func TestDDSketchMerge(t *testing.T) { - ctx := context.Background() - - test.RunProfiles(t, func(t *testing.T, profile test.Profile) { - batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false) - - agg1 := New(NewDefaultConfig(), record.Descriptor()) - agg2 := New(NewDefaultConfig(), record.Descriptor()) - - var all test.Numbers - for i := 0; i < count; i++ { - x := profile.Random(+1) - all = append(all, x) - agg1.Update(ctx, x, record) - } - - for i := 0; i < count; i++ { - x := profile.Random(+1) - all = append(all, x) - agg2.Update(ctx, x, record) - } - - agg1.Collect(ctx, record, batcher) - agg2.Collect(ctx, record, batcher) - - agg1.Merge(agg2, record.Descriptor()) - - all.Sort() - - require.InEpsilon(t, - all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind), - agg1.Sum(), - 0.0000001, - "Same sum - absolute") - require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute") - require.Equal(t, - all[len(all)-1].CoerceToFloat64(profile.NumberKind), - agg1.Max(), - "Same max - absolute") - require.InEpsilon(t, - all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind), - agg1.Quantile(0.5), - 0.1, - "Same median - absolute") - }) + // Test absolute and non-absolute + for _, absolute := range []bool{false, true} { + t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) { + mt := mergeTest{ + absolute: absolute, + } + // Test integer and floating point + test.RunProfiles(t, mt.run) + }) + } } diff --git a/sdk/metric/aggregator/errors.go b/sdk/metric/aggregator/errors.go new file mode 100644 index 000000000..5ecffda66 --- /dev/null +++ b/sdk/metric/aggregator/errors.go @@ -0,0 +1,22 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregator + +import "fmt" + +var ( + ErrEmptyDataSet = fmt.Errorf("The result is not defined on an empty data set") + ErrInvalidQuantile = fmt.Errorf("The requested quantile is out of range") +) diff --git a/sdk/metric/aggregator/gauge/gauge.go b/sdk/metric/aggregator/gauge/gauge.go index 8ba85e99b..4ce0dbb7b 100644 --- a/sdk/metric/aggregator/gauge/gauge.go +++ b/sdk/metric/aggregator/gauge/gauge.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
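For illustration only: a caller might distinguish the new sentinel errors roughly as sketched below; the snippet relies only on identifiers introduced in this change (array.New, aggregator.ErrEmptyDataSet). ErrInvalidQuantile is returned for a quantile outside [0, 1], and by the ddsketch aggregator when the sketch yields NaN.

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/sdk/metric/aggregator"
	"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
)

func main() {
	agg := array.New()

	// Nothing has been collected yet, so the checkpoint is empty and
	// order statistics are undefined.
	if _, err := agg.Min(); err == aggregator.ErrEmptyDataSet {
		fmt.Println("no measurements in the last collection interval")
	}
}
```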
-package gauge +package gauge // import "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge" import ( "context" diff --git a/sdk/metric/aggregator/maxsumcount/msc.go b/sdk/metric/aggregator/maxsumcount/msc.go index e0c5935b7..42822707d 100644 --- a/sdk/metric/aggregator/maxsumcount/msc.go +++ b/sdk/metric/aggregator/maxsumcount/msc.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package maxsumcount +package maxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator/maxsumcount" import ( "context" @@ -54,8 +54,8 @@ func (c *Aggregator) Count() int64 { } // Max returns the accumulated max as a Number. -func (c *Aggregator) Max() core.Number { - return c.checkpoint.max +func (c *Aggregator) Max() (core.Number, error) { + return c.checkpoint.max, nil } // Collect checkpoints the current value (atomically) and exports it. diff --git a/sdk/metric/aggregator/maxsumcount/msc_test.go b/sdk/metric/aggregator/maxsumcount/msc_test.go index 7121c52f3..2b8ceb515 100644 --- a/sdk/metric/aggregator/maxsumcount/msc_test.go +++ b/sdk/metric/aggregator/maxsumcount/msc_test.go @@ -34,10 +34,11 @@ func TestMaxSumCountAbsolute(t *testing.T) { agg := New() - var all test.Numbers + all := test.NewNumbers(profile.NumberKind) + for i := 0; i < count; i++ { x := profile.Random(+1) - all = append(all, x) + all.Append(x) agg.Update(ctx, x, record) } @@ -46,14 +47,17 @@ func TestMaxSumCountAbsolute(t *testing.T) { all.Sort() require.InEpsilon(t, - all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind), + all.Sum().CoerceToFloat64(profile.NumberKind), agg.Sum().CoerceToFloat64(profile.NumberKind), 0.000000001, "Same sum - absolute") require.Equal(t, all.Count(), agg.Count(), "Same count - absolute") + + max, err := agg.Max() + require.Nil(t, err) require.Equal(t, - all[len(all)-1], - agg.Max(), + all.Max(), + max, "Same max - absolute") }) } @@ -67,16 +71,16 @@ func TestMaxSumCountMerge(t *testing.T) { agg1 := New() agg2 := New() - var all test.Numbers + all := test.NewNumbers(profile.NumberKind) for i := 0; i < count; i++ { x := profile.Random(+1) - all = append(all, x) + all.Append(x) agg1.Update(ctx, x, record) } for i := 0; i < count; i++ { x := profile.Random(+1) - all = append(all, x) + all.Append(x) agg2.Update(ctx, x, record) } @@ -88,14 +92,17 @@ func TestMaxSumCountMerge(t *testing.T) { all.Sort() require.InEpsilon(t, - all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind), + all.Sum().CoerceToFloat64(profile.NumberKind), agg1.Sum().CoerceToFloat64(profile.NumberKind), 0.000000001, "Same sum - absolute") require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute") + + max, err := agg1.Max() + require.Nil(t, err) require.Equal(t, - all[len(all)-1], - agg1.Max(), + all.Max(), + max, "Same max - absolute") }) } diff --git a/sdk/metric/aggregator/test/test.go b/sdk/metric/aggregator/test/test.go index 88967a661..2063435ce 100644 --- a/sdk/metric/aggregator/test/test.go +++ b/sdk/metric/aggregator/test/test.go @@ -27,24 +27,29 @@ import ( var _ export.MetricBatcher = &metricBatcher{} var _ export.MetricRecord = &metricRecord{} +const Magnitude = 1000 + type Profile struct { NumberKind core.NumberKind Random func(sign int) core.Number } -var profiles = []Profile{ - { - NumberKind: core.Int64NumberKind, - Random: func(sign int) core.Number { - return core.NewInt64Number(int64(sign) * int64(rand.Intn(100000))) +func newProfiles() []Profile { + rnd := rand.New(rand.NewSource(rand.Int63())) + return 
[]Profile{ + { + NumberKind: core.Int64NumberKind, + Random: func(sign int) core.Number { + return core.NewInt64Number(int64(sign) * int64(rnd.Intn(Magnitude+1))) + }, }, - }, - { - NumberKind: core.Float64NumberKind, - Random: func(sign int) core.Number { - return core.NewFloat64Number(float64(sign) * rand.Float64() * 100000) + { + NumberKind: core.Float64NumberKind, + Random: func(sign int) core.Number { + return core.NewFloat64Number(float64(sign) * rnd.Float64() * Magnitude) + }, }, - }, + } } type metricBatcher struct { @@ -75,66 +80,70 @@ func (m *metricBatcher) Export(context.Context, export.MetricRecord, export.Metr } func RunProfiles(t *testing.T, f func(*testing.T, Profile)) { - for _, profile := range profiles { + for _, profile := range newProfiles() { t.Run(profile.NumberKind.String(), func(t *testing.T) { f(t, profile) }) } } -type Numbers []core.Number +type Numbers struct { + kind core.NumberKind + numbers []core.Number +} + +func NewNumbers(kind core.NumberKind) Numbers { + return Numbers{ + kind: kind, + } +} + +func (n *Numbers) Append(v core.Number) { + n.numbers = append(n.numbers, v) +} func (n *Numbers) Sort() { sort.Sort(n) } func (n *Numbers) Less(i, j int) bool { - return (*n)[i] < (*n)[j] + return n.numbers[i].CompareNumber(n.kind, n.numbers[j]) < 0 } func (n *Numbers) Len() int { - return len(*n) + return len(n.numbers) } func (n *Numbers) Swap(i, j int) { - (*n)[i], (*n)[j] = (*n)[j], (*n)[i] + n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i] } -func (n *Numbers) Sum(kind core.NumberKind) core.Number { +func (n *Numbers) Sum() core.Number { var sum core.Number - for _, num := range *n { - sum.AddNumber(kind, num) + for _, num := range n.numbers { + sum.AddNumber(n.kind, num) } return sum } func (n *Numbers) Count() int64 { - return int64(len(*n)) + return int64(len(n.numbers)) } -func (n *Numbers) Median(kind core.NumberKind) core.Number { - if !sort.IsSorted(n) { - panic("Sort these numbers before calling Median") - } - - l := len(*n) - if l%2 == 1 { - return (*n)[l/2] - } - - lower := (*n)[l/2-1] - upper := (*n)[l/2] - - sum := lower - sum.AddNumber(kind, upper) - - switch kind { - case core.Uint64NumberKind: - return core.NewUint64Number(sum.AsUint64() / 2) - case core.Int64NumberKind: - return core.NewInt64Number(sum.AsInt64() / 2) - case core.Float64NumberKind: - return core.NewFloat64Number(sum.AsFloat64() / 2) - } - panic("unknown number kind") +func (n *Numbers) Min() core.Number { + return n.numbers[0] +} + +func (n *Numbers) Max() core.Number { + return n.numbers[len(n.numbers)-1] +} + +// Median() is an alias for Quantile(0.5). +func (n *Numbers) Median() core.Number { + // Note that len(n.numbers) is 1 greater than the max element + // index, so dividing by two rounds up. This gives the + // intended definition for Quantile() in tests, which is to + // return the smallest element that is at or above the + // specified quantile. 
+	return n.numbers[len(n.numbers)/2]
+}
diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go
index 39a19045c..bd8759b25 100644
--- a/sdk/metric/benchmark_test.go
+++ b/sdk/metric/benchmark_test.go
@@ -56,6 +56,8 @@ func (bf *benchFixture) AggregatorFor(rec export.MetricRecord) export.MetricAggr
 			return maxsumcount.New()
 		} else if strings.HasSuffix(rec.Descriptor().Name(), "ddsketch") {
 			return ddsketch.New(ddsketch.NewDefaultConfig(), rec.Descriptor())
+		} else if strings.HasSuffix(rec.Descriptor().Name(), "array") {
+			return array.New()
 		}
 	}
 	return nil
@@ -410,3 +412,29 @@ func BenchmarkFloat64DDSketchAcquireHandle(b *testing.B) {
 func BenchmarkFloat64DDSketchHandleAdd(b *testing.B) {
 	benchmarkFloat64MeasureHandleAdd(b, "float64.ddsketch")
 }
+
+// Array
+
+func BenchmarkInt64ArrayAdd(b *testing.B) {
+	benchmarkInt64MeasureAdd(b, "int64.array")
+}
+
+func BenchmarkInt64ArrayAcquireHandle(b *testing.B) {
+	benchmarkInt64MeasureAcquireHandle(b, "int64.array")
+}
+
+func BenchmarkInt64ArrayHandleAdd(b *testing.B) {
+	benchmarkInt64MeasureHandleAdd(b, "int64.array")
+}
+
+func BenchmarkFloat64ArrayAdd(b *testing.B) {
+	benchmarkFloat64MeasureAdd(b, "float64.array")
+}
+
+func BenchmarkFloat64ArrayAcquireHandle(b *testing.B) {
+	benchmarkFloat64MeasureAcquireHandle(b, "float64.array")
+}
+
+func BenchmarkFloat64ArrayHandleAdd(b *testing.B) {
+	benchmarkFloat64MeasureHandleAdd(b, "float64.array")
+}
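For illustration only: wiring the new aggregator into a batcher could look roughly like the benchmark fixture's AggregatorFor above. The aggregatorFor function, the package name, and the name-suffix convention below are assumptions for the sketch, not part of this change.

```go
package sample // illustrative package name

import (
	"strings"

	"go.opentelemetry.io/otel/sdk/export"
	"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
	"go.opentelemetry.io/otel/sdk/metric/aggregator/maxsumcount"
)

// aggregatorFor mirrors the benchmark fixture: measure instruments are
// routed to an aggregator implementation by a suffix on the instrument
// name. Instruments ending in "array" get the exact, array-backed
// aggregator added in this change.
func aggregatorFor(rec export.MetricRecord) export.MetricAggregator {
	name := rec.Descriptor().Name()
	switch {
	case strings.HasSuffix(name, "array"):
		return array.New()
	case strings.HasSuffix(name, "maxsumcount"):
		return maxsumcount.New()
	default:
		return nil
	}
}
```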