mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2024-12-04 09:43:23 +02:00
Add array aggregation for raw measure metrics, improve testing (#282)
* Array aggregator part 1 * Improve median testing * More testing * More testing * Update other dist tests * Add to the benchmark * Move errors into aggregator package, use from ddsketch; update Max/Min/Quantile to return errors for array/ddsketch/maxsumcount * Lint * Test non-absolute ddsketch * Lint * Comment * Add note
This commit is contained in:
parent
6b48fce4ec
commit
9040d824ae
@ -507,6 +507,8 @@ func (n Number) CompareInt64(i int64) int {
|
||||
// returns the typical result of the compare function: -1 if the value
|
||||
// is less than the other, 0 if both are equal, 1 if the value is
|
||||
// greater than the other.
|
||||
//
|
||||
// Do not compare NaN values.
|
||||
func (n Number) CompareFloat64(f float64) int {
|
||||
this := n.AsFloat64()
|
||||
if this < f {
|
||||
|
191
sdk/metric/aggregator/array/array.go
Normal file
191
sdk/metric/aggregator/array/array.go
Normal file
@ -0,0 +1,191 @@
|
||||
// Copyright 2019, OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package array // import "go.opentelemetry.io/otel/sdk/metric/aggregator/array"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sort"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/sdk/export"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
type (
|
||||
Aggregator struct {
|
||||
lock sync.Mutex
|
||||
current Points
|
||||
checkpoint Points
|
||||
ckptSum core.Number
|
||||
}
|
||||
|
||||
Points []core.Number
|
||||
)
|
||||
|
||||
var _ export.MetricAggregator = &Aggregator{}
|
||||
|
||||
func New() *Aggregator {
|
||||
return &Aggregator{}
|
||||
}
|
||||
|
||||
// Sum returns the sum of the checkpoint.
|
||||
func (c *Aggregator) Sum() core.Number {
|
||||
return c.ckptSum
|
||||
}
|
||||
|
||||
// Count returns the count of the checkpoint.
|
||||
func (c *Aggregator) Count() int64 {
|
||||
return int64(len(c.checkpoint))
|
||||
}
|
||||
|
||||
// Max returns the max of the checkpoint.
|
||||
func (c *Aggregator) Max() (core.Number, error) {
|
||||
return c.checkpoint.Quantile(1)
|
||||
}
|
||||
|
||||
// Min returns the min of the checkpoint.
|
||||
func (c *Aggregator) Min() (core.Number, error) {
|
||||
return c.checkpoint.Quantile(0)
|
||||
}
|
||||
|
||||
// Quantile returns the estimated quantile of the checkpoint.
|
||||
func (c *Aggregator) Quantile(q float64) (core.Number, error) {
|
||||
return c.checkpoint.Quantile(q)
|
||||
}
|
||||
|
||||
func (c *Aggregator) Collect(ctx context.Context, rec export.MetricRecord, exp export.MetricBatcher) {
|
||||
c.lock.Lock()
|
||||
c.checkpoint, c.current = c.current, nil
|
||||
c.lock.Unlock()
|
||||
|
||||
desc := rec.Descriptor()
|
||||
kind := desc.NumberKind()
|
||||
|
||||
c.sort(kind)
|
||||
|
||||
c.ckptSum = core.Number(0)
|
||||
|
||||
for _, v := range c.checkpoint {
|
||||
c.ckptSum.AddNumber(kind, v)
|
||||
}
|
||||
|
||||
exp.Export(ctx, rec, c)
|
||||
}
|
||||
|
||||
func (c *Aggregator) Update(_ context.Context, number core.Number, rec export.MetricRecord) {
|
||||
desc := rec.Descriptor()
|
||||
kind := desc.NumberKind()
|
||||
|
||||
if kind == core.Float64NumberKind && math.IsNaN(number.AsFloat64()) {
|
||||
// TODO warn
|
||||
// NOTE: add this to the specification.
|
||||
return
|
||||
}
|
||||
|
||||
if !desc.Alternate() && number.IsNegative(kind) {
|
||||
// TODO warn
|
||||
return
|
||||
}
|
||||
|
||||
c.lock.Lock()
|
||||
c.current = append(c.current, number)
|
||||
c.lock.Unlock()
|
||||
}
|
||||
|
||||
func (c *Aggregator) Merge(oa export.MetricAggregator, desc *export.Descriptor) {
|
||||
o, _ := oa.(*Aggregator)
|
||||
if o == nil {
|
||||
// TODO warn
|
||||
return
|
||||
}
|
||||
|
||||
c.ckptSum.AddNumber(desc.NumberKind(), o.ckptSum)
|
||||
c.checkpoint = combine(c.checkpoint, o.checkpoint, desc.NumberKind())
|
||||
}
|
||||
|
||||
func (c *Aggregator) sort(kind core.NumberKind) {
|
||||
switch kind {
|
||||
case core.Float64NumberKind:
|
||||
sort.Float64s(*(*[]float64)(unsafe.Pointer(&c.checkpoint)))
|
||||
|
||||
case core.Int64NumberKind:
|
||||
sort.Sort(&c.checkpoint)
|
||||
|
||||
default:
|
||||
// NOTE: This can't happen because the SDK doesn't
|
||||
// support uint64-kind metric instruments.
|
||||
panic("Impossible case")
|
||||
}
|
||||
}
|
||||
|
||||
func combine(a, b Points, kind core.NumberKind) Points {
|
||||
result := make(Points, 0, len(a)+len(b))
|
||||
|
||||
for len(a) != 0 && len(b) != 0 {
|
||||
if a[0].CompareNumber(kind, b[0]) < 0 {
|
||||
result = append(result, a[0])
|
||||
a = a[1:]
|
||||
} else {
|
||||
result = append(result, b[0])
|
||||
b = b[1:]
|
||||
}
|
||||
}
|
||||
result = append(result, a...)
|
||||
result = append(result, b...)
|
||||
return result
|
||||
}
|
||||
|
||||
func (p *Points) Len() int {
|
||||
return len(*p)
|
||||
}
|
||||
|
||||
func (p *Points) Less(i, j int) bool {
|
||||
// Note this is specialized for int64, because float64 is
|
||||
// handled by `sort.Float64s` and uint64 numbers never appear
|
||||
// in this data.
|
||||
return int64((*p)[i]) < int64((*p)[j])
|
||||
}
|
||||
|
||||
func (p *Points) Swap(i, j int) {
|
||||
(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
|
||||
}
|
||||
|
||||
// Quantile returns the least X such that Pr(x<X)>=q, where X is an
|
||||
// element of the data set.
|
||||
func (p *Points) Quantile(q float64) (core.Number, error) {
|
||||
if len(*p) == 0 {
|
||||
return core.Number(0), aggregator.ErrEmptyDataSet
|
||||
}
|
||||
|
||||
if q < 0 || q > 1 {
|
||||
return core.Number(0), aggregator.ErrInvalidQuantile
|
||||
}
|
||||
|
||||
if q == 0 || len(*p) == 1 {
|
||||
return (*p)[0], nil
|
||||
} else if q == 1 {
|
||||
return (*p)[len(*p)-1], nil
|
||||
}
|
||||
|
||||
// Note: There's no interpolation being done here. There are
|
||||
// many definitions for "quantile", some interpolate, some do
|
||||
// not. What is expected?
|
||||
position := float64(len(*p)-1) * q
|
||||
ceil := int(math.Ceil(position))
|
||||
return (*p)[ceil], nil
|
||||
}
|
297
sdk/metric/aggregator/array/array_test.go
Normal file
297
sdk/metric/aggregator/array/array_test.go
Normal file
@ -0,0 +1,297 @@
|
||||
// Copyright 2019, OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package array
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/sdk/export"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
|
||||
)
|
||||
|
||||
type updateTest struct {
|
||||
count int
|
||||
absolute bool
|
||||
}
|
||||
|
||||
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !ut.absolute)
|
||||
|
||||
agg := New()
|
||||
|
||||
all := test.NewNumbers(profile.NumberKind)
|
||||
|
||||
for i := 0; i < ut.count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all.Append(x)
|
||||
agg.Update(ctx, x, record)
|
||||
|
||||
if !ut.absolute {
|
||||
y := profile.Random(-1)
|
||||
all.Append(y)
|
||||
agg.Update(ctx, y, record)
|
||||
}
|
||||
}
|
||||
|
||||
agg.Collect(ctx, record, batcher)
|
||||
|
||||
all.Sort()
|
||||
|
||||
require.InEpsilon(t,
|
||||
all.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
agg.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
0.0000001,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg.Count(), "Same count - absolute")
|
||||
|
||||
min, err := agg.Min()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Min(), min, "Same min - absolute")
|
||||
|
||||
max, err := agg.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Max(), max, "Same max - absolute")
|
||||
|
||||
qx, err := agg.Quantile(0.5)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Median(), qx, "Same median - absolute")
|
||||
}
|
||||
|
||||
func TestArrayUpdate(t *testing.T) {
|
||||
// Test with an odd an even number of measurements
|
||||
for count := 999; count <= 1000; count++ {
|
||||
t.Run(fmt.Sprint("Odd=", count%2 == 1), func(t *testing.T) {
|
||||
// Test absolute and non-absolute
|
||||
for _, absolute := range []bool{false, true} {
|
||||
t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) {
|
||||
ut := updateTest{
|
||||
count: count,
|
||||
absolute: absolute,
|
||||
}
|
||||
|
||||
// Test integer and floating point
|
||||
test.RunProfiles(t, ut.run)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type mergeTest struct {
|
||||
count int
|
||||
absolute bool
|
||||
}
|
||||
|
||||
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !mt.absolute)
|
||||
|
||||
agg1 := New()
|
||||
agg2 := New()
|
||||
|
||||
all := test.NewNumbers(profile.NumberKind)
|
||||
|
||||
for i := 0; i < mt.count; i++ {
|
||||
x1 := profile.Random(+1)
|
||||
all.Append(x1)
|
||||
agg1.Update(ctx, x1, record)
|
||||
|
||||
x2 := profile.Random(+1)
|
||||
all.Append(x2)
|
||||
agg2.Update(ctx, x2, record)
|
||||
|
||||
if !mt.absolute {
|
||||
y1 := profile.Random(-1)
|
||||
all.Append(y1)
|
||||
agg1.Update(ctx, y1, record)
|
||||
|
||||
y2 := profile.Random(-1)
|
||||
all.Append(y2)
|
||||
agg2.Update(ctx, y2, record)
|
||||
}
|
||||
}
|
||||
|
||||
agg1.Collect(ctx, record, batcher)
|
||||
agg2.Collect(ctx, record, batcher)
|
||||
|
||||
agg1.Merge(agg2, record.Descriptor())
|
||||
|
||||
all.Sort()
|
||||
|
||||
require.InEpsilon(t,
|
||||
all.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
agg1.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
0.0000001,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
|
||||
|
||||
min, err := agg1.Min()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Min(), min, "Same min - absolute")
|
||||
|
||||
max, err := agg1.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Max(), max, "Same max - absolute")
|
||||
|
||||
qx, err := agg1.Quantile(0.5)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Median(), qx, "Same median - absolute")
|
||||
}
|
||||
|
||||
func TestArrayMerge(t *testing.T) {
|
||||
// Test with an odd an even number of measurements
|
||||
for count := 999; count <= 1000; count++ {
|
||||
t.Run(fmt.Sprint("Odd=", count%2 == 1), func(t *testing.T) {
|
||||
// Test absolute and non-absolute
|
||||
for _, absolute := range []bool{false, true} {
|
||||
t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) {
|
||||
mt := mergeTest{
|
||||
count: count,
|
||||
absolute: absolute,
|
||||
}
|
||||
|
||||
// Test integer and floating point
|
||||
test.RunProfiles(t, mt.run)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestArrayErrors(t *testing.T) {
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
agg := New()
|
||||
|
||||
_, err := agg.Max()
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrEmptyDataSet)
|
||||
|
||||
_, err = agg.Min()
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrEmptyDataSet)
|
||||
|
||||
_, err = agg.Quantile(0.1)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrEmptyDataSet)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false)
|
||||
|
||||
agg.Update(ctx, core.Number(0), record)
|
||||
|
||||
if profile.NumberKind == core.Float64NumberKind {
|
||||
agg.Update(ctx, core.NewFloat64Number(math.NaN()), record)
|
||||
}
|
||||
agg.Collect(ctx, record, batcher)
|
||||
|
||||
require.Equal(t, int64(1), agg.Count(), "NaN value was not counted")
|
||||
|
||||
num, err := agg.Quantile(0)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, num, core.Number(0))
|
||||
|
||||
_, err = agg.Quantile(-0.0001)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrInvalidQuantile)
|
||||
|
||||
_, err = agg.Quantile(1.0001)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, aggregator.ErrInvalidQuantile)
|
||||
})
|
||||
}
|
||||
|
||||
func TestArrayFloat64(t *testing.T) {
|
||||
for _, absolute := range []bool{false, true} {
|
||||
t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) {
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, core.Float64NumberKind, !absolute)
|
||||
|
||||
fpsf := func(sign int) []float64 {
|
||||
// Check behavior of a bunch of odd floating
|
||||
// points except for NaN, which is invalid.
|
||||
return []float64{
|
||||
0,
|
||||
math.Inf(sign),
|
||||
1 / math.Inf(sign),
|
||||
1,
|
||||
2,
|
||||
1e100,
|
||||
math.MaxFloat64,
|
||||
math.SmallestNonzeroFloat64,
|
||||
math.MaxFloat32,
|
||||
math.SmallestNonzeroFloat32,
|
||||
math.E,
|
||||
math.Pi,
|
||||
math.Phi,
|
||||
math.Sqrt2,
|
||||
math.SqrtE,
|
||||
math.SqrtPi,
|
||||
math.SqrtPhi,
|
||||
math.Ln2,
|
||||
math.Log2E,
|
||||
math.Ln10,
|
||||
math.Log10E,
|
||||
}
|
||||
}
|
||||
|
||||
all := test.NewNumbers(core.Float64NumberKind)
|
||||
|
||||
ctx := context.Background()
|
||||
agg := New()
|
||||
|
||||
for _, f := range fpsf(1) {
|
||||
all.Append(core.NewFloat64Number(f))
|
||||
agg.Update(ctx, core.NewFloat64Number(f), record)
|
||||
}
|
||||
|
||||
if !absolute {
|
||||
for _, f := range fpsf(-1) {
|
||||
all.Append(core.NewFloat64Number(f))
|
||||
agg.Update(ctx, core.NewFloat64Number(f), record)
|
||||
}
|
||||
}
|
||||
|
||||
agg.Collect(ctx, record, batcher)
|
||||
|
||||
all.Sort()
|
||||
|
||||
require.InEpsilon(t, all.Sum().AsFloat64(), agg.Sum().AsFloat64(), 0.0000001, "Same sum")
|
||||
|
||||
require.Equal(t, all.Count(), agg.Count(), "Same count")
|
||||
|
||||
min, err := agg.Min()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Min(), min, "Same min")
|
||||
|
||||
max, err := agg.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Max(), max, "Same max")
|
||||
|
||||
qx, err := agg.Quantile(0.5)
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, all.Median(), qx, "Same median")
|
||||
})
|
||||
}
|
||||
}
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package counter
|
||||
package counter // import "go.opentelemetry.io/otel/sdk/metric/aggregator/counter"
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -11,16 +11,18 @@
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
package ddsketch
|
||||
package ddsketch // import "go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
sdk "github.com/DataDog/sketches-go/ddsketch"
|
||||
|
||||
"go.opentelemetry.io/otel/api/core"
|
||||
"go.opentelemetry.io/otel/sdk/export"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
// Aggregator aggregates measure events.
|
||||
@ -44,13 +46,17 @@ func New(cfg *sdk.Config, desc *export.Descriptor) *Aggregator {
|
||||
}
|
||||
|
||||
// NewDefaultConfig returns a new, default DDSketch config.
|
||||
//
|
||||
// TODO: The Config constructor should probably set minValue to -Inf
|
||||
// to aggregate metrics with absolute=false. This requires providing values
|
||||
// for alpha and maxNumBins
|
||||
func NewDefaultConfig() *sdk.Config {
|
||||
return sdk.NewDefaultConfig()
|
||||
}
|
||||
|
||||
// Sum returns the sum of the checkpoint.
|
||||
func (c *Aggregator) Sum() float64 {
|
||||
return c.checkpoint.Sum()
|
||||
func (c *Aggregator) Sum() core.Number {
|
||||
return c.toNumber(c.checkpoint.Sum())
|
||||
}
|
||||
|
||||
// Count returns the count of the checkpoint.
|
||||
@ -59,18 +65,29 @@ func (c *Aggregator) Count() int64 {
|
||||
}
|
||||
|
||||
// Max returns the max of the checkpoint.
|
||||
func (c *Aggregator) Max() float64 {
|
||||
return c.checkpoint.Quantile(1)
|
||||
func (c *Aggregator) Max() (core.Number, error) {
|
||||
return c.Quantile(1)
|
||||
}
|
||||
|
||||
// Min returns the min of the checkpoint.
|
||||
func (c *Aggregator) Min() float64 {
|
||||
return c.checkpoint.Quantile(0)
|
||||
func (c *Aggregator) Min() (core.Number, error) {
|
||||
return c.Quantile(0)
|
||||
}
|
||||
|
||||
// Quantile returns the estimated quantile of the checkpoint.
|
||||
func (c *Aggregator) Quantile(q float64) float64 {
|
||||
return c.checkpoint.Quantile(q)
|
||||
func (c *Aggregator) Quantile(q float64) (core.Number, error) {
|
||||
f := c.checkpoint.Quantile(q)
|
||||
if math.IsNaN(f) {
|
||||
return core.Number(0), aggregator.ErrInvalidQuantile
|
||||
}
|
||||
return c.toNumber(f), nil
|
||||
}
|
||||
|
||||
func (c *Aggregator) toNumber(f float64) core.Number {
|
||||
if c.kind == core.Float64NumberKind {
|
||||
return core.NewFloat64Number(f)
|
||||
}
|
||||
return core.NewInt64Number(int64(f))
|
||||
}
|
||||
|
||||
// Collect checkpoints the current value (atomically) and exports it.
|
||||
|
@ -16,6 +16,7 @@ package ddsketch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -24,90 +25,147 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
|
||||
)
|
||||
|
||||
const count = 100
|
||||
const count = 1000
|
||||
|
||||
// N.B. DDSketch only supports absolute measures
|
||||
type updateTest struct {
|
||||
absolute bool
|
||||
}
|
||||
|
||||
func TestDDSketchAbsolute(t *testing.T) {
|
||||
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !ut.absolute)
|
||||
|
||||
agg := New(NewDefaultConfig(), record.Descriptor())
|
||||
|
||||
all := test.NewNumbers(profile.NumberKind)
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all.Append(x)
|
||||
agg.Update(ctx, x, record)
|
||||
|
||||
if !ut.absolute {
|
||||
y := profile.Random(-1)
|
||||
all.Append(y)
|
||||
agg.Update(ctx, y, record)
|
||||
}
|
||||
}
|
||||
|
||||
agg.Collect(ctx, record, batcher)
|
||||
|
||||
all.Sort()
|
||||
|
||||
require.InDelta(t,
|
||||
all.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
agg.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
1,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg.Count(), "Same count - absolute")
|
||||
|
||||
max, err := agg.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all.Max(),
|
||||
max,
|
||||
"Same max - absolute")
|
||||
|
||||
median, err := agg.Quantile(0.5)
|
||||
require.Nil(t, err)
|
||||
require.InDelta(t,
|
||||
all.Median().CoerceToFloat64(profile.NumberKind),
|
||||
median.CoerceToFloat64(profile.NumberKind),
|
||||
10,
|
||||
"Same median - absolute")
|
||||
}
|
||||
|
||||
func TestDDSketchUpdate(t *testing.T) {
|
||||
// Test absolute and non-absolute
|
||||
for _, absolute := range []bool{false, true} {
|
||||
t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) {
|
||||
ut := updateTest{
|
||||
absolute: absolute,
|
||||
}
|
||||
// Test integer and floating point
|
||||
test.RunProfiles(t, ut.run)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type mergeTest struct {
|
||||
absolute bool
|
||||
}
|
||||
|
||||
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false)
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, !mt.absolute)
|
||||
|
||||
agg := New(NewDefaultConfig(), record.Descriptor())
|
||||
agg1 := New(NewDefaultConfig(), record.Descriptor())
|
||||
agg2 := New(NewDefaultConfig(), record.Descriptor())
|
||||
|
||||
var all test.Numbers
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all = append(all, x)
|
||||
agg.Update(ctx, x, record)
|
||||
all := test.NewNumbers(profile.NumberKind)
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all.Append(x)
|
||||
agg1.Update(ctx, x, record)
|
||||
|
||||
if !mt.absolute {
|
||||
y := profile.Random(-1)
|
||||
all.Append(y)
|
||||
agg1.Update(ctx, y, record)
|
||||
}
|
||||
}
|
||||
|
||||
agg.Collect(ctx, record, batcher)
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all.Append(x)
|
||||
agg2.Update(ctx, x, record)
|
||||
|
||||
all.Sort()
|
||||
if !mt.absolute {
|
||||
y := profile.Random(-1)
|
||||
all.Append(y)
|
||||
agg2.Update(ctx, y, record)
|
||||
}
|
||||
}
|
||||
|
||||
require.InEpsilon(t,
|
||||
all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
|
||||
agg.Sum(),
|
||||
0.0000001,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg.Count(), "Same count - absolute")
|
||||
require.Equal(t,
|
||||
all[len(all)-1].CoerceToFloat64(profile.NumberKind),
|
||||
agg.Max(),
|
||||
"Same max - absolute")
|
||||
require.InEpsilon(t,
|
||||
all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
|
||||
agg.Quantile(0.5),
|
||||
0.1,
|
||||
"Same median - absolute")
|
||||
})
|
||||
agg1.Collect(ctx, record, batcher)
|
||||
agg2.Collect(ctx, record, batcher)
|
||||
|
||||
agg1.Merge(agg2, record.Descriptor())
|
||||
|
||||
all.Sort()
|
||||
|
||||
require.InDelta(t,
|
||||
all.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
agg1.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
1,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
|
||||
|
||||
max, err := agg1.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all.Max(),
|
||||
max,
|
||||
"Same max - absolute")
|
||||
|
||||
median, err := agg1.Quantile(0.5)
|
||||
require.Nil(t, err)
|
||||
require.InDelta(t,
|
||||
all.Median().CoerceToFloat64(profile.NumberKind),
|
||||
median.CoerceToFloat64(profile.NumberKind),
|
||||
10,
|
||||
"Same median - absolute")
|
||||
}
|
||||
|
||||
func TestDDSketchMerge(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
|
||||
batcher, record := test.NewAggregatorTest(export.MeasureMetricKind, profile.NumberKind, false)
|
||||
|
||||
agg1 := New(NewDefaultConfig(), record.Descriptor())
|
||||
agg2 := New(NewDefaultConfig(), record.Descriptor())
|
||||
|
||||
var all test.Numbers
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all = append(all, x)
|
||||
agg1.Update(ctx, x, record)
|
||||
}
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all = append(all, x)
|
||||
agg2.Update(ctx, x, record)
|
||||
}
|
||||
|
||||
agg1.Collect(ctx, record, batcher)
|
||||
agg2.Collect(ctx, record, batcher)
|
||||
|
||||
agg1.Merge(agg2, record.Descriptor())
|
||||
|
||||
all.Sort()
|
||||
|
||||
require.InEpsilon(t,
|
||||
all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
|
||||
agg1.Sum(),
|
||||
0.0000001,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
|
||||
require.Equal(t,
|
||||
all[len(all)-1].CoerceToFloat64(profile.NumberKind),
|
||||
agg1.Max(),
|
||||
"Same max - absolute")
|
||||
require.InEpsilon(t,
|
||||
all.Median(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
|
||||
agg1.Quantile(0.5),
|
||||
0.1,
|
||||
"Same median - absolute")
|
||||
})
|
||||
// Test absolute and non-absolute
|
||||
for _, absolute := range []bool{false, true} {
|
||||
t.Run(fmt.Sprint("Absolute=", absolute), func(t *testing.T) {
|
||||
mt := mergeTest{
|
||||
absolute: absolute,
|
||||
}
|
||||
// Test integer and floating point
|
||||
test.RunProfiles(t, mt.run)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
22
sdk/metric/aggregator/errors.go
Normal file
22
sdk/metric/aggregator/errors.go
Normal file
@ -0,0 +1,22 @@
|
||||
// Copyright 2019, OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package aggregator
|
||||
|
||||
import "fmt"
|
||||
|
||||
var (
|
||||
ErrEmptyDataSet = fmt.Errorf("The result is not defined on an empty data set")
|
||||
ErrInvalidQuantile = fmt.Errorf("The requested quantile is out of range")
|
||||
)
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gauge
|
||||
package gauge // import "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge"
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package maxsumcount
|
||||
package maxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator/maxsumcount"
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -54,8 +54,8 @@ func (c *Aggregator) Count() int64 {
|
||||
}
|
||||
|
||||
// Max returns the accumulated max as a Number.
|
||||
func (c *Aggregator) Max() core.Number {
|
||||
return c.checkpoint.max
|
||||
func (c *Aggregator) Max() (core.Number, error) {
|
||||
return c.checkpoint.max, nil
|
||||
}
|
||||
|
||||
// Collect checkpoints the current value (atomically) and exports it.
|
||||
|
@ -34,10 +34,11 @@ func TestMaxSumCountAbsolute(t *testing.T) {
|
||||
|
||||
agg := New()
|
||||
|
||||
var all test.Numbers
|
||||
all := test.NewNumbers(profile.NumberKind)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all = append(all, x)
|
||||
all.Append(x)
|
||||
agg.Update(ctx, x, record)
|
||||
}
|
||||
|
||||
@ -46,14 +47,17 @@ func TestMaxSumCountAbsolute(t *testing.T) {
|
||||
all.Sort()
|
||||
|
||||
require.InEpsilon(t,
|
||||
all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
|
||||
all.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
agg.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
0.000000001,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg.Count(), "Same count - absolute")
|
||||
|
||||
max, err := agg.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all[len(all)-1],
|
||||
agg.Max(),
|
||||
all.Max(),
|
||||
max,
|
||||
"Same max - absolute")
|
||||
})
|
||||
}
|
||||
@ -67,16 +71,16 @@ func TestMaxSumCountMerge(t *testing.T) {
|
||||
agg1 := New()
|
||||
agg2 := New()
|
||||
|
||||
var all test.Numbers
|
||||
all := test.NewNumbers(profile.NumberKind)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all = append(all, x)
|
||||
all.Append(x)
|
||||
agg1.Update(ctx, x, record)
|
||||
}
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all = append(all, x)
|
||||
all.Append(x)
|
||||
agg2.Update(ctx, x, record)
|
||||
}
|
||||
|
||||
@ -88,14 +92,17 @@ func TestMaxSumCountMerge(t *testing.T) {
|
||||
all.Sort()
|
||||
|
||||
require.InEpsilon(t,
|
||||
all.Sum(profile.NumberKind).CoerceToFloat64(profile.NumberKind),
|
||||
all.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
agg1.Sum().CoerceToFloat64(profile.NumberKind),
|
||||
0.000000001,
|
||||
"Same sum - absolute")
|
||||
require.Equal(t, all.Count(), agg1.Count(), "Same count - absolute")
|
||||
|
||||
max, err := agg1.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all[len(all)-1],
|
||||
agg1.Max(),
|
||||
all.Max(),
|
||||
max,
|
||||
"Same max - absolute")
|
||||
})
|
||||
}
|
||||
|
@ -27,24 +27,29 @@ import (
|
||||
var _ export.MetricBatcher = &metricBatcher{}
|
||||
var _ export.MetricRecord = &metricRecord{}
|
||||
|
||||
const Magnitude = 1000
|
||||
|
||||
type Profile struct {
|
||||
NumberKind core.NumberKind
|
||||
Random func(sign int) core.Number
|
||||
}
|
||||
|
||||
var profiles = []Profile{
|
||||
{
|
||||
NumberKind: core.Int64NumberKind,
|
||||
Random: func(sign int) core.Number {
|
||||
return core.NewInt64Number(int64(sign) * int64(rand.Intn(100000)))
|
||||
func newProfiles() []Profile {
|
||||
rnd := rand.New(rand.NewSource(rand.Int63()))
|
||||
return []Profile{
|
||||
{
|
||||
NumberKind: core.Int64NumberKind,
|
||||
Random: func(sign int) core.Number {
|
||||
return core.NewInt64Number(int64(sign) * int64(rnd.Intn(Magnitude+1)))
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
NumberKind: core.Float64NumberKind,
|
||||
Random: func(sign int) core.Number {
|
||||
return core.NewFloat64Number(float64(sign) * rand.Float64() * 100000)
|
||||
{
|
||||
NumberKind: core.Float64NumberKind,
|
||||
Random: func(sign int) core.Number {
|
||||
return core.NewFloat64Number(float64(sign) * rnd.Float64() * Magnitude)
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type metricBatcher struct {
|
||||
@ -75,66 +80,70 @@ func (m *metricBatcher) Export(context.Context, export.MetricRecord, export.Metr
|
||||
}
|
||||
|
||||
func RunProfiles(t *testing.T, f func(*testing.T, Profile)) {
|
||||
for _, profile := range profiles {
|
||||
for _, profile := range newProfiles() {
|
||||
t.Run(profile.NumberKind.String(), func(t *testing.T) {
|
||||
f(t, profile)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type Numbers []core.Number
|
||||
type Numbers struct {
|
||||
kind core.NumberKind
|
||||
numbers []core.Number
|
||||
}
|
||||
|
||||
func NewNumbers(kind core.NumberKind) Numbers {
|
||||
return Numbers{
|
||||
kind: kind,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Numbers) Append(v core.Number) {
|
||||
n.numbers = append(n.numbers, v)
|
||||
}
|
||||
|
||||
func (n *Numbers) Sort() {
|
||||
sort.Sort(n)
|
||||
}
|
||||
|
||||
func (n *Numbers) Less(i, j int) bool {
|
||||
return (*n)[i] < (*n)[j]
|
||||
return n.numbers[i].CompareNumber(n.kind, n.numbers[j]) < 0
|
||||
}
|
||||
|
||||
func (n *Numbers) Len() int {
|
||||
return len(*n)
|
||||
return len(n.numbers)
|
||||
}
|
||||
|
||||
func (n *Numbers) Swap(i, j int) {
|
||||
(*n)[i], (*n)[j] = (*n)[j], (*n)[i]
|
||||
n.numbers[i], n.numbers[j] = n.numbers[j], n.numbers[i]
|
||||
}
|
||||
|
||||
func (n *Numbers) Sum(kind core.NumberKind) core.Number {
|
||||
func (n *Numbers) Sum() core.Number {
|
||||
var sum core.Number
|
||||
for _, num := range *n {
|
||||
sum.AddNumber(kind, num)
|
||||
for _, num := range n.numbers {
|
||||
sum.AddNumber(n.kind, num)
|
||||
}
|
||||
return sum
|
||||
}
|
||||
|
||||
func (n *Numbers) Count() int64 {
|
||||
return int64(len(*n))
|
||||
return int64(len(n.numbers))
|
||||
}
|
||||
|
||||
func (n *Numbers) Median(kind core.NumberKind) core.Number {
|
||||
if !sort.IsSorted(n) {
|
||||
panic("Sort these numbers before calling Median")
|
||||
}
|
||||
|
||||
l := len(*n)
|
||||
if l%2 == 1 {
|
||||
return (*n)[l/2]
|
||||
}
|
||||
|
||||
lower := (*n)[l/2-1]
|
||||
upper := (*n)[l/2]
|
||||
|
||||
sum := lower
|
||||
sum.AddNumber(kind, upper)
|
||||
|
||||
switch kind {
|
||||
case core.Uint64NumberKind:
|
||||
return core.NewUint64Number(sum.AsUint64() / 2)
|
||||
case core.Int64NumberKind:
|
||||
return core.NewInt64Number(sum.AsInt64() / 2)
|
||||
case core.Float64NumberKind:
|
||||
return core.NewFloat64Number(sum.AsFloat64() / 2)
|
||||
}
|
||||
panic("unknown number kind")
|
||||
func (n *Numbers) Min() core.Number {
|
||||
return n.numbers[0]
|
||||
}
|
||||
|
||||
func (n *Numbers) Max() core.Number {
|
||||
return n.numbers[len(n.numbers)-1]
|
||||
}
|
||||
|
||||
// Median() is an alias for Quantile(0.5).
|
||||
func (n *Numbers) Median() core.Number {
|
||||
// Note that len(n.numbers) is 1 greater than the max element
|
||||
// index, so dividing by two rounds up. This gives the
|
||||
// intended definition for Quantile() in tests, which is to
|
||||
// return the smallest element that is at or above the
|
||||
// specified quantile.
|
||||
return n.numbers[len(n.numbers)/2]
|
||||
}
|
||||
|
@ -56,6 +56,8 @@ func (bf *benchFixture) AggregatorFor(rec export.MetricRecord) export.MetricAggr
|
||||
return maxsumcount.New()
|
||||
} else if strings.HasSuffix(rec.Descriptor().Name(), "ddsketch") {
|
||||
return ddsketch.New(ddsketch.NewDefaultConfig(), rec.Descriptor())
|
||||
} else if strings.HasSuffix(rec.Descriptor().Name(), "array") {
|
||||
return ddsketch.New(ddsketch.NewDefaultConfig(), rec.Descriptor())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -410,3 +412,29 @@ func BenchmarkFloat64DDSketchAcquireHandle(b *testing.B) {
|
||||
func BenchmarkFloat64DDSketchHandleAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureHandleAdd(b, "float64.ddsketch")
|
||||
}
|
||||
|
||||
// Array
|
||||
|
||||
func BenchmarkInt64ArrayAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureAdd(b, "int64.array")
|
||||
}
|
||||
|
||||
func BenchmarkInt64ArrayAcquireHandle(b *testing.B) {
|
||||
benchmarkInt64MeasureAcquireHandle(b, "int64.array")
|
||||
}
|
||||
|
||||
func BenchmarkInt64ArrayHandleAdd(b *testing.B) {
|
||||
benchmarkInt64MeasureHandleAdd(b, "int64.array")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64ArrayAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureAdd(b, "float64.array")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64ArrayAcquireHandle(b *testing.B) {
|
||||
benchmarkFloat64MeasureAcquireHandle(b, "float64.array")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64ArrayHandleAdd(b *testing.B) {
|
||||
benchmarkFloat64MeasureHandleAdd(b, "float64.array")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user