mirror of https://github.com/open-telemetry/opentelemetry-go

Single-state Aggregator and test refactor (#812)

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Author: Joshua MacDonald
Date: 2020-06-13 00:55:01 -07:00
Committed by: GitHub
Commit: 9925ebe517 (parent: 288a3dd435)
34 changed files with 1058 additions and 849 deletions


@@ -16,7 +16,6 @@ package internal_test
import (
"context"
"strings"
"testing"
"go.opentelemetry.io/otel/api/global"
@@ -26,9 +25,7 @@ import (
"go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)
@@ -37,6 +34,7 @@ var Must = metric.Must
// benchFixture is copied from sdk/metric/benchmark_test.go.
// TODO refactor to share this code.
type benchFixture struct {
export.AggregationSelector
accumulator *sdk.Accumulator
meter metric.Meter
B *testing.B
@@ -47,7 +45,8 @@ var _ metric.Provider = &benchFixture{}
func newFixture(b *testing.B) *benchFixture {
b.ReportAllocs()
bf := &benchFixture{
B:                   b,
AggregationSelector: test.AggregationSelector(),
}
bf.accumulator = sdk.NewAccumulator(bf)
@@ -55,22 +54,6 @@ func newFixture(b *testing.B) *benchFixture {
return bf
}
func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case metric.CounterKind:
return sum.New()
case metric.ValueRecorderKind:
if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") {
return minmaxsumcount.New(descriptor)
} else if strings.HasSuffix(descriptor.Name(), "ddsketch") {
return ddsketch.New(descriptor, ddsketch.NewDefaultConfig())
} else if strings.HasSuffix(descriptor.Name(), "array") {
return ddsketch.New(descriptor, ddsketch.NewDefaultConfig())
}
}
return nil
}
func (*benchFixture) Process(export.Record) error {
return nil
}
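With the hand-written selector above removed, the fixture satisfies export.AggregationSelector purely through the embedded field. A minimal sketch of the embedding pattern, assuming test.AggregationSelector() returns an export.AggregationSelector as the constructor above suggests:

// Sketch only: the embedded field promotes AggregatorFor, so the
// fixture no longer needs its own implementation.
type fixture struct {
	export.AggregationSelector // stand-in for benchFixture
}

var _ export.AggregationSelector = fixture{
	AggregationSelector: test.AggregationSelector(),
}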


@@ -18,6 +18,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"
@@ -99,11 +100,13 @@ func TestStdoutTimestamp(t *testing.T) {
ctx := context.Background()
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Int64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
lvagg.Checkpoint(&desc)
checkpointSet.Add(&desc, lvagg)
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
aggtest.CheckedUpdate(t, lvagg, metric.NewInt64Number(321), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, ckpt)
if err := exporter.Export(ctx, checkpointSet); err != nil {
t.Fatal("Unexpected export error: ", err)
@@ -144,11 +147,13 @@ func TestStdoutCounterFormat(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.CounterKind, metric.Int64NumberKind)
cagg := sum.New()
aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
cagg.Checkpoint(&desc)
checkpointSet.Add(&desc, cagg, kv.String("A", "B"), kv.String("C", "D"))
cagg, ckpt := test.Unslice2(sum.New(2))
aggtest.CheckedUpdate(fix.t, cagg, metric.NewInt64Number(123), &desc)
require.NoError(t, cagg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
fix.Export(checkpointSet)
@@ -161,11 +166,12 @@ func TestStdoutLastValueFormat(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(&desc)
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
fix.Export(checkpointSet)
@@ -178,12 +184,14 @@ func TestStdoutMinMaxSumCount(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := minmaxsumcount.New(&desc)
magg, ckpt := test.Unslice2(minmaxsumcount.New(2, &desc))
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(123.456), &desc)
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(876.543), &desc)
magg.Checkpoint(&desc)
require.NoError(t, magg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
fix.Export(checkpointSet)
@@ -198,15 +206,15 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
magg := array.New()
aagg, ckpt := test.Unslice2(array.New(2))
for i := 0; i < 1000; i++ {
aggtest.CheckedUpdate(fix.t, magg, metric.NewFloat64Number(float64(i)+0.5), &desc)
aggtest.CheckedUpdate(fix.t, aagg, metric.NewFloat64Number(float64(i)+0.5), &desc)
}
magg.Checkpoint(&desc)
require.NoError(t, aagg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, magg, kv.String("A", "B"), kv.String("C", "D"))
checkpointSet.Add(&desc, ckpt, kv.String("A", "B"), kv.String("C", "D"))
fix.Export(checkpointSet)
@@ -239,28 +247,27 @@ func TestStdoutValueRecorderFormat(t *testing.T) {
func TestStdoutNoData(t *testing.T) {
desc := metric.NewDescriptor("test.name", metric.ValueRecorderKind, metric.Float64NumberKind)
for name, tc := range map[string]export.Aggregator{
"ddsketch": ddsketch.New(&desc, ddsketch.NewDefaultConfig()),
"minmaxsumcount": minmaxsumcount.New(&desc),
} {
tc := tc
t.Run(name, func(t *testing.T) {
runTwoAggs := func(agg, ckpt export.Aggregator) {
t.Run(fmt.Sprintf("%T", agg), func(t *testing.T) {
t.Parallel()
fix := newFixture(t, stdout.Config{})
checkpointSet := test.NewCheckpointSet(testResource)
magg := tc
magg.Checkpoint(&desc)
require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, magg)
checkpointSet.Add(&desc, ckpt)
fix.Export(checkpointSet)
require.Equal(t, `{"updates":null}`, fix.Output())
})
}
runTwoAggs(test.Unslice2(ddsketch.New(2, &desc, ddsketch.NewDefaultConfig())))
runTwoAggs(test.Unslice2(minmaxsumcount.New(2, &desc)))
}
func TestStdoutLastValueNotSet(t *testing.T) {
@@ -269,8 +276,9 @@ func TestStdoutLastValueNotSet(t *testing.T) {
checkpointSet := test.NewCheckpointSet(testResource)
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
lvagg.Checkpoint(&desc)
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, lvagg, kv.String("A", "B"), kv.String("C", "D"))
@@ -319,11 +327,12 @@ func TestStdoutResource(t *testing.T) {
checkpointSet := test.NewCheckpointSet(tc.res)
desc := metric.NewDescriptor("test.name", metric.ValueObserverKind, metric.Float64NumberKind)
lvagg := lastvalue.New()
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
lvagg.Checkpoint(&desc)
lvagg, ckpt := test.Unslice2(lastvalue.New(2))
checkpointSet.Add(&desc, lvagg, tc.attrs...)
aggtest.CheckedUpdate(fix.t, lvagg, metric.NewFloat64Number(123.456), &desc)
require.NoError(t, lvagg.SynchronizedCopy(ckpt, &desc))
checkpointSet.Add(&desc, ckpt, tc.attrs...)
fix.Export(checkpointSet)


@@ -17,6 +17,7 @@ package test
import (
"context"
"errors"
"reflect"
"sync"
"go.opentelemetry.io/otel/api/kv"
@@ -24,10 +25,6 @@ import (
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
)
@@ -36,6 +33,7 @@ type mapkey struct {
distinct label.Distinct
}
// CheckpointSet is useful for testing Exporters.
type CheckpointSet struct {
sync.RWMutex
records map[mapkey]export.Record
@@ -43,6 +41,26 @@ type CheckpointSet struct {
resource *resource.Resource
}
// NoopAggregator is useful for testing Exporters.
type NoopAggregator struct{}
var _ export.Aggregator = (*NoopAggregator)(nil)
// Update implements export.Aggregator.
func (*NoopAggregator) Update(context.Context, metric.Number, *metric.Descriptor) error {
return nil
}
// SynchronizedCopy implements export.Aggregator.
func (*NoopAggregator) SynchronizedCopy(export.Aggregator, *metric.Descriptor) error {
return nil
}
// Merge implements export.Aggregator.
func (*NoopAggregator) Merge(export.Aggregator, *metric.Descriptor) error {
return nil
}
// NewCheckpointSet returns a test CheckpointSet to which new records can be added.
// Records are grouped by their encoded labels.
func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
@@ -52,12 +70,13 @@ func NewCheckpointSet(resource *resource.Resource) *CheckpointSet {
}
}
// Reset clears the Aggregator state.
func (p *CheckpointSet) Reset() {
p.records = make(map[mapkey]export.Record)
p.updates = nil
}
// Add a new descriptor to a Checkpoint.
// Add a new record to a CheckpointSet.
//
// If there is an existing record with the same descriptor and labels,
// the stored aggregator will be returned and should be merged.
@@ -78,43 +97,6 @@ func (p *CheckpointSet) Add(desc *metric.Descriptor, newAgg export.Aggregator, l
return newAgg, true
}
func createNumber(desc *metric.Descriptor, v float64) metric.Number {
if desc.NumberKind() == metric.Float64NumberKind {
return metric.NewFloat64Number(v)
}
return metric.NewInt64Number(int64(v))
}
func (p *CheckpointSet) AddLastValue(desc *metric.Descriptor, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, lastvalue.New(), v, labels...)
}
func (p *CheckpointSet) AddCounter(desc *metric.Descriptor, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, sum.New(), v, labels...)
}
func (p *CheckpointSet) AddValueRecorder(desc *metric.Descriptor, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, array.New(), v, labels...)
}
func (p *CheckpointSet) AddHistogramValueRecorder(desc *metric.Descriptor, boundaries []float64, v float64, labels ...kv.KeyValue) {
p.updateAggregator(desc, histogram.New(desc, boundaries), v, labels...)
}
func (p *CheckpointSet) updateAggregator(desc *metric.Descriptor, newAgg export.Aggregator, v float64, labels ...kv.KeyValue) {
ctx := context.Background()
// Updates and checkpoint the new aggregator
_ = newAgg.Update(ctx, createNumber(desc, v), desc)
newAgg.Checkpoint(desc)
// Try to add this aggregator to the CheckpointSet
agg, added := p.Add(desc, newAgg, labels...)
if !added {
// An aggregator already exist for this descriptor and label set, we should merge them.
_ = agg.Merge(newAgg, desc)
}
}
func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
for _, r := range p.updates {
if err := f(r); err != nil && !errors.Is(err, aggregation.ErrNoData) {
@@ -123,3 +105,17 @@ func (p *CheckpointSet) ForEach(f func(export.Record) error) error {
}
return nil
}
// Unslice2 takes a two-element slice of some concrete Aggregator type (e.g., []sum.Aggregator) and returns its elements as two export.Aggregators.
func Unslice2(sl interface{}) (one, two export.Aggregator) {
slv := reflect.ValueOf(sl)
if slv.Type().Kind() != reflect.Slice {
panic("Invalid Unslice2")
}
if slv.Len() != 2 {
panic("Invalid Unslice2: length > 2")
}
one = slv.Index(0).Addr().Interface().(export.Aggregator)
two = slv.Index(1).Addr().Interface().(export.Aggregator)
return
}
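Unslice2 lets a test make one allocation of a concrete aggregator type and treat the two elements as the updating half and the checkpoint half. A usage sketch from a consumer test, matching the call sites earlier in this commit (desc stands for a previously constructed *metric.Descriptor):

// One allocation backs both the live aggregator and its checkpoint.
agg, ckpt := test.Unslice2(sum.New(2))
_ = agg.Update(context.Background(), metric.NewInt64Number(1), desc)
_ = agg.SynchronizedCopy(ckpt, desc) // ckpt now holds the state; agg is reset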


@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestUnslice(t *testing.T) {
in := make([]NoopAggregator, 2)
a, b := Unslice2(in)
require.Equal(t, a.(*NoopAggregator), &in[0])
require.Equal(t, b.(*NoopAggregator), &in[1])
}


@@ -22,11 +22,13 @@ import (
commonpb "github.com/open-telemetry/opentelemetry-proto/gen/go/common/v1"
metricpb "github.com/open-telemetry/opentelemetry-proto/gen/go/metrics/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/unit"
"go.opentelemetry.io/otel/exporters/metric/test"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
sumAgg "go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
@@ -80,17 +82,18 @@ func TestStringKeyValues(t *testing.T) {
}
func TestMinMaxSumCountValue(t *testing.T) {
mmsc := minmaxsumcount.New(&metric.Descriptor{})
mmsc, ckpt := test.Unslice2(minmaxsumcount.New(2, &metric.Descriptor{}))
assert.NoError(t, mmsc.Update(context.Background(), 1, &metric.Descriptor{}))
assert.NoError(t, mmsc.Update(context.Background(), 10, &metric.Descriptor{}))
// Prior to checkpointing ErrNoData should be returned.
_, _, _, _, err := minMaxSumCountValues(mmsc)
_, _, _, _, err := minMaxSumCountValues(ckpt.(aggregation.MinMaxSumCount))
assert.EqualError(t, err, aggregation.ErrNoData.Error())
// Checkpoint to set non-zero values
mmsc.Checkpoint(&metric.Descriptor{})
min, max, sum, count, err := minMaxSumCountValues(mmsc)
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
min, max, sum, count, err := minMaxSumCountValues(ckpt.(aggregation.MinMaxSumCount))
if assert.NoError(t, err) {
assert.Equal(t, min, metric.NewInt64Number(1))
assert.Equal(t, max, metric.NewInt64Number(10))
@@ -142,17 +145,17 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
}
ctx := context.Background()
mmsc := minmaxsumcount.New(&metric.Descriptor{})
mmsc, ckpt := test.Unslice2(minmaxsumcount.New(2, &metric.Descriptor{}))
if !assert.NoError(t, mmsc.Update(ctx, 1, &metric.Descriptor{})) {
return
}
mmsc.Checkpoint(&metric.Descriptor{})
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &metric.Descriptor{}))
for _, test := range tests {
desc := metric.NewDescriptor(test.name, test.metricKind, test.numberKind,
metric.WithDescription(test.description),
metric.WithUnit(test.unit))
labels := label.NewSet(test.labels...)
got, err := minMaxSumCount(&desc, &labels, mmsc)
got, err := minMaxSumCount(&desc, &labels, ckpt.(aggregation.MinMaxSumCount))
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
@@ -162,10 +165,11 @@ func TestMinMaxSumCountMetricDescriptor(t *testing.T) {
func TestMinMaxSumCountDatapoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.Int64NumberKind)
labels := label.NewSet()
mmsc := minmaxsumcount.New(&desc)
mmsc, ckpt := test.Unslice2(minmaxsumcount.New(2, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
mmsc.Checkpoint(&desc)
require.NoError(t, mmsc.SynchronizedCopy(ckpt, &desc))
expected := []*metricpb.SummaryDataPoint{
{
Count: 2,
@@ -182,7 +186,7 @@ func TestMinMaxSumCountDatapoints(t *testing.T) {
},
},
}
m, err := minMaxSumCount(&desc, &labels, mmsc)
m, err := minMaxSumCount(&desc, &labels, ckpt.(aggregation.MinMaxSumCount))
if assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
@@ -195,7 +199,7 @@ func TestMinMaxSumCountPropagatesErrors(t *testing.T) {
// ErrNoData should be returned by both the Min and Max values of
// a MinMaxSumCount Aggregator. Use this fact to check the error is
// correctly returned.
mmsc := minmaxsumcount.New(&metric.Descriptor{})
mmsc := &minmaxsumcount.New(1, &metric.Descriptor{})[0]
_, _, _, _, err := minMaxSumCountValues(mmsc)
assert.Error(t, err)
assert.Equal(t, aggregation.ErrNoData, err)
@@ -249,7 +253,7 @@ func TestSumMetricDescriptor(t *testing.T) {
metric.WithUnit(test.unit),
)
labels := label.NewSet(test.labels...)
got, err := sum(&desc, &labels, sumAgg.New())
got, err := sum(&desc, &labels, &sumAgg.New(1)[0])
if assert.NoError(t, err) {
assert.Equal(t, test.expected, got.MetricDescriptor)
}
@@ -259,10 +263,10 @@ func TestSumMetricDescriptor(t *testing.T) {
func TestSumInt64DataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.Int64NumberKind)
labels := label.NewSet()
s := sumAgg.New()
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.Number(1), &desc))
s.Checkpoint(&desc)
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
if m, err := sum(&desc, &labels, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint{{Value: 1}}, m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint(nil), m.DoubleDataPoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
@@ -273,10 +277,10 @@ func TestSumInt64DataPoints(t *testing.T) {
func TestSumFloat64DataPoints(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.Float64NumberKind)
labels := label.NewSet()
s := sumAgg.New()
s, ckpt := test.Unslice2(sumAgg.New(2))
assert.NoError(t, s.Update(context.Background(), metric.NewFloat64Number(1), &desc))
s.Checkpoint(&desc)
if m, err := sum(&desc, &labels, s); assert.NoError(t, err) {
require.NoError(t, s.SynchronizedCopy(ckpt, &desc))
if m, err := sum(&desc, &labels, ckpt.(aggregation.Sum)); assert.NoError(t, err) {
assert.Equal(t, []*metricpb.Int64DataPoint(nil), m.Int64DataPoints)
assert.Equal(t, []*metricpb.DoubleDataPoint{{Value: 1}}, m.DoubleDataPoints)
assert.Equal(t, []*metricpb.HistogramDataPoint(nil), m.HistogramDataPoints)
@@ -287,7 +291,7 @@ func TestSumFloat64DataPoints(t *testing.T) {
func TestSumErrUnknownValueType(t *testing.T) {
desc := metric.NewDescriptor("", metric.ValueRecorderKind, metric.NumberKind(-1))
labels := label.NewSet()
s := sumAgg.New()
s := &sumAgg.New(1)[0]
_, err := sum(&desc, &labels, s)
assert.Error(t, err)
if !errors.Is(err, ErrUnknownValueType) {


@@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/test"
metricsdk "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
@@ -662,12 +663,12 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
desc := metric.NewDescriptor(r.name, r.mKind, r.nKind, r.opts...)
labs := label.NewSet(r.labels...)
var agg metricsdk.Aggregator
var agg, ckpt metricsdk.Aggregator
switch r.mKind {
case metric.CounterKind:
agg = sum.New()
agg, ckpt = test.Unslice2(sum.New(2))
default:
agg = minmaxsumcount.New(&desc)
agg, ckpt = test.Unslice2(minmaxsumcount.New(2, &desc))
}
ctx := context.Background()
@@ -684,11 +685,11 @@ func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []me
default:
t.Fatalf("invalid number kind: %v", r.nKind)
}
agg.Checkpoint(&desc)
require.NoError(t, agg.SynchronizedCopy(ckpt, &desc))
equiv := r.resource.Equivalent()
resources[equiv] = r.resource
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, agg))
recs[equiv] = append(recs[equiv], metricsdk.NewRecord(&desc, &labs, r.resource, ckpt))
}
for _, records := range recs {
assert.NoError(t, exp.Export(context.Background(), &checkpointSet{records: records}))


@@ -73,17 +73,24 @@ type Integrator interface {
// AggregationSelector supports selecting the kind of Aggregator to
// use at runtime for a specific metric instrument.
type AggregationSelector interface {
// AggregatorFor returns the kind of aggregator suited to the
// requested export. Returning `nil` indicates to ignore this
// metric instrument. This must return a consistent type to
// avoid confusion in later stages of the metrics export
// process, i.e., when Merging multiple aggregators for a
// specific instrument.
// AggregatorFor allocates a variable number of aggregators of
// a kind suitable for the requested export. This method fills
// in the variadic `...*Aggregator` arguments, allowing the
// implementation to make a single allocation for all of them.
//
// When the call returns without initializing the *Aggregator
// to a non-nil value, the metric instrument is explicitly
// disabled.
//
// This must return a consistent type to avoid confusion in
// later stages of the metrics export process, i.e., when
// Merging or Checkpointing aggregators for a specific
// instrument.
//
// Note: This is context-free because the aggregator should
// not relate to the incoming context. This call should not
// block.
AggregatorFor(*metric.Descriptor) Aggregator
AggregatorFor(*metric.Descriptor, ...*Aggregator)
}
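Concretely, a selector under the new signature fills in the caller-provided pointers rather than returning a value. The following hypothetical sketch (not the SDK's built-in selector) shows how an implementation can honor the single-allocation intent using the aggregators' New(cnt) constructors:

type sketchSelector struct{} // hypothetical name

func (sketchSelector) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
	switch descriptor.MetricKind() {
	case metric.CounterKind:
		aggs := sum.New(len(aggPtrs)) // one allocation covers every requested aggregator
		for i := range aggPtrs {
			*aggPtrs[i] = &aggs[i]
		}
	default:
		// Leaving each *aggPtrs[i] nil explicitly disables the instrument.
	}
}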
// Aggregator implements a specific aggregation behavior, e.g., a
@@ -99,35 +106,42 @@ type AggregationSelector interface {
// MinMaxSumCount aggregator to a Counter instrument.
type Aggregator interface {
// Update receives a new measured value and incorporates it
// into the aggregation. Update() calls may arrive
// concurrently as the SDK does not provide synchronization.
// into the aggregation. Update() may be called
// concurrently.
//
// Descriptor.NumberKind() should be consulted to determine
// whether the provided number is an int64 or float64.
//
// The Context argument comes from user-level code and could be
// inspected for distributed or span context.
// inspected for a `correlation.Map` or `trace.SpanContext`.
Update(context.Context, metric.Number, *metric.Descriptor) error
// Checkpoint is called during collection to finish one period
// of aggregation by atomically saving the current value.
// Checkpoint() is called concurrently with Update().
// Checkpoint should reset the current state to the empty
// state, in order to begin computing a new delta for the next
// collection period.
// SynchronizedCopy is called during collection to finish one
// period of aggregation by atomically saving the
// currently-updating state into the argument Aggregator.
//
// After the checkpoint is taken, the current value may be
// accessed by converting to one of the suitable interface
// types in the `aggregator` sub-package.
// SynchronizedCopy() is called concurrently with Update(). These
// two methods must be synchronized with respect to each
// other, for correctness.
//
// After saving a synchronized copy, the Aggregator can be converted
// into one or more of the interfaces in the `aggregation` sub-package,
// according to kind of Aggregator that was selected.
//
// This method will return an InconsistentAggregatorError if
// this Aggregator cannot be copied into the destination due
// to an incompatible type.
//
// This call has no Context argument because it is expected to
// perform only computation.
Checkpoint(*metric.Descriptor)
SynchronizedCopy(destination Aggregator, descriptor *metric.Descriptor) error
// Merge combines the checkpointed state from the argument
// aggregator into this aggregator's checkpointed state.
// Merge() is called in a single-threaded context, no locking
// is required.
// Aggregator into this Aggregator. Merge is not synchronized
// with respect to Update or SynchronizedCopy.
//
// The owner of an Aggregator being merged is responsible for
// synchronization of both Aggregator states.
Merge(Aggregator, *metric.Descriptor) error
}
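Read together, the three methods describe one collection cycle. A sketch with placeholder names (active and ckpt are assumed to come from one AggregatorFor call; accumulated belongs to the integrator):

_ = active.Update(ctx, num, descriptor)       // hot path; may run concurrently
_ = active.SynchronizedCopy(ckpt, descriptor) // collection: state moves to ckpt, active resets
_ = accumulated.Merge(ckpt, descriptor)       // export: folds checkpoints; caller synchronizes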


@@ -23,11 +23,11 @@ import (
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
)
// NewInconsistentMergeError formats an error describing an attempt to
// merge different-type aggregators. The result can be unwrapped as
// NewInconsistentAggregatorError formats an error describing an attempt to
// Checkpoint or Merge different-type aggregators. The result can be unwrapped as
// an ErrInconsistentType.
func NewInconsistentMergeError(a1, a2 export.Aggregator) error {
return fmt.Errorf("cannot merge %T with %T: %w", a1, a2, aggregation.ErrInconsistentType)
func NewInconsistentAggregatorError(a1, a2 export.Aggregator) error {
return fmt.Errorf("%w: %T and %T", aggregation.ErrInconsistentType, a1, a2)
}
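The constructor wraps aggregation.ErrInconsistentType, so callers can match with errors.Is rather than comparing messages; a sketch, where err is assumed to come from a failed SynchronizedCopy or Merge:

if errors.Is(err, aggregation.ErrInconsistentType) {
	// the two aggregators had different concrete types
}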
// RangeTest is a common routine for testing for valid input values.


@@ -28,11 +28,11 @@ import (
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
func TestInconsistentMergeErr(t *testing.T) {
err := aggregator.NewInconsistentMergeError(sum.New(), lastvalue.New())
func TestInconsistentAggregatorErr(t *testing.T) {
err := aggregator.NewInconsistentAggregatorError(&sum.New(1)[0], &lastvalue.New(1)[0])
require.Equal(
t,
"cannot merge *sum.Aggregator with *lastvalue.Aggregator: inconsistent aggregator types",
"inconsistent aggregator types: *sum.Aggregator and *lastvalue.Aggregator",
err.Error(),
)
require.True(t, errors.Is(err, aggregation.ErrInconsistentType))


@@ -31,11 +31,9 @@ type (
// Aggregator aggregates events that form a distribution, keeping
// an array with the exact set of values.
Aggregator struct {
// ckptSum needs to be aligned for 64-bit atomic operations.
ckptSum metric.Number
lock sync.Mutex
current points
checkpoint points
lock sync.Mutex
sum metric.Number
points points
}
points []metric.Number
@@ -48,9 +46,9 @@ var _ aggregation.Points = &Aggregator{}
// New returns a new array aggregator, which aggregates recorded
// measurements by storing them in an array. This type uses a mutex
// for Update() and Checkpoint() concurrency.
func New() *Aggregator {
return &Aggregator{}
// for Update() and SynchronizedCopy() concurrency.
func New(cnt int) []Aggregator {
return make([]Aggregator, cnt)
}
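This constructor shape repeats for every aggregator in the commit: New(cnt) returns a slice so that a working/checkpoint pair costs a single allocation. Usage sketch, inside package array (the tests below wrap the same idea in new2 and new4 helpers):

alloc := New(2)
agg, ckpt := &alloc[0], &alloc[1] // two *Aggregator values, one backing array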
// Kind returns aggregation.ExactKind.
@@ -60,64 +58,65 @@ func (c *Aggregator) Kind() aggregation.Kind {
// Sum returns the sum of values in the checkpoint.
func (c *Aggregator) Sum() (metric.Number, error) {
return c.ckptSum, nil
return c.sum, nil
}
// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
return int64(len(c.checkpoint)), nil
return int64(len(c.points)), nil
}
// Max returns the maximum value in the checkpoint.
func (c *Aggregator) Max() (metric.Number, error) {
return c.checkpoint.Quantile(1)
return c.points.Quantile(1)
}
// Min returns the minimum value in the checkpoint.
func (c *Aggregator) Min() (metric.Number, error) {
return c.checkpoint.Quantile(0)
return c.points.Quantile(0)
}
// Quantile returns the estimated quantile of data in the checkpoint.
// It is an error if `q` is less than 0 or greater than 1.
func (c *Aggregator) Quantile(q float64) (metric.Number, error) {
return c.checkpoint.Quantile(q)
return c.points.Quantile(q)
}
// Points returns access to the raw data set.
func (c *Aggregator) Points() ([]metric.Number, error) {
return c.checkpoint, nil
return c.points, nil
}
// Checkpoint saves the current state and resets the current state to
// SynchronizedCopy saves the current state to oa and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
c.lock.Lock()
c.checkpoint, c.current = c.current, nil
c.lock.Unlock()
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
kind := desc.NumberKind()
c.lock.Lock()
o.points, c.points = c.points, nil
o.sum, c.sum = c.sum, 0
c.lock.Unlock()
// TODO: This sort should be done lazily, only when quantiles
// are requested. The SDK specification says you can use this
// aggregator to simply list values in the order they were
// received as an alternative to requesting quantile information.
c.sort(kind)
c.ckptSum = metric.Number(0)
for _, v := range c.checkpoint {
c.ckptSum.AddNumber(kind, v)
}
o.sort(desc.NumberKind())
return nil
}
// Update adds the recorded measurement to the current data set.
// Update takes a lock to prevent concurrent Update() and Checkpoint()
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
// calls.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
c.lock.Lock()
c.current = append(c.current, number)
c.points = append(c.points, number)
c.sum.AddNumber(desc.NumberKind(), number)
c.lock.Unlock()
return nil
}
@@ -125,21 +124,24 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(c, oa)
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.ckptSum.AddNumber(desc.NumberKind(), o.ckptSum)
c.checkpoint = combine(c.checkpoint, o.checkpoint, desc.NumberKind())
// Note: Current assumption is that `o` was checkpointed,
// therefore is already sorted. See the TODO above, since
// this is an open question.
c.sum.AddNumber(desc.NumberKind(), o.sum)
c.points = combine(c.points, o.points, desc.NumberKind())
return nil
}
func (c *Aggregator) sort(kind metric.NumberKind) {
switch kind {
case metric.Float64NumberKind:
sort.Float64s(*(*[]float64)(unsafe.Pointer(&c.checkpoint)))
sort.Float64s(*(*[]float64)(unsafe.Pointer(&c.points)))
case metric.Int64NumberKind:
sort.Sort(&c.checkpoint)
sort.Sort(&c.points)
default:
// NOTE: This can't happen because the SDK doesn't
@@ -185,11 +187,11 @@ func (p *points) Swap(i, j int) {
// of a quantile.
func (p *points) Quantile(q float64) (metric.Number, error) {
if len(*p) == 0 {
return metric.Number(0), aggregation.ErrNoData
return 0, aggregation.ErrNoData
}
if q < 0 || q > 1 {
return metric.Number(0), aggregation.ErrInvalidQuantile
return 0, aggregation.ErrInvalidQuantile
}
if q == 0 || len(*p) == 1 {


@@ -15,43 +15,55 @@
package array
import (
"errors"
"fmt"
"math"
"os"
"testing"
"unsafe"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/metric"
ottest "go.opentelemetry.io/otel/internal/testing"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
)
// Ensure struct alignment prior to running tests.
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Aggregator.ckptSum",
Offset: unsafe.Offsetof(Aggregator{}.ckptSum),
},
}
if !ottest.Aligned8Byte(fields, os.Stderr) {
os.Exit(1)
}
os.Exit(m.Run())
}
type updateTest struct {
count int
}
func checkZero(t *testing.T, agg *Aggregator, desc *metric.Descriptor) {
kind := desc.NumberKind()
sum, err := agg.Sum()
require.NoError(t, err)
require.Equal(t, kind.Zero(), sum)
count, err := agg.Count()
require.NoError(t, err)
require.Equal(t, int64(0), count)
max, err := agg.Max()
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, kind.Zero(), max)
min, err := agg.Min()
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, kind.Zero(), min)
}
func new2() (_, _ *Aggregator) {
alloc := New(2)
return &alloc[0], &alloc[1]
}
func new4() (_, _, _, _ *Aggregator) {
alloc := New(4)
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
}
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New()
agg, ckpt := new2()
all := test.NewNumbers(profile.NumberKind)
@@ -65,11 +77,14 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}
agg.Checkpoint(descriptor)
err := agg.SynchronizedCopy(ckpt, descriptor)
require.NoError(t, err)
checkZero(t, agg, descriptor)
all.Sort()
sum, err := agg.Sum()
sum, err := ckpt.Sum()
require.Nil(t, err)
allSum := all.Sum()
require.InEpsilon(t,
@@ -77,19 +92,19 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
sum.CoerceToFloat64(profile.NumberKind),
0.0000001,
"Same sum")
count, err := agg.Count()
count, err := ckpt.Count()
require.Nil(t, err)
require.Equal(t, all.Count(), count, "Same count")
min, err := agg.Min()
min, err := ckpt.Min()
require.Nil(t, err)
require.Equal(t, all.Min(), min, "Same min")
max, err := agg.Max()
max, err := ckpt.Max()
require.Nil(t, err)
require.Equal(t, all.Max(), max, "Same max")
qx, err := agg.Quantile(0.5)
qx, err := ckpt.Quantile(0.5)
require.Nil(t, err)
require.Equal(t, all.Median(), qx, "Same median")
}
@@ -115,9 +130,7 @@ type mergeTest struct {
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg1 := New()
agg2 := New()
agg1, agg2, ckpt1, ckpt2 := new4()
all := test.NewNumbers(profile.NumberKind)
@@ -141,14 +154,17 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
test.CheckedMerge(t, agg1, agg2, descriptor)
checkZero(t, agg1, descriptor)
checkZero(t, agg2, descriptor)
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
all.Sort()
sum, err := agg1.Sum()
sum, err := ckpt1.Sum()
require.Nil(t, err)
allSum := all.Sum()
require.InEpsilon(t,
@@ -156,19 +172,19 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
sum.CoerceToFloat64(profile.NumberKind),
0.0000001,
"Same sum - absolute")
count, err := agg1.Count()
count, err := ckpt1.Count()
require.Nil(t, err)
require.Equal(t, all.Count(), count, "Same count - absolute")
min, err := agg1.Min()
min, err := ckpt1.Min()
require.Nil(t, err)
require.Equal(t, all.Min(), min, "Same min - absolute")
max, err := agg1.Max()
max, err := ckpt1.Max()
require.Nil(t, err)
require.Equal(t, all.Max(), max, "Same max - absolute")
qx, err := agg1.Quantile(0.5)
qx, err := ckpt1.Quantile(0.5)
require.Nil(t, err)
require.Equal(t, all.Median(), qx, "Same median - absolute")
}
@@ -195,17 +211,17 @@ func TestArrayMerge(t *testing.T) {
func TestArrayErrors(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg := New()
agg, ckpt := new2()
_, err := agg.Max()
_, err := ckpt.Max()
require.Error(t, err)
require.Equal(t, err, aggregation.ErrNoData)
_, err = agg.Min()
_, err = ckpt.Min()
require.Error(t, err)
require.Equal(t, err, aggregation.ErrNoData)
_, err = agg.Quantile(0.1)
_, err = ckpt.Quantile(0.1)
require.Error(t, err)
require.Equal(t, err, aggregation.ErrNoData)
@@ -216,23 +232,23 @@ func TestArrayErrors(t *testing.T) {
if profile.NumberKind == metric.Float64NumberKind {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(math.NaN()), descriptor)
}
agg.Checkpoint(descriptor)
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
count, err := agg.Count()
count, err := ckpt.Count()
require.Equal(t, int64(1), count, "NaN value was not counted")
require.Nil(t, err)
num, err := agg.Quantile(0)
num, err := ckpt.Quantile(0)
require.Nil(t, err)
require.Equal(t, num, metric.Number(0))
_, err = agg.Quantile(-0.0001)
_, err = ckpt.Quantile(-0.0001)
require.Error(t, err)
require.Equal(t, err, aggregation.ErrInvalidQuantile)
require.True(t, errors.Is(err, aggregation.ErrInvalidQuantile))
_, err = agg.Quantile(1.0001)
require.Error(t, err)
require.Equal(t, err, aggregation.ErrInvalidQuantile)
require.True(t, errors.Is(err, aggregation.ErrNoData))
})
}
@@ -269,7 +285,7 @@ func TestArrayFloat64(t *testing.T) {
all := test.NewNumbers(metric.Float64NumberKind)
agg := New()
agg, ckpt := new2()
for _, f := range fpsf(1) {
all.Append(metric.NewFloat64Number(f))
@@ -281,32 +297,32 @@ func TestArrayFloat64(t *testing.T) {
test.CheckedUpdate(t, agg, metric.NewFloat64Number(f), descriptor)
}
agg.Checkpoint(descriptor)
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
all.Sort()
sum, err := agg.Sum()
sum, err := ckpt.Sum()
require.Nil(t, err)
allSum := all.Sum()
require.InEpsilon(t, (&allSum).AsFloat64(), sum.AsFloat64(), 0.0000001, "Same sum")
count, err := agg.Count()
count, err := ckpt.Count()
require.Equal(t, all.Count(), count, "Same count")
require.Nil(t, err)
min, err := agg.Min()
min, err := ckpt.Min()
require.Nil(t, err)
require.Equal(t, all.Min(), min, "Same min")
max, err := agg.Max()
max, err := ckpt.Max()
require.Nil(t, err)
require.Equal(t, all.Max(), max, "Same max")
qx, err := agg.Quantile(0.5)
qx, err := ckpt.Quantile(0.5)
require.Nil(t, err)
require.Equal(t, all.Median(), qx, "Same median")
po, err := agg.Points()
po, err := ckpt.Points()
require.Nil(t, err)
require.Equal(t, all.Len(), len(po), "Points() must have same length of updates")
for i := 0; i < len(po); i++ {


@@ -31,11 +31,10 @@ type Config = sdk.Config
// Aggregator aggregates events into a distribution.
type Aggregator struct {
lock sync.Mutex
cfg *Config
kind metric.NumberKind
current *sdk.DDSketch
checkpoint *sdk.DDSketch
lock sync.Mutex
cfg *Config
kind metric.NumberKind
sketch *sdk.DDSketch
}
var _ export.Aggregator = &Aggregator{}
@@ -43,13 +42,16 @@ var _ aggregation.MinMaxSumCount = &Aggregator{}
var _ aggregation.Distribution = &Aggregator{}
// New returns a new DDSketch aggregator.
func New(desc *metric.Descriptor, cfg *Config) *Aggregator {
return &Aggregator{
cfg: cfg,
kind: desc.NumberKind(),
current: sdk.NewDDSketch(cfg),
checkpoint: sdk.NewDDSketch(cfg),
func New(cnt int, desc *metric.Descriptor, cfg *Config) []Aggregator {
aggs := make([]Aggregator, cnt)
for i := range aggs {
aggs[i] = Aggregator{
cfg: cfg,
kind: desc.NumberKind(),
sketch: sdk.NewDDSketch(cfg),
}
}
return aggs
}
// Kind returns aggregation.SketchKind.
@@ -58,22 +60,18 @@ func (c *Aggregator) Kind() aggregation.Kind {
}
// NewDefaultConfig returns a new, default DDSketch config.
//
// TODO: Should the Config constructor set minValue to -Inf
// when the descriptor has absolute=false? This requires providing
// values for alpha and maxNumBins, apparently.
func NewDefaultConfig() *Config {
return sdk.NewDefaultConfig()
}
// Sum returns the sum of values in the checkpoint.
func (c *Aggregator) Sum() (metric.Number, error) {
return c.toNumber(c.checkpoint.Sum()), nil
return c.toNumber(c.sketch.Sum()), nil
}
// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
return c.checkpoint.Count(), nil
return c.sketch.Count(), nil
}
// Max returns the maximum value in the checkpoint.
@@ -89,12 +87,12 @@ func (c *Aggregator) Min() (metric.Number, error) {
// Quantile returns the estimated quantile of data in the checkpoint.
// It is an error if `q` is less than 0 or greater than 1.
func (c *Aggregator) Quantile(q float64) (metric.Number, error) {
if c.checkpoint.Count() == 0 {
return metric.Number(0), aggregation.ErrNoData
if c.sketch.Count() == 0 {
return 0, aggregation.ErrNoData
}
f := c.checkpoint.Quantile(q)
f := c.sketch.Quantile(q)
if math.IsNaN(f) {
return metric.Number(0), aggregation.ErrInvalidQuantile
return 0, aggregation.ErrInvalidQuantile
}
return c.toNumber(f), nil
}
@@ -106,24 +104,29 @@ func (c *Aggregator) toNumber(f float64) metric.Number {
return metric.NewInt64Number(int64(f))
}
// Checkpoint saves the current state and resets the current state to
// the empty set, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) Checkpoint(*metric.Descriptor) {
// SynchronizedCopy saves the current state into oa and resets the current state to
// a new sketch, taking a lock to prevent concurrent Update() calls.
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
replace := sdk.NewDDSketch(c.cfg)
c.lock.Lock()
c.checkpoint = c.current
c.current = replace
o.sketch, c.sketch = c.sketch, replace
c.lock.Unlock()
return nil
}
// Update adds the recorded measurement to the current data set.
// Update takes a lock to prevent concurrent Update() and Checkpoint()
// Update takes a lock to prevent concurrent Update() and SynchronizedCopy()
// calls.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
c.lock.Lock()
defer c.lock.Unlock()
c.current.Add(number.CoerceToFloat64(desc.NumberKind()))
c.sketch.Add(number.CoerceToFloat64(desc.NumberKind()))
return nil
}
@@ -131,9 +134,9 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
func (c *Aggregator) Merge(oa export.Aggregator, d *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(c, oa)
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.checkpoint.Merge(o.checkpoint)
c.sketch.Merge(o.sketch)
return nil
}


@@ -15,12 +15,14 @@
package ddsketch
import (
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/test"
)
@@ -29,9 +31,43 @@ const count = 1000
type updateTest struct {
}
func new2(desc *metric.Descriptor) (_, _ *Aggregator) {
alloc := New(2, desc, NewDefaultConfig())
return &alloc[0], &alloc[1]
}
func new4(desc *metric.Descriptor) (_, _, _, _ *Aggregator) {
alloc := New(4, desc, NewDefaultConfig())
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
}
func checkZero(t *testing.T, agg *Aggregator, desc *metric.Descriptor) {
kind := desc.NumberKind()
sum, err := agg.Sum()
require.NoError(t, err)
require.Equal(t, kind.Zero(), sum)
count, err := agg.Count()
require.NoError(t, err)
require.Equal(t, int64(0), count)
max, err := agg.Max()
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, kind.Zero(), max)
median, err := agg.Quantile(0.5)
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, kind.Zero(), median)
min, err := agg.Min()
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, kind.Zero(), min)
}
func (ut *updateTest) run(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor, NewDefaultConfig())
agg, ckpt := new2(descriptor)
all := test.NewNumbers(profile.NumberKind)
for i := 0; i < count; i++ {
@@ -44,11 +80,14 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
test.CheckedUpdate(t, agg, y, descriptor)
}
agg.Checkpoint(descriptor)
err := agg.SynchronizedCopy(ckpt, descriptor)
require.NoError(t, err)
checkZero(t, agg, descriptor)
all.Sort()
sum, err := agg.Sum()
sum, err := ckpt.Sum()
require.Nil(t, err)
allSum := all.Sum()
require.InDelta(t,
@@ -57,18 +96,18 @@ func (ut *updateTest) run(t *testing.T, profile test.Profile) {
1,
"Same sum")
count, err := agg.Count()
count, err := ckpt.Count()
require.Equal(t, all.Count(), count, "Same count")
require.Nil(t, err)
max, err := agg.Max()
max, err := ckpt.Max()
require.Nil(t, err)
require.Equal(t,
all.Max(),
max,
"Same max")
median, err := agg.Quantile(0.5)
median, err := ckpt.Quantile(0.5)
require.Nil(t, err)
allMedian := all.Median()
require.InDelta(t,
@@ -90,8 +129,7 @@ type mergeTest struct {
func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg1 := New(descriptor, NewDefaultConfig())
agg2 := New(descriptor, NewDefaultConfig())
agg1, agg2, ckpt1, ckpt2 := new4(descriptor)
all := test.NewNumbers(profile.NumberKind)
for i := 0; i < count; i++ {
@@ -118,14 +156,17 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
}
}
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
test.CheckedMerge(t, agg1, agg2, descriptor)
checkZero(t, agg1, descriptor)
checkZero(t, agg2, descriptor)
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
all.Sort()
aggSum, err := agg1.Sum()
aggSum, err := ckpt1.Sum()
require.Nil(t, err)
allSum := all.Sum()
require.InDelta(t,
@@ -134,18 +175,18 @@ func (mt *mergeTest) run(t *testing.T, profile test.Profile) {
1,
"Same sum")
count, err := agg1.Count()
count, err := ckpt1.Count()
require.Equal(t, all.Count(), count, "Same count")
require.Nil(t, err)
max, err := agg1.Max()
max, err := ckpt1.Max()
require.Nil(t, err)
require.Equal(t,
all.Max(),
max,
"Same max")
median, err := agg1.Quantile(0.5)
median, err := ckpt1.Quantile(0.5)
require.Nil(t, err)
allMedian := all.Median()
require.InDelta(t,


@@ -38,7 +38,7 @@ func benchmarkHistogramSearchFloat64(b *testing.B, size int) {
values[i] = rand.Float64() * inputRange
}
desc := test.NewAggregatorTest(metric.ValueRecorderKind, metric.Float64NumberKind)
agg := histogram.New(desc, boundaries)
agg := &histogram.New(1, desc, boundaries)[0]
ctx := context.Background()
b.ReportAllocs()
@@ -89,7 +89,7 @@ func benchmarkHistogramSearchInt64(b *testing.B, size int) {
values[i] = int64(rand.Float64() * inputRange)
}
desc := test.NewAggregatorTest(metric.ValueRecorderKind, metric.Int64NumberKind)
agg := histogram.New(desc, boundaries)
agg := &histogram.New(1, desc, boundaries)[0]
ctx := context.Background()
b.ReportAllocs()


@@ -35,10 +35,9 @@ type (
// It also calculates the sum and count of all events.
Aggregator struct {
lock sync.Mutex
current state
checkpoint state
boundaries []float64
kind metric.NumberKind
state state
}
// state represents the state of a histogram, consisting of
@@ -64,7 +63,9 @@ var _ aggregation.Histogram = &Aggregator{}
// Note that this aggregator maintains each value using independent
// atomic operations, which introduces the possibility that
// checkpoints are inconsistent.
func New(desc *metric.Descriptor, boundaries []float64) *Aggregator {
func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator {
aggs := make([]Aggregator, cnt)
// Boundaries MUST be ordered otherwise the histogram could not
// be properly computed.
sortedBoundaries := make([]float64, len(boundaries))
@@ -72,12 +73,14 @@ func New(desc *metric.Descriptor, boundaries []float64) *Aggregator {
copy(sortedBoundaries, boundaries)
sort.Float64s(sortedBoundaries)
return &Aggregator{
kind: desc.NumberKind(),
boundaries: sortedBoundaries,
current: emptyState(sortedBoundaries),
checkpoint: emptyState(sortedBoundaries),
for i := range aggs {
aggs[i] = Aggregator{
kind: desc.NumberKind(),
boundaries: sortedBoundaries,
state: emptyState(sortedBoundaries),
}
}
return aggs
}
// Kind returns aggregation.HistogramKind.
@@ -87,36 +90,36 @@ func (c *Aggregator) Kind() aggregation.Kind {
// Sum returns the sum of all values in the checkpoint.
func (c *Aggregator) Sum() (metric.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint.sum, nil
return c.state.sum, nil
}
// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
c.lock.Lock()
defer c.lock.Unlock()
return int64(c.checkpoint.count), nil
return int64(c.state.count), nil
}
// Histogram returns the count of events in pre-determined buckets.
func (c *Aggregator) Histogram() (aggregation.Buckets, error) {
c.lock.Lock()
defer c.lock.Unlock()
return aggregation.Buckets{
Boundaries: c.boundaries,
Counts: c.checkpoint.bucketCounts,
Counts: c.state.bucketCounts,
}, nil
}
// Checkpoint saves the current state and resets the current state to
// SynchronizedCopy saves the current state into oa and resets the current state to
// the empty set, taking a lock to prevent concurrent Update()
// calls.
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.lock.Lock()
c.checkpoint, c.current = c.current, emptyState(c.boundaries)
o.state, c.state = c.state, emptyState(c.boundaries)
c.lock.Unlock()
return nil
}
func emptyState(boundaries []float64) state {
@@ -152,9 +155,9 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
c.lock.Lock()
defer c.lock.Unlock()
c.current.count.AddInt64(1)
c.current.sum.AddNumber(kind, number)
c.current.bucketCounts[bucketID]++
c.state.count.AddInt64(1)
c.state.sum.AddNumber(kind, number)
c.state.bucketCounts[bucketID]++
return nil
}
@@ -163,14 +166,14 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(c, oa)
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.checkpoint.count.AddNumber(metric.Uint64NumberKind, o.checkpoint.count)
c.state.sum.AddNumber(desc.NumberKind(), o.state.sum)
c.state.count.AddNumber(metric.Uint64NumberKind, o.state.count)
for i := 0; i < len(c.checkpoint.bucketCounts); i++ {
c.checkpoint.bucketCounts[i] += o.checkpoint.bucketCounts[i]
for i := 0; i < len(c.state.bucketCounts); i++ {
c.state.bucketCounts[i] += o.state.bucketCounts[i]
}
return nil
}
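Note that the loop above adds bucket counts positionally, so Merge assumes both aggregators carry the same boundaries; a pair produced by a single New call shares the sorted-boundaries slice by construction. Sketch, inside package histogram:

alloc := New(2, descriptor, boundaries) // both elements share sortedBoundaries
agg, ckpt := &alloc[0], &alloc[1]       // SynchronizedCopy and Merge between the pair are safe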


@@ -60,6 +60,35 @@ var (
boundaries = []float64{500, 250, 750}
)
func new2(desc *metric.Descriptor) (_, _ *histogram.Aggregator) {
alloc := histogram.New(2, desc, boundaries)
return &alloc[0], &alloc[1]
}
func new4(desc *metric.Descriptor) (_, _, _, _ *histogram.Aggregator) {
alloc := histogram.New(4, desc, boundaries)
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
}
func checkZero(t *testing.T, agg *histogram.Aggregator, desc *metric.Descriptor) {
asum, err := agg.Sum()
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
require.NoError(t, err)
count, err := agg.Count()
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.NoError(t, err)
buckets, err := agg.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range buckets.Counts {
require.Equal(t, uint64(0), uint64(bCount), "Bucket #%d must have 0 observed values", i)
}
}
func TestHistogramAbsolute(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
testHistogram(t, profile, positiveOnly)
@@ -82,7 +111,7 @@ func TestHistogramPositiveAndNegative(t *testing.T) {
func testHistogram(t *testing.T, profile test.Profile, policy policy) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := histogram.New(descriptor, boundaries)
agg, ckpt := new2(descriptor)
all := test.NewNumbers(profile.NumberKind)
@@ -92,11 +121,13 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
test.CheckedUpdate(t, agg, x, descriptor)
}
agg.Checkpoint(descriptor)
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
checkZero(t, agg, descriptor)
all.Sort()
asum, err := agg.Sum()
asum, err := ckpt.Sum()
sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
@@ -105,11 +136,11 @@ func testHistogram(t *testing.T, profile test.Profile, policy policy) {
"Same sum - "+policy.name)
require.NoError(t, err)
count, err := agg.Count()
count, err := ckpt.Count()
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.NoError(t, err)
buckets, err := agg.Histogram()
buckets, err := ckpt.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
@@ -125,7 +156,7 @@ func TestHistogramInitial(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := histogram.New(descriptor, boundaries)
agg := &histogram.New(1, descriptor, boundaries)[0]
buckets, err := agg.Histogram()
require.NoError(t, err)
@@ -138,8 +169,7 @@ func TestHistogramMerge(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg1 := histogram.New(descriptor, boundaries)
agg2 := histogram.New(descriptor, boundaries)
agg1, agg2, ckpt1, ckpt2 := new4(descriptor)
all := test.NewNumbers(profile.NumberKind)
@@ -154,14 +184,14 @@ func TestHistogramMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
test.CheckedMerge(t, agg1, agg2, descriptor)
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
all.Sort()
asum, err := agg1.Sum()
asum, err := ckpt1.Sum()
sum := all.Sum()
require.InEpsilon(t,
sum.CoerceToFloat64(profile.NumberKind),
@@ -170,11 +200,11 @@ func TestHistogramMerge(t *testing.T) {
"Same sum - absolute")
require.NoError(t, err)
count, err := agg1.Count()
count, err := ckpt1.Count()
require.Equal(t, all.Count(), count, "Same count - absolute")
require.NoError(t, err)
buckets, err := agg1.Histogram()
buckets, err := ckpt1.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
@@ -191,24 +221,13 @@ func TestHistogramNotSet(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := histogram.New(descriptor, boundaries)
agg.Checkpoint(descriptor)
agg, ckpt := new2(descriptor)
asum, err := agg.Sum()
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
err := agg.SynchronizedCopy(ckpt, descriptor)
require.NoError(t, err)
count, err := agg.Count()
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.NoError(t, err)
buckets, err := agg.Histogram()
require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range buckets.Counts {
require.Equal(t, uint64(0), uint64(bCount), "Bucket #%d must have 0 observed values", i)
}
checkZero(t, agg, descriptor)
checkZero(t, ckpt, descriptor)
})
}


@@ -30,11 +30,8 @@ type (
// Aggregator aggregates lastValue events.
Aggregator struct {
// current is an atomic pointer to *lastValueData. It is never nil.
current unsafe.Pointer
// checkpoint is a copy of the current value taken in Checkpoint()
checkpoint unsafe.Pointer
// value is an atomic pointer to *lastValueData. It is never nil.
value unsafe.Pointer
}
// lastValueData stores the current value of a lastValue along with
@@ -61,11 +58,14 @@ var unsetLastValue = &lastValueData{}
// New returns a new lastValue aggregator. This aggregator retains the
// last value and timestamp that were recorded.
func New() *Aggregator {
return &Aggregator{
current: unsafe.Pointer(unsetLastValue),
checkpoint: unsafe.Pointer(unsetLastValue),
func New(cnt int) []Aggregator {
aggs := make([]Aggregator, cnt)
for i := range aggs {
aggs[i] = Aggregator{
value: unsafe.Pointer(unsetLastValue),
}
}
return aggs
}
// Kind returns aggregation.LastValueKind.
@@ -78,16 +78,21 @@ func (g *Aggregator) Kind() aggregation.Kind {
// will be returned if (due to a race condition) the checkpoint was
// computed before the first value was set.
func (g *Aggregator) LastValue() (metric.Number, time.Time, error) {
gd := (*lastValueData)(g.checkpoint)
gd := (*lastValueData)(g.value)
if gd == unsetLastValue {
return metric.Number(0), time.Time{}, aggregation.ErrNoData
return 0, time.Time{}, aggregation.ErrNoData
}
return gd.value.AsNumber(), gd.timestamp, nil
}
// Checkpoint atomically saves the current value.
func (g *Aggregator) Checkpoint(*metric.Descriptor) {
g.checkpoint = atomic.LoadPointer(&g.current)
// SynchronizedCopy atomically saves the current value.
func (g *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(g, oa)
}
o.value = atomic.SwapPointer(&g.value, unsafe.Pointer(unsetLastValue))
return nil
}
// Update atomically sets the current "last" value.
@@ -96,7 +101,7 @@ func (g *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
value: number,
timestamp: time.Now(),
}
atomic.StorePointer(&g.current, unsafe.Pointer(ngd))
atomic.StorePointer(&g.value, unsafe.Pointer(ngd))
return nil
}
@@ -105,16 +110,16 @@ func (g *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
func (g *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(g, oa)
return aggregator.NewInconsistentAggregatorError(g, oa)
}
ggd := (*lastValueData)(atomic.LoadPointer(&g.checkpoint))
ogd := (*lastValueData)(atomic.LoadPointer(&o.checkpoint))
ggd := (*lastValueData)(atomic.LoadPointer(&g.value))
ogd := (*lastValueData)(atomic.LoadPointer(&o.value))
if ggd.timestamp.After(ogd.timestamp) {
return nil
}
g.checkpoint = unsafe.Pointer(ogd)
g.value = unsafe.Pointer(ogd)
return nil
}
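
Update and SynchronizedCopy together give the single-state lastValue aggregator a move-and-reset lifecycle: the copy swaps the current pointer into the destination and leaves the source unset. A short sketch of the observable behavior, in the style of the tests below (descriptor stands for any ValueObserver descriptor; illustrative, not commit code):

	alloc := lastvalue.New(2)
	agg, ckpt := &alloc[0], &alloc[1]

	// Update atomically installs a new *lastValueData.
	_ = agg.Update(context.Background(), metric.NewInt64Number(42), descriptor)

	// The copy moves the value into ckpt and resets agg to unsetLastValue.
	require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))

	v, _, err := ckpt.LastValue()
	require.NoError(t, err)
	require.Equal(t, metric.NewInt64Number(42), v)
	_, _, err = agg.LastValue()
	require.True(t, errors.Is(err, aggregation.ErrNoData)) // source was reset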

View File

@@ -15,9 +15,11 @@
package lastvalue
import (
"errors"
"math/rand"
"os"
"testing"
"time"
"unsafe"
"github.com/stretchr/testify/require"
@@ -48,9 +50,26 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
func new2() (_, _ *Aggregator) {
alloc := New(2)
return &alloc[0], &alloc[1]
}
func new4() (_, _, _, _ *Aggregator) {
alloc := New(4)
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
}
func checkZero(t *testing.T, agg *Aggregator) {
lv, ts, err := agg.LastValue()
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, time.Time{}, ts)
require.Equal(t, metric.Number(0), lv)
}
func TestLastValueUpdate(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg := New()
agg, ckpt := new2()
record := test.NewAggregatorTest(metric.ValueObserverKind, profile.NumberKind)
@@ -61,9 +80,10 @@ func TestLastValueUpdate(t *testing.T) {
test.CheckedUpdate(t, agg, x, record)
}
agg.Checkpoint(record)
err := agg.SynchronizedCopy(ckpt, record)
require.NoError(t, err)
lv, _, err := agg.LastValue()
lv, _, err := ckpt.LastValue()
require.Equal(t, last, lv, "Same last value - non-monotonic")
require.Nil(t, err)
})
@@ -71,8 +91,7 @@ func TestLastValueUpdate(t *testing.T) {
func TestLastValueMerge(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg1 := New()
agg2 := New()
agg1, agg2, ckpt1, ckpt2 := new4()
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, profile.NumberKind)
@@ -83,18 +102,21 @@ func TestLastValueMerge(t *testing.T) {
test.CheckedUpdate(t, agg1, first1, descriptor)
test.CheckedUpdate(t, agg2, first2, descriptor)
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
_, t1, err := agg1.LastValue()
checkZero(t, agg1)
checkZero(t, agg2)
_, t1, err := ckpt1.LastValue()
require.Nil(t, err)
_, t2, err := agg2.LastValue()
_, t2, err := ckpt2.LastValue()
require.Nil(t, err)
require.True(t, t1.Before(t2))
test.CheckedMerge(t, agg1, agg2, descriptor)
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
lv, ts, err := agg1.LastValue()
lv, ts, err := ckpt1.LastValue()
require.Nil(t, err)
require.Equal(t, t2, ts, "Merged timestamp - non-monotonic")
require.Equal(t, first2, lv, "Merged value - non-monotonic")
@@ -104,11 +126,8 @@ func TestLastValueMerge(t *testing.T) {
func TestLastValueNotSet(t *testing.T) {
descriptor := test.NewAggregatorTest(metric.ValueObserverKind, metric.Int64NumberKind)
g := New()
g.Checkpoint(descriptor)
g, ckpt := new2()
require.NoError(t, g.SynchronizedCopy(ckpt, descriptor))
value, timestamp, err := g.LastValue()
require.Equal(t, aggregation.ErrNoData, err)
require.True(t, timestamp.IsZero())
require.Equal(t, metric.Number(0), value)
checkZero(t, g)
}

View File

@@ -28,17 +28,16 @@ type (
// Aggregator aggregates events that form a distribution,
// keeping only the min, max, sum, and count.
Aggregator struct {
lock sync.Mutex
current state
checkpoint state
kind metric.NumberKind
lock sync.Mutex
kind metric.NumberKind
state
}
state struct {
count metric.Number
sum metric.Number
min metric.Number
max metric.Number
count int64
}
)
@@ -49,18 +48,17 @@ var _ aggregation.MinMaxSumCount = &Aggregator{}
// count. It does not compute quantile information other than Min and
// Max.
//
// This type uses a mutex for Update() and Checkpoint() concurrency.
func New(desc *metric.Descriptor) *Aggregator {
// This type uses a mutex for Update() and SynchronizedCopy() concurrency.
func New(cnt int, desc *metric.Descriptor) []Aggregator {
kind := desc.NumberKind()
return &Aggregator{
kind: kind,
current: state{
count: metric.NewUint64Number(0),
sum: kind.Zero(),
min: kind.Maximum(),
max: kind.Minimum(),
},
aggs := make([]Aggregator, cnt)
for i := range aggs {
aggs[i] = Aggregator{
kind: kind,
state: emptyState(kind),
}
}
return aggs
}
// Kind returns aggregation.MinMaxSumCountKind.
@@ -70,55 +68,56 @@ func (c *Aggregator) Kind() aggregation.Kind {
// Sum returns the sum of values in the checkpoint.
func (c *Aggregator) Sum() (metric.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint.sum, nil
return c.sum, nil
}
// Count returns the number of values in the checkpoint.
func (c *Aggregator) Count() (int64, error) {
c.lock.Lock()
defer c.lock.Unlock()
return c.checkpoint.count.CoerceToInt64(metric.Uint64NumberKind), nil
return c.count, nil
}
// Min returns the minimum value in the checkpoint.
// The error value aggregation.ErrNoData will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Min() (metric.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint.count.IsZero(metric.Uint64NumberKind) {
return c.kind.Zero(), aggregation.ErrNoData
if c.count == 0 {
return 0, aggregation.ErrNoData
}
return c.checkpoint.min, nil
return c.min, nil
}
// Max returns the maximum value in the checkpoint.
// The error value aggregation.ErrNoData will be returned
// if there were no measurements recorded during the checkpoint.
func (c *Aggregator) Max() (metric.Number, error) {
c.lock.Lock()
defer c.lock.Unlock()
if c.checkpoint.count.IsZero(metric.Uint64NumberKind) {
return c.kind.Zero(), aggregation.ErrNoData
if c.count == 0 {
return 0, aggregation.ErrNoData
}
return c.checkpoint.max, nil
return c.max, nil
}
// Checkpoint saves the current state and resets the current state to
// SynchronizedCopy saves the current state into oa and resets the current state to
// the empty set.
func (c *Aggregator) Checkpoint(desc *metric.Descriptor) {
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
// TODO: It is incorrect to use an Aggregator of different
// kind. Should we test that o.kind == c.kind? (The same question
// occurs for several of the other aggregators in ../*.)
c.lock.Lock()
c.checkpoint, c.current = c.current, c.emptyState()
o.state, c.state = c.state, emptyState(c.kind)
c.lock.Unlock()
return nil
}
func (c *Aggregator) emptyState() state {
kind := c.kind
func emptyState(kind metric.NumberKind) state {
return state{
count: metric.NewUint64Number(0),
sum: kind.Zero(),
count: 0,
sum: 0,
min: kind.Maximum(),
max: kind.Minimum(),
}
@@ -130,13 +129,13 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
c.lock.Lock()
defer c.lock.Unlock()
c.current.count.AddInt64(1)
c.current.sum.AddNumber(kind, number)
if number.CompareNumber(kind, c.current.min) < 0 {
c.current.min = number
c.count++
c.sum.AddNumber(kind, number)
if number.CompareNumber(kind, c.min) < 0 {
c.min = number
}
if number.CompareNumber(kind, c.current.max) > 0 {
c.current.max = number
if number.CompareNumber(kind, c.max) > 0 {
c.max = number
}
return nil
}
@@ -145,17 +144,17 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(c, oa)
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.checkpoint.count.AddNumber(metric.Uint64NumberKind, o.checkpoint.count)
c.checkpoint.sum.AddNumber(desc.NumberKind(), o.checkpoint.sum)
c.count += o.count
c.sum.AddNumber(desc.NumberKind(), o.sum)
if c.checkpoint.min.CompareNumber(desc.NumberKind(), o.checkpoint.min) > 0 {
c.checkpoint.min.SetNumber(o.checkpoint.min)
if c.min.CompareNumber(desc.NumberKind(), o.min) > 0 {
c.min.SetNumber(o.min)
}
if c.checkpoint.max.CompareNumber(desc.NumberKind(), o.checkpoint.max) < 0 {
c.checkpoint.max.SetNumber(o.checkpoint.max)
if c.max.CompareNumber(desc.NumberKind(), o.max) < 0 {
c.max.SetNumber(o.max)
}
return nil
}
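
Because state is now embedded in the Aggregator, SynchronizedCopy can hand the whole struct to the destination and swap in emptyState under the lock, and Merge reads the same fields directly. A sketch of the resulting behavior in test style (desc is an Int64 ValueRecorder descriptor; illustrative, not commit code):

	alloc := minmaxsumcount.New(2, desc)
	agg, ckpt := &alloc[0], &alloc[1]

	_ = agg.Update(context.Background(), metric.NewInt64Number(7), desc)
	require.NoError(t, agg.SynchronizedCopy(ckpt, desc))

	count, _ := ckpt.Count() // 1; sum, min, and max are all 7
	require.Equal(t, int64(1), count)
	_, err := agg.Min() // the source is back to the empty state
	require.True(t, errors.Is(err, aggregation.ErrNoData))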

View File

@@ -15,6 +15,7 @@
package minmaxsumcount
import (
"errors"
"math"
"math/rand"
"testing"
@@ -75,11 +76,41 @@ func TestMinMaxSumCountPositiveAndNegative(t *testing.T) {
})
}
func new2(desc *metric.Descriptor) (_, _ *Aggregator) {
alloc := New(2, desc)
return &alloc[0], &alloc[1]
}
func new4(desc *metric.Descriptor) (_, _, _, _ *Aggregator) {
alloc := New(4, desc)
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
}
func checkZero(t *testing.T, agg *Aggregator, desc *metric.Descriptor) {
kind := desc.NumberKind()
sum, err := agg.Sum()
require.NoError(t, err)
require.Equal(t, kind.Zero(), sum)
count, err := agg.Count()
require.NoError(t, err)
require.Equal(t, int64(0), count)
max, err := agg.Max()
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, kind.Zero(), max)
min, err := agg.Min()
require.True(t, errors.Is(err, aggregation.ErrNoData))
require.Equal(t, kind.Zero(), min)
}
// Validates min, max, sum and count for a given profile and policy
func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor)
agg, ckpt := new2(descriptor)
all := test.NewNumbers(profile.NumberKind)
@@ -89,11 +120,13 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
test.CheckedUpdate(t, agg, x, descriptor)
}
agg.Checkpoint(descriptor)
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
checkZero(t, agg, descriptor)
all.Sort()
aggSum, err := agg.Sum()
aggSum, err := ckpt.Sum()
require.Nil(t, err)
allSum := all.Sum()
require.InEpsilon(t,
@@ -102,18 +135,18 @@ func minMaxSumCount(t *testing.T, profile test.Profile, policy policy) {
0.000000001,
"Same sum - "+policy.name)
count, err := agg.Count()
count, err := ckpt.Count()
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
require.Nil(t, err)
min, err := agg.Min()
min, err := ckpt.Min()
require.Nil(t, err)
require.Equal(t,
all.Min(),
min,
"Same min -"+policy.name)
max, err := agg.Max()
max, err := ckpt.Max()
require.Nil(t, err)
require.Equal(t,
all.Max(),
@@ -125,8 +158,7 @@ func TestMinMaxSumCountMerge(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg1 := New(descriptor)
agg2 := New(descriptor)
agg1, agg2, ckpt1, ckpt2 := new4(descriptor)
all := test.NewNumbers(profile.NumberKind)
@@ -141,14 +173,17 @@ func TestMinMaxSumCountMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
test.CheckedMerge(t, agg1, agg2, descriptor)
checkZero(t, agg1, descriptor)
checkZero(t, agg2, descriptor)
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
all.Sort()
aggSum, err := agg1.Sum()
aggSum, err := ckpt1.Sum()
require.Nil(t, err)
allSum := all.Sum()
require.InEpsilon(t,
@@ -157,18 +192,18 @@ func TestMinMaxSumCountMerge(t *testing.T) {
0.000000001,
"Same sum - absolute")
count, err := agg1.Count()
count, err := ckpt1.Count()
require.Equal(t, all.Count(), count, "Same count - absolute")
require.Nil(t, err)
min, err := agg1.Min()
min, err := ckpt1.Min()
require.Nil(t, err)
require.Equal(t,
all.Min(),
min,
"Same min - absolute")
max, err := agg1.Max()
max, err := ckpt1.Max()
require.Nil(t, err)
require.Equal(t,
all.Max(),
@@ -181,18 +216,20 @@ func TestMaxSumCountNotSet(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
agg := New(descriptor)
agg.Checkpoint(descriptor)
alloc := New(2, descriptor)
agg, ckpt := &alloc[0], &alloc[1]
asum, err := agg.Sum()
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
asum, err := ckpt.Sum()
require.Equal(t, metric.Number(0), asum, "Empty checkpoint sum = 0")
require.Nil(t, err)
count, err := agg.Count()
count, err := ckpt.Count()
require.Equal(t, int64(0), count, "Empty checkpoint count = 0")
require.Nil(t, err)
max, err := agg.Max()
max, err := ckpt.Max()
require.Equal(t, aggregation.ErrNoData, err)
require.Equal(t, metric.Number(0), max)
})

View File

@@ -27,11 +27,7 @@ import (
type Aggregator struct {
// current holds current increments to this counter record
// current needs to be aligned for 64-bit atomic operations.
current metric.Number
// checkpoint is a temporary used during Checkpoint()
// checkpoint needs to be aligned for 64-bit atomic operations.
checkpoint metric.Number
value metric.Number
}
var _ export.Aggregator = &Aggregator{}
@@ -40,8 +36,8 @@ var _ aggregation.Sum = &Aggregator{}
// New returns a new counter aggregator implemented by atomic
// operations. This aggregator implements the aggregation.Sum
// export interface.
func New() *Aggregator {
return &Aggregator{}
func New(cnt int) []Aggregator {
return make([]Aggregator, cnt)
}
// Kind returns aggregation.SumKind.
@@ -52,18 +48,23 @@ func (c *Aggregator) Kind() aggregation.Kind {
// Sum returns the last-checkpointed sum. This will never return an
// error.
func (c *Aggregator) Sum() (metric.Number, error) {
return c.checkpoint, nil
return c.value, nil
}
// Checkpoint atomically saves the current value and resets the
// SynchronizedCopy atomically saves the current value into oa and resets the
// current sum to zero.
func (c *Aggregator) Checkpoint(*metric.Descriptor) {
c.checkpoint = c.current.SwapNumberAtomic(metric.Number(0))
func (c *Aggregator) SynchronizedCopy(oa export.Aggregator, _ *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentAggregatorError(c, oa)
}
o.value = c.value.SwapNumberAtomic(metric.Number(0))
return nil
}
// Update atomically adds to the current value.
func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metric.Descriptor) error {
c.current.AddNumberAtomic(desc.NumberKind(), number)
c.value.AddNumberAtomic(desc.NumberKind(), number)
return nil
}
@@ -71,8 +72,8 @@ func (c *Aggregator) Update(_ context.Context, number metric.Number, desc *metri
func (c *Aggregator) Merge(oa export.Aggregator, desc *metric.Descriptor) error {
o, _ := oa.(*Aggregator)
if o == nil {
return aggregator.NewInconsistentMergeError(c, oa)
return aggregator.NewInconsistentAggregatorError(c, oa)
}
c.checkpoint.AddNumber(desc.NumberKind(), o.checkpoint)
c.value.AddNumber(desc.NumberKind(), o.value)
return nil
}
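
The sum aggregator's copy is a single atomic swap-to-zero, and like the other aggregators it refuses a destination of a different concrete type rather than silently corrupting state. A sketch (desc is a Counter descriptor; illustrative only):

	alloc := sum.New(2)
	agg, ckpt := &alloc[0], &alloc[1]

	_ = agg.Update(context.Background(), metric.NewInt64Number(3), desc)
	_ = agg.Update(context.Background(), metric.NewInt64Number(4), desc)
	require.NoError(t, agg.SynchronizedCopy(ckpt, desc)) // ckpt holds 7, agg is 0

	// A mismatched destination yields the inconsistent-aggregator error.
	err := agg.SynchronizedCopy(&lastvalue.New(1)[0], desc)
	require.Error(t, err)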

View File

@@ -32,12 +32,8 @@ const count = 100
func TestMain(m *testing.M) {
fields := []ottest.FieldOffset{
{
Name: "Aggregator.current",
Offset: unsafe.Offsetof(Aggregator{}.current),
},
{
Name: "Aggregator.checkpoint",
Offset: unsafe.Offsetof(Aggregator{}.checkpoint),
Name: "Aggregator.value",
Offset: unsafe.Offsetof(Aggregator{}.value),
},
}
if !ottest.Aligned8Byte(fields, os.Stderr) {
@@ -47,9 +43,27 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}
func new2() (_, _ *Aggregator) {
alloc := New(2)
return &alloc[0], &alloc[1]
}
func new4() (_, _, _, _ *Aggregator) {
alloc := New(4)
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
}
func checkZero(t *testing.T, agg *Aggregator, desc *metric.Descriptor) {
kind := desc.NumberKind()
sum, err := agg.Sum()
require.NoError(t, err)
require.Equal(t, kind.Zero(), sum)
}
func TestCounterSum(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg := New()
agg, ckpt := new2()
descriptor := test.NewAggregatorTest(metric.CounterKind, profile.NumberKind)
@@ -60,9 +74,12 @@ func TestCounterSum(t *testing.T) {
test.CheckedUpdate(t, agg, x, descriptor)
}
agg.Checkpoint(descriptor)
err := agg.SynchronizedCopy(ckpt, descriptor)
require.NoError(t, err)
asum, err := agg.Sum()
checkZero(t, agg, descriptor)
asum, err := ckpt.Sum()
require.Equal(t, sum, asum, "Same sum - monotonic")
require.Nil(t, err)
})
@@ -70,7 +87,7 @@ func TestCounterSum(t *testing.T) {
func TestValueRecorderSum(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg := New()
agg, ckpt := new2()
descriptor := test.NewAggregatorTest(metric.ValueRecorderKind, profile.NumberKind)
@@ -85,9 +102,10 @@ func TestValueRecorderSum(t *testing.T) {
sum.AddNumber(profile.NumberKind, r2)
}
agg.Checkpoint(descriptor)
require.NoError(t, agg.SynchronizedCopy(ckpt, descriptor))
checkZero(t, agg, descriptor)
asum, err := agg.Sum()
asum, err := ckpt.Sum()
require.Equal(t, sum, asum, "Same sum - monotonic")
require.Nil(t, err)
})
@@ -95,8 +113,7 @@ func TestValueRecorderSum(t *testing.T) {
func TestCounterMerge(t *testing.T) {
test.RunProfiles(t, func(t *testing.T, profile test.Profile) {
agg1 := New()
agg2 := New()
agg1, agg2, ckpt1, ckpt2 := new4()
descriptor := test.NewAggregatorTest(metric.CounterKind, profile.NumberKind)
@@ -108,14 +125,17 @@ func TestCounterMerge(t *testing.T) {
test.CheckedUpdate(t, agg2, x, descriptor)
}
agg1.Checkpoint(descriptor)
agg2.Checkpoint(descriptor)
require.NoError(t, agg1.SynchronizedCopy(ckpt1, descriptor))
require.NoError(t, agg2.SynchronizedCopy(ckpt2, descriptor))
test.CheckedMerge(t, agg1, agg2, descriptor)
checkZero(t, agg1, descriptor)
checkZero(t, agg2, descriptor)
test.CheckedMerge(t, ckpt1, ckpt2, descriptor)
sum.AddNumber(descriptor.NumberKind(), sum)
asum, err := agg1.Sum()
asum, err := ckpt1.Sum()
require.Equal(t, sum, asum, "Same sum - monotonic")
require.Nil(t, err)
})

View File

@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"math/rand"
"strings"
"testing"
"go.opentelemetry.io/otel/api/kv"
@@ -26,22 +25,21 @@ import (
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
)
type benchFixture struct {
meter metric.MeterMust
accumulator *sdk.Accumulator
B *testing.B
export.AggregationSelector
}
func newFixture(b *testing.B) *benchFixture {
b.ReportAllocs()
bf := &benchFixture{
B: b,
B: b,
AggregationSelector: test.AggregationSelector(),
}
bf.accumulator = sdk.NewAccumulator(bf)
@@ -49,25 +47,6 @@ func newFixture(b *testing.B) *benchFixture {
return bf
}
func (*benchFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
name := descriptor.Name()
switch {
case strings.HasSuffix(name, "counter"):
return sum.New()
case strings.HasSuffix(name, "lastvalue"):
return lastvalue.New()
default:
if strings.HasSuffix(descriptor.Name(), "minmaxsumcount") {
return minmaxsumcount.New(descriptor)
} else if strings.HasSuffix(descriptor.Name(), "ddsketch") {
return ddsketch.New(descriptor, ddsketch.NewDefaultConfig())
} else if strings.HasSuffix(descriptor.Name(), "array") {
return ddsketch.New(descriptor, ddsketch.NewDefaultConfig())
}
}
return nil
}
func (f *benchFixture) Process(rec export.Record) error {
return nil
}
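
Embedding export.AggregationSelector is what lets the fixture delete its hand-rolled AggregatorFor: Go's method promotion forwards the call to the shared test selector. A sketch of the idea (the fixture name and field set here are illustrative):

	type fixture struct {
		export.AggregationSelector // promotes AggregatorFor(desc, aggPtrs...)
	}

	f := &fixture{AggregationSelector: test.AggregationSelector()}
	// f now satisfies the selector half of the integrator contract;
	// only the remaining methods, such as Process, are left to write.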

View File

@@ -28,12 +28,13 @@ import (
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/test"
exporterTest "go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
controllerTest "go.opentelemetry.io/otel/sdk/metric/controller/test"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
integratorTest "go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/resource"
)
@@ -74,14 +75,12 @@ type testExporter struct {
}
type testFixture struct {
checkpointSet *test.CheckpointSet
checkpointSet *exporterTest.CheckpointSet
exporter *testExporter
}
type testSelector struct{}
func newFixture(t *testing.T) testFixture {
checkpointSet := test.NewCheckpointSet(testResource)
checkpointSet := exporterTest.NewCheckpointSet(testResource)
exporter := &testExporter{
t: t,
@@ -92,10 +91,6 @@ func newFixture(t *testing.T) testFixture {
}
}
func (testSelector) AggregatorFor(*metric.Descriptor) export.Aggregator {
return sum.New()
}
func (e *testExporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
e.lock.Lock()
defer e.lock.Unlock()
@@ -126,7 +121,7 @@ func (e *testExporter) resetRecords() ([]export.Record, int) {
func TestPushDoubleStop(t *testing.T) {
fix := newFixture(t)
p := push.New(testSelector{}, fix.exporter)
p := push.New(integratorTest.AggregationSelector(), fix.exporter)
p.Start()
p.Stop()
p.Stop()
@@ -134,7 +129,7 @@ func TestPushDoubleStop(t *testing.T) {
func TestPushDoubleStart(t *testing.T) {
fix := newFixture(t)
p := push.New(testSelector{}, fix.exporter)
p := push.New(test.AggregationSelector(), fix.exporter)
p.Start()
p.Start()
p.Stop()
@@ -144,7 +139,7 @@ func TestPushTicker(t *testing.T) {
fix := newFixture(t)
p := push.New(
testSelector{},
test.AggregationSelector(),
fix.exporter,
push.WithPeriod(time.Second),
push.WithResource(testResource),
@@ -156,7 +151,7 @@ func TestPushTicker(t *testing.T) {
ctx := context.Background()
counter := metric.Must(meter).NewInt64Counter("counter")
counter := metric.Must(meter).NewInt64Counter("counter.sum")
p.Start()
@@ -172,7 +167,7 @@ func TestPushTicker(t *testing.T) {
records, exports = fix.exporter.resetRecords()
require.Equal(t, 1, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())
require.Equal(t, "counter.sum", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
sum, err := records[0].Aggregator().(aggregation.Sum).Sum()
@@ -189,7 +184,7 @@ func TestPushTicker(t *testing.T) {
records, exports = fix.exporter.resetRecords()
require.Equal(t, 2, exports)
require.Equal(t, 1, len(records))
require.Equal(t, "counter", records[0].Descriptor().Name())
require.Equal(t, "counter.sum", records[0].Descriptor().Name())
require.Equal(t, "R=V", records[0].Resource().Encoded(label.DefaultEncoder()))
sum, err = records[0].Aggregator().(aggregation.Sum).Sum()
@@ -215,17 +210,17 @@ func TestPushExportError(t *testing.T) {
expectedDescriptors []string
expectedError error
}{
{"errNone", nil, []string{"counter1{R=V,X=Y}", "counter2{R=V,}"}, nil},
{"errNoData", aggregation.ErrNoData, []string{"counter2{R=V,}"}, nil},
{"errNone", nil, []string{"counter1.sum{R=V,X=Y}", "counter2.sum{R=V,}"}, nil},
{"errNoData", aggregation.ErrNoData, []string{"counter2.sum{R=V,}"}, nil},
{"errUnexpected", errAggregator, []string{}, errAggregator},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fix := newFixture(t)
fix.exporter.injectErr = injector("counter1", tt.injectedError)
fix.exporter.injectErr = injector("counter1.sum", tt.injectedError)
p := push.New(
testSelector{},
test.AggregationSelector(),
fix.exporter,
push.WithPeriod(time.Second),
push.WithResource(testResource),
@@ -237,8 +232,8 @@ func TestPushExportError(t *testing.T) {
ctx := context.Background()
meter := p.Provider().Meter("name")
counter1 := metric.Must(meter).NewInt64Counter("counter1")
counter2 := metric.Must(meter).NewInt64Counter("counter2")
counter1 := metric.Must(meter).NewInt64Counter("counter1.sum")
counter2 := metric.Must(meter).NewInt64Counter("counter2.sum")
p.Start()
runtime.Gosched()
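
The renames in this file are load-bearing: the shared test selector chooses an aggregator from the instrument name's suffix, so each test instrument now spells out its intended aggregation. A sketch of the convention (grounded in the selector's suffix list):

	// ".sum" selects a sum aggregator; ".lastvalue", ".minmaxsumcount",
	// ".sketch", ".histogram", and ".exact" select their namesakes,
	// while ".disabled" selects no aggregator at all.
	counter := metric.Must(meter).NewInt64Counter("counter.sum")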

View File

@@ -18,9 +18,7 @@ import (
"context"
"fmt"
"math"
"strings"
"sync"
"sync/atomic"
"testing"
"github.com/stretchr/testify/require"
@@ -32,8 +30,7 @@ import (
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
metricsdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
batchTest "go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/resource"
)
@@ -74,17 +71,27 @@ func init() {
}
type correctnessIntegrator struct {
newAggCount int64
t *testing.T
*testSelector
records []export.Record
}
type testSelector struct {
selector export.AggregationSelector
newAggCount int
}
func (ts *testSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
ts.newAggCount += len(aggPtrs)
test.AggregationSelector().AggregatorFor(desc, aggPtrs...)
}
func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessIntegrator) {
testHandler.Reset()
integrator := &correctnessIntegrator{
t: t,
t: t,
testSelector: &testSelector{selector: test.AggregationSelector()},
}
accum := metricsdk.NewAccumulator(
integrator,
@@ -94,23 +101,6 @@ func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessInt
return meter, accum, integrator
}
func (ci *correctnessIntegrator) AggregatorFor(descriptor *metric.Descriptor) (agg export.Aggregator) {
name := descriptor.Name()
switch {
case strings.HasSuffix(name, ".counter"):
agg = sum.New()
case strings.HasSuffix(name, ".disabled"):
agg = nil
default:
agg = array.New()
}
if agg != nil {
atomic.AddInt64(&ci.newAggCount, 1)
}
return
}
func (ci *correctnessIntegrator) CheckpointSet() export.CheckpointSet {
ci.t.Fatal("Should not be called")
return nil
@@ -128,7 +118,7 @@ func TestInputRangeCounter(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
counter := Must(meter).NewInt64Counter("name.counter")
counter := Must(meter).NewInt64Counter("name.sum")
counter.Add(ctx, -1)
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
@@ -150,7 +140,7 @@ func TestInputRangeUpDownCounter(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
counter := Must(meter).NewInt64UpDownCounter("name.updowncounter")
counter := Must(meter).NewInt64UpDownCounter("name.sum")
counter.Add(ctx, -1)
counter.Add(ctx, -1)
@@ -169,7 +159,7 @@ func TestInputRangeValueRecorder(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.valuerecorder")
valuerecorder := Must(meter).NewFloat64ValueRecorder("name.exact")
valuerecorder.Record(ctx, math.NaN())
require.Equal(t, aggregation.ErrNaNInput, testHandler.Flush())
@@ -207,7 +197,7 @@ func TestRecordNaN(t *testing.T) {
ctx := context.Background()
meter, _, _ := newSDK(t)
c := Must(meter).NewFloat64Counter("sum.name")
c := Must(meter).NewFloat64Counter("name.sum")
require.Nil(t, testHandler.Flush())
c.Add(ctx, math.NaN())
@@ -218,7 +208,7 @@ func TestSDKLabelsDeduplication(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
counter := Must(meter).NewInt64Counter("counter")
counter := Must(meter).NewInt64Counter("name.sum")
const (
maxKeys = 21
@@ -317,13 +307,13 @@ func TestObserverCollection(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
_ = Must(meter).NewFloat64ValueObserver("float.valueobserver", func(_ context.Context, result metric.Float64ObserverResult) {
_ = Must(meter).NewFloat64ValueObserver("float.valueobserver.lastvalue", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(1, kv.String("A", "B"))
// last value wins
result.Observe(-1, kv.String("A", "B"))
result.Observe(-1, kv.String("C", "D"))
})
_ = Must(meter).NewInt64ValueObserver("int.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) {
_ = Must(meter).NewInt64ValueObserver("int.valueobserver.lastvalue", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(-1, kv.String("A", "B"))
result.Observe(1)
// last value wins
@@ -331,12 +321,12 @@ func TestObserverCollection(t *testing.T) {
result.Observe(1)
})
_ = Must(meter).NewFloat64SumObserver("float.sumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
_ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(1, kv.String("A", "B"))
result.Observe(2, kv.String("A", "B"))
result.Observe(1, kv.String("C", "D"))
})
_ = Must(meter).NewInt64SumObserver("int.sumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
_ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(2, kv.String("A", "B"))
result.Observe(1)
// last value wins
@@ -344,12 +334,12 @@ func TestObserverCollection(t *testing.T) {
result.Observe(1)
})
_ = Must(meter).NewFloat64UpDownSumObserver("float.updownsumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
_ = Must(meter).NewFloat64UpDownSumObserver("float.updownsumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(1, kv.String("A", "B"))
result.Observe(-2, kv.String("A", "B"))
result.Observe(1, kv.String("C", "D"))
})
_ = Must(meter).NewInt64UpDownSumObserver("int.updownsumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
_ = Must(meter).NewInt64UpDownSumObserver("int.updownsumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(2, kv.String("A", "B"))
result.Observe(1)
// last value wins
@@ -357,7 +347,7 @@ func TestObserverCollection(t *testing.T) {
result.Observe(-1)
})
_ = Must(meter).NewInt64ValueObserver("empty.valueobserver", func(_ context.Context, result metric.Int64ObserverResult) {
_ = Must(meter).NewInt64ValueObserver("empty.valueobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
})
collected := sdk.Collect(ctx)
@@ -369,20 +359,20 @@ func TestObserverCollection(t *testing.T) {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
"int.valueobserver/A=B/R=V": 1,
"float.valueobserver.lastvalue/A=B/R=V": -1,
"float.valueobserver.lastvalue/C=D/R=V": -1,
"int.valueobserver.lastvalue//R=V": 1,
"int.valueobserver.lastvalue/A=B/R=V": 1,
"float.sumobserver/A=B/R=V": 2,
"float.sumobserver/C=D/R=V": 1,
"int.sumobserver//R=V": 1,
"int.sumobserver/A=B/R=V": 1,
"float.sumobserver.sum/A=B/R=V": 2,
"float.sumobserver.sum/C=D/R=V": 1,
"int.sumobserver.sum//R=V": 1,
"int.sumobserver.sum/A=B/R=V": 1,
"float.updownsumobserver/A=B/R=V": -2,
"float.updownsumobserver/C=D/R=V": 1,
"int.updownsumobserver//R=V": -1,
"int.updownsumobserver/A=B/R=V": 1,
"float.updownsumobserver.sum/A=B/R=V": -2,
"float.updownsumobserver.sum/C=D/R=V": 1,
"int.updownsumobserver.sum//R=V": -1,
"int.updownsumobserver.sum/A=B/R=V": 1,
}, out.Map)
}
@@ -390,13 +380,14 @@ func TestSumObserverInputRange(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
_ = Must(meter).NewFloat64SumObserver("float.sumobserver", func(_ context.Context, result metric.Float64ObserverResult) {
// TODO: these tests are testing for negative values, not for _descending values_. Fix.
_ = Must(meter).NewFloat64SumObserver("float.sumobserver.sum", func(_ context.Context, result metric.Float64ObserverResult) {
result.Observe(-2, kv.String("A", "B"))
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
result.Observe(-1, kv.String("C", "D"))
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
})
_ = Must(meter).NewInt64SumObserver("int.sumobserver", func(_ context.Context, result metric.Int64ObserverResult) {
_ = Must(meter).NewInt64SumObserver("int.sumobserver.sum", func(_ context.Context, result metric.Int64ObserverResult) {
result.Observe(-1, kv.String("A", "B"))
require.Equal(t, aggregation.ErrNegativeInput, testHandler.Flush())
result.Observe(-1)
@@ -455,12 +446,12 @@ func TestObserverBatch(t *testing.T) {
intUpDownSumObs.Observation(10),
)
})
floatValueObs = batch.NewFloat64ValueObserver("float.valueobserver")
intValueObs = batch.NewInt64ValueObserver("int.valueobserver")
floatSumObs = batch.NewFloat64SumObserver("float.sumobserver")
intSumObs = batch.NewInt64SumObserver("int.sumobserver")
floatUpDownSumObs = batch.NewFloat64UpDownSumObserver("float.updownsumobserver")
intUpDownSumObs = batch.NewInt64UpDownSumObserver("int.updownsumobserver")
floatValueObs = batch.NewFloat64ValueObserver("float.valueobserver.lastvalue")
intValueObs = batch.NewInt64ValueObserver("int.valueobserver.lastvalue")
floatSumObs = batch.NewFloat64SumObserver("float.sumobserver.sum")
intSumObs = batch.NewInt64SumObserver("int.sumobserver.sum")
floatUpDownSumObs = batch.NewFloat64UpDownSumObserver("float.updownsumobserver.sum")
intUpDownSumObs = batch.NewInt64UpDownSumObserver("int.updownsumobserver.sum")
collected := sdk.Collect(ctx)
@@ -471,20 +462,20 @@ func TestObserverBatch(t *testing.T) {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"float.sumobserver//R=V": 1.1,
"float.sumobserver/A=B/R=V": 1000,
"int.sumobserver//R=V": 10,
"int.sumobserver/A=B/R=V": 100,
"float.sumobserver.sum//R=V": 1.1,
"float.sumobserver.sum/A=B/R=V": 1000,
"int.sumobserver.sum//R=V": 10,
"int.sumobserver.sum/A=B/R=V": 100,
"int.updownsumobserver/A=B/R=V": -100,
"float.updownsumobserver/A=B/R=V": -1000,
"int.updownsumobserver//R=V": 10,
"float.updownsumobserver/C=D/R=V": -1,
"int.updownsumobserver.sum/A=B/R=V": -100,
"float.updownsumobserver.sum/A=B/R=V": -1000,
"int.updownsumobserver.sum//R=V": 10,
"float.updownsumobserver.sum/C=D/R=V": -1,
"float.valueobserver/A=B/R=V": -1,
"float.valueobserver/C=D/R=V": -1,
"int.valueobserver//R=V": 1,
"int.valueobserver/A=B/R=V": 1,
"float.valueobserver.lastvalue/A=B/R=V": -1,
"float.valueobserver.lastvalue/C=D/R=V": -1,
"int.valueobserver.lastvalue//R=V": 1,
"int.valueobserver.lastvalue/A=B/R=V": 1,
}, out.Map)
}
@@ -492,10 +483,10 @@ func TestRecordBatch(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
counter1 := Must(meter).NewInt64Counter("int64.counter")
counter2 := Must(meter).NewFloat64Counter("float64.counter")
valuerecorder1 := Must(meter).NewInt64ValueRecorder("int64.valuerecorder")
valuerecorder2 := Must(meter).NewFloat64ValueRecorder("float64.valuerecorder")
counter1 := Must(meter).NewInt64Counter("int64.sum")
counter2 := Must(meter).NewFloat64Counter("float64.sum")
valuerecorder1 := Must(meter).NewInt64ValueRecorder("int64.exact")
valuerecorder2 := Must(meter).NewFloat64ValueRecorder("float64.exact")
sdk.RecordBatch(
ctx,
@@ -516,10 +507,10 @@ func TestRecordBatch(t *testing.T) {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"int64.counter/A=B,C=D/R=V": 1,
"float64.counter/A=B,C=D/R=V": 2,
"int64.valuerecorder/A=B,C=D/R=V": 3,
"float64.valuerecorder/A=B,C=D/R=V": 4,
"int64.sum/A=B,C=D/R=V": 1,
"float64.sum/A=B,C=D/R=V": 2,
"int64.exact/A=B,C=D/R=V": 3,
"float64.exact/A=B,C=D/R=V": 4,
}, out.Map)
}
@@ -530,7 +521,7 @@ func TestRecordPersistence(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
c := Must(meter).NewFloat64Counter("sum.name")
c := Must(meter).NewFloat64Counter("name.sum")
b := c.Bind(kv.String("bound", "true"))
uk := kv.String("bound", "false")
@@ -540,7 +531,7 @@ func TestRecordPersistence(t *testing.T) {
sdk.Collect(ctx)
}
require.Equal(t, int64(2), integrator.newAggCount)
require.Equal(t, 4, integrator.newAggCount)
}
func TestIncorrectInstruments(t *testing.T) {
@@ -564,7 +555,7 @@ func TestIncorrectInstruments(t *testing.T) {
// Now try with instruments from another SDK.
var noopMeter metric.Meter
counter = metric.Must(noopMeter).NewInt64Counter("counter")
counter = metric.Must(noopMeter).NewInt64Counter("name.sum")
observer = metric.Must(noopMeter).NewBatchObserver(
func(context.Context, metric.BatchObserverResult) {},
).NewInt64ValueObserver("observer")
@@ -583,8 +574,8 @@ func TestSyncInAsync(t *testing.T) {
ctx := context.Background()
meter, sdk, integrator := newSDK(t)
counter := Must(meter).NewFloat64Counter("counter")
_ = Must(meter).NewInt64ValueObserver("observer",
counter := Must(meter).NewFloat64Counter("counter.sum")
_ = Must(meter).NewInt64ValueObserver("observer.lastvalue",
func(ctx context.Context, result metric.Int64ObserverResult) {
result.Observe(10)
counter.Add(ctx, 100)
@@ -598,7 +589,7 @@ func TestSyncInAsync(t *testing.T) {
_ = out.AddTo(rec)
}
require.EqualValues(t, map[string]float64{
"counter//R=V": 100,
"observer//R=V": 10,
"counter.sum//R=V": 100,
"observer.lastvalue//R=V": 10,
}, out.Map)
}
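
Wrapping the shared selector in testSelector keeps the new-aggregator count without duplicating selection logic: every pointer the SDK asks to have filled counts as one allocation, which is why TestRecordPersistence now expects 4 rather than 2 — each synchronous record allocates a current/checkpoint pair. A sketch of the wrapper pattern (names are illustrative):

	type countingSelector struct {
		wrapped export.AggregationSelector
		news    int
	}

	func (c *countingSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
		c.news += len(aggPtrs) // one per aggregator handed out
		c.wrapped.AggregatorFor(desc, aggPtrs...)
	}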

View File

@@ -20,13 +20,17 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
)
func TestStressInt64Histogram(t *testing.T) {
desc := metric.NewDescriptor("some_metric", metric.ValueRecorderKind, metric.Int64NumberKind)
h := histogram.New(&desc, []float64{25, 50, 75})
alloc := histogram.New(2, &desc, []float64{25, 50, 75})
h, ckpt := &alloc[0], &alloc[1]
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
@@ -44,10 +48,10 @@ func TestStressInt64Histogram(t *testing.T) {
startTime := time.Now()
for time.Since(startTime) < time.Second {
h.Checkpoint(&desc)
require.NoError(t, h.SynchronizedCopy(ckpt, &desc))
b, _ := h.Histogram()
c, _ := h.Count()
b, _ := ckpt.Histogram()
c, _ := ckpt.Count()
var realCount int64
for _, c := range b.Counts {

View File

@@ -89,7 +89,7 @@ func (b *Integrator) Process(record export.Record) error {
tmp := agg
// Note: the call to AggregatorFor() followed by Merge
// is effectively a Clone() operation.
agg = b.AggregatorFor(desc)
b.AggregatorFor(desc, &agg)
if err := agg.Merge(tmp, desc); err != nil {
return err
}
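
With the variadic API there is no returned aggregator to clone from, so the integrator builds a copy by asking the selector for a fresh instance and merging the processed one into it. The idiom, spelled out as a sketch of the same steps:

	tmp := agg                    // aggregator owned by the caller
	var clone export.Aggregator
	b.AggregatorFor(desc, &clone) // fresh, empty aggregator of the right kind
	if err := clone.Merge(tmp, desc); err != nil {
		return err
	}
	agg = clone                   // the integrator now owns its own copy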

View File

@@ -18,66 +18,129 @@ import (
"context"
"testing"
"go.opentelemetry.io/otel/api/metric"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/integrator/simple"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
"go.opentelemetry.io/otel/sdk/resource"
)
// These tests use the ../test label encoding.
// Note: This var block and the helpers below will disappear in a
// future PR (see the draft in #799). The test has been completely
// rewritten there, so this code will simply be dropped.
var (
// Resource is applied to all test records built in this package.
Resource = resource.New(kv.String("R", "V"))
// LastValueADesc and LastValueBDesc group by "G"
LastValueADesc = metric.NewDescriptor(
"a.lastvalue", metric.ValueObserverKind, metric.Int64NumberKind)
LastValueBDesc = metric.NewDescriptor(
"b.lastvalue", metric.ValueObserverKind, metric.Int64NumberKind)
// CounterADesc and CounterBDesc group by "C"
CounterADesc = metric.NewDescriptor(
"a.sum", metric.CounterKind, metric.Int64NumberKind)
CounterBDesc = metric.NewDescriptor(
"b.sum", metric.CounterKind, metric.Int64NumberKind)
// LastValue groups are (labels1), (labels2+labels3)
// Counter groups are (labels1+labels2), (labels3)
// Labels1 has G=H and C=D
Labels1 = makeLabels(kv.String("G", "H"), kv.String("C", "D"))
// Labels2 has C=D and E=F
Labels2 = makeLabels(kv.String("C", "D"), kv.String("E", "F"))
// Labels3 is the empty set
Labels3 = makeLabels()
)
func makeLabels(labels ...kv.KeyValue) *label.Set {
s := label.NewSet(labels...)
return &s
}
// LastValueAgg returns a checkpointed lastValue aggregator w/ the specified descriptor and value.
func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
ctx := context.Background()
gagg := &lastvalue.New(1)[0]
_ = gagg.Update(ctx, metric.NewInt64Number(v), desc)
return gagg
}
// Convenience method for building a test exported lastValue record.
func NewLastValueRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, Resource, LastValueAgg(desc, value))
}
// Convenience method for building a test exported counter record.
func NewCounterRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, Resource, CounterAgg(desc, value))
}
// CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value.
func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator {
ctx := context.Background()
cagg := &sum.New(1)[0]
_ = cagg.Update(ctx, metric.NewInt64Number(v), desc)
return cagg
}
func TestSimpleStateless(t *testing.T) {
b := simple.New(test.NewAggregationSelector(), false)
b := simple.New(test.AggregationSelector(), false)
// Set initial lastValue values
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 10))
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels2, 20))
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels3, 30))
_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels1, 10))
_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels2, 20))
_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels3, 30))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 10))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels2, 20))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels3, 30))
_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels1, 10))
_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels2, 20))
_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels3, 30))
// Another lastValue Set for Labels1
_ = b.Process(test.NewLastValueRecord(&test.LastValueADesc, test.Labels1, 50))
_ = b.Process(test.NewLastValueRecord(&test.LastValueBDesc, test.Labels1, 50))
_ = b.Process(NewLastValueRecord(&LastValueADesc, Labels1, 50))
_ = b.Process(NewLastValueRecord(&LastValueBDesc, Labels1, 50))
// Set initial counter values
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10))
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels2, 20))
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels3, 40))
_ = b.Process(NewCounterRecord(&CounterADesc, Labels1, 10))
_ = b.Process(NewCounterRecord(&CounterADesc, Labels2, 20))
_ = b.Process(NewCounterRecord(&CounterADesc, Labels3, 40))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels2, 20))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels3, 40))
_ = b.Process(NewCounterRecord(&CounterBDesc, Labels1, 10))
_ = b.Process(NewCounterRecord(&CounterBDesc, Labels2, 20))
_ = b.Process(NewCounterRecord(&CounterBDesc, Labels3, 40))
// Another counter Add for Labels1
_ = b.Process(test.NewCounterRecord(&test.CounterADesc, test.Labels1, 50))
_ = b.Process(test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 50))
_ = b.Process(NewCounterRecord(&CounterADesc, Labels1, 50))
_ = b.Process(NewCounterRecord(&CounterBDesc, Labels1, 50))
checkpointSet := b.CheckpointSet()
records := test.NewOutput(test.SdkEncoder)
records := test.NewOutput(label.DefaultEncoder())
_ = checkpointSet.ForEach(records.AddTo)
// Output lastvalue should have only the "G=H" and "G=" keys.
// Output counter should have only the "C=D" and "C=" keys.
require.EqualValues(t, map[string]float64{
"sum.a/C~D&G~H/R~V": 60, // labels1
"sum.a/C~D&E~F/R~V": 20, // labels2
"sum.a//R~V": 40, // labels3
"sum.b/C~D&G~H/R~V": 60, // labels1
"sum.b/C~D&E~F/R~V": 20, // labels2
"sum.b//R~V": 40, // labels3
"lastvalue.a/C~D&G~H/R~V": 50, // labels1
"lastvalue.a/C~D&E~F/R~V": 20, // labels2
"lastvalue.a//R~V": 30, // labels3
"lastvalue.b/C~D&G~H/R~V": 50, // labels1
"lastvalue.b/C~D&E~F/R~V": 20, // labels2
"lastvalue.b//R~V": 30, // labels3
"a.sum/C=D,G=H/R=V": 60, // labels1
"a.sum/C=D,E=F/R=V": 20, // labels2
"a.sum//R=V": 40, // labels3
"b.sum/C=D,G=H/R=V": 60, // labels1
"b.sum/C=D,E=F/R=V": 20, // labels2
"b.sum//R=V": 40, // labels3
"a.lastvalue/C=D,G=H/R=V": 50, // labels1
"a.lastvalue/C=D,E=F/R=V": 20, // labels2
"a.lastvalue//R=V": 30, // labels3
"b.lastvalue/C=D,G=H/R=V": 50, // labels1
"b.lastvalue/C=D,E=F/R=V": 20, // labels2
"b.lastvalue//R=V": 30, // labels3
}, records.Map)
b.FinishedCollection()
@@ -92,64 +155,67 @@ func TestSimpleStateless(t *testing.T) {
func TestSimpleStateful(t *testing.T) {
ctx := context.Background()
b := simple.New(test.NewAggregationSelector(), true)
b := simple.New(test.AggregationSelector(), true)
counterA := test.NewCounterRecord(&test.CounterADesc, test.Labels1, 10)
caggA := counterA.Aggregator()
counterA := NewCounterRecord(&CounterADesc, Labels1, 10)
_ = b.Process(counterA)
counterB := test.NewCounterRecord(&test.CounterBDesc, test.Labels1, 10)
caggB := counterB.Aggregator()
counterB := NewCounterRecord(&CounterBDesc, Labels1, 10)
_ = b.Process(counterB)
checkpointSet := b.CheckpointSet()
b.FinishedCollection()
records1 := test.NewOutput(test.SdkEncoder)
records1 := test.NewOutput(label.DefaultEncoder())
_ = checkpointSet.ForEach(records1.AddTo)
require.EqualValues(t, map[string]float64{
"sum.a/C~D&G~H/R~V": 10, // labels1
"sum.b/C~D&G~H/R~V": 10, // labels1
"a.sum/C=D,G=H/R=V": 10, // labels1
"b.sum/C=D,G=H/R=V": 10, // labels1
}, records1.Map)
alloc := sum.New(4)
caggA, caggB, ckptA, ckptB := &alloc[0], &alloc[1], &alloc[2], &alloc[3]
// Test that state was NOT reset
checkpointSet = b.CheckpointSet()
records2 := test.NewOutput(test.SdkEncoder)
records2 := test.NewOutput(label.DefaultEncoder())
_ = checkpointSet.ForEach(records2.AddTo)
require.EqualValues(t, records1.Map, records2.Map)
b.FinishedCollection()
// Update and re-checkpoint the original record.
_ = caggA.Update(ctx, metric.NewInt64Number(20), &test.CounterADesc)
_ = caggB.Update(ctx, metric.NewInt64Number(20), &test.CounterBDesc)
caggA.Checkpoint(&test.CounterADesc)
caggB.Checkpoint(&test.CounterBDesc)
_ = caggA.Update(ctx, metric.NewInt64Number(20), &CounterADesc)
_ = caggB.Update(ctx, metric.NewInt64Number(20), &CounterBDesc)
err := caggA.SynchronizedCopy(ckptA, &CounterADesc)
require.NoError(t, err)
err = caggB.SynchronizedCopy(ckptB, &CounterBDesc)
require.NoError(t, err)
// As yet cagg has not been passed to Integrator.Process. Should
// not see an update.
checkpointSet = b.CheckpointSet()
records3 := test.NewOutput(test.SdkEncoder)
records3 := test.NewOutput(label.DefaultEncoder())
_ = checkpointSet.ForEach(records3.AddTo)
require.EqualValues(t, records1.Map, records3.Map)
b.FinishedCollection()
// Now process the second update
_ = b.Process(export.NewRecord(&test.CounterADesc, test.Labels1, test.Resource, caggA))
_ = b.Process(export.NewRecord(&test.CounterBDesc, test.Labels1, test.Resource, caggB))
_ = b.Process(export.NewRecord(&CounterADesc, Labels1, Resource, ckptA))
_ = b.Process(export.NewRecord(&CounterBDesc, Labels1, Resource, ckptB))
checkpointSet = b.CheckpointSet()
records4 := test.NewOutput(test.SdkEncoder)
records4 := test.NewOutput(label.DefaultEncoder())
_ = checkpointSet.ForEach(records4.AddTo)
require.EqualValues(t, map[string]float64{
"sum.a/C~D&G~H/R~V": 30,
"sum.b/C~D&G~H/R~V": 30,
"a.sum/C=D,G=H/R=V": 30,
"b.sum/C=D,G=H/R=V": 30,
}, records4.Map)
b.FinishedCollection()
}

View File

@@ -15,24 +15,22 @@
package test
import (
"context"
"fmt"
"strings"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/label"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/resource"
)
type (
// Encoder is an alternate label encoder to validate grouping logic.
Encoder struct{}
// Output collects distinct metric/label set outputs.
Output struct {
Map map[string]float64
@@ -45,39 +43,6 @@ type (
testAggregationSelector struct{}
)
var (
// Resource is applied to all test records built in this package.
Resource = resource.New(kv.String("R", "V"))
// LastValueADesc and LastValueBDesc group by "G"
LastValueADesc = metric.NewDescriptor(
"lastvalue.a", metric.ValueObserverKind, metric.Int64NumberKind)
LastValueBDesc = metric.NewDescriptor(
"lastvalue.b", metric.ValueObserverKind, metric.Int64NumberKind)
// CounterADesc and CounterBDesc group by "C"
CounterADesc = metric.NewDescriptor(
"sum.a", metric.CounterKind, metric.Int64NumberKind)
CounterBDesc = metric.NewDescriptor(
"sum.b", metric.CounterKind, metric.Int64NumberKind)
// SdkEncoder uses a non-standard encoder like K1~V1&K2~V2
SdkEncoder = &Encoder{}
// GroupEncoder uses the SDK default encoder
GroupEncoder = label.DefaultEncoder()
// LastValue groups are (labels1), (labels2+labels3)
// Counter groups are (labels1+labels2), (labels3)
// Labels1 has G=H and C=D
Labels1 = makeLabels(kv.String("G", "H"), kv.String("C", "D"))
// Labels2 has C=D and E=F
Labels2 = makeLabels(kv.String("C", "D"), kv.String("E", "F"))
// Labels3 is the empty set
Labels3 = makeLabels()
testLabelEncoderID = label.NewEncoderID()
)
func NewOutput(labelEncoder label.Encoder) Output {
return Output{
Map: make(map[string]float64),
@@ -85,73 +50,53 @@ func NewOutput(labelEncoder label.Encoder) Output {
}
}
// NewAggregationSelector returns a policy that is consistent with the
// AggregationSelector returns a policy that is consistent with the
// test descriptors above. I.e., it returns sum.New() for counter
// instruments and lastvalue.New() for lastValue instruments.
func NewAggregationSelector() export.AggregationSelector {
return &testAggregationSelector{}
func AggregationSelector() export.AggregationSelector {
return testAggregationSelector{}
}
func (*testAggregationSelector) AggregatorFor(desc *metric.Descriptor) export.Aggregator {
switch desc.MetricKind() {
case metric.CounterKind:
return sum.New()
case metric.ValueObserverKind:
return lastvalue.New()
default:
panic("Invalid descriptor MetricKind for this test")
}
}
func (testAggregationSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...*export.Aggregator) {
	switch {
	case strings.HasSuffix(desc.Name(), ".disabled"):
		for i := range aggPtrs {
			*aggPtrs[i] = nil
		}
	case strings.HasSuffix(desc.Name(), ".sum"):
		aggs := sum.New(len(aggPtrs))
		for i := range aggPtrs {
			*aggPtrs[i] = &aggs[i]
		}
	case strings.HasSuffix(desc.Name(), ".minmaxsumcount"):
		aggs := minmaxsumcount.New(len(aggPtrs), desc)
		for i := range aggPtrs {
			*aggPtrs[i] = &aggs[i]
		}
	case strings.HasSuffix(desc.Name(), ".lastvalue"):
		aggs := lastvalue.New(len(aggPtrs))
		for i := range aggPtrs {
			*aggPtrs[i] = &aggs[i]
		}
	case strings.HasSuffix(desc.Name(), ".sketch"):
		aggs := ddsketch.New(len(aggPtrs), desc, ddsketch.NewDefaultConfig())
		for i := range aggPtrs {
			*aggPtrs[i] = &aggs[i]
		}
	case strings.HasSuffix(desc.Name(), ".histogram"):
		aggs := histogram.New(len(aggPtrs), desc, nil)
		for i := range aggPtrs {
			*aggPtrs[i] = &aggs[i]
		}
	case strings.HasSuffix(desc.Name(), ".exact"):
		aggs := array.New(len(aggPtrs))
		for i := range aggPtrs {
			*aggPtrs[i] = &aggs[i]
		}
	default:
		panic(fmt.Sprint("Invalid instrument name for test AggregationSelector: ", desc.Name()))
	}
}
func makeLabels(labels ...kv.KeyValue) *label.Set {
	s := label.NewSet(labels...)
	return &s
}
func (Encoder) Encode(iter label.Iterator) string {
	var sb strings.Builder
	for iter.Next() {
		i, l := iter.IndexedLabel()
		if i > 0 {
			sb.WriteString("&")
		}
		sb.WriteString(string(l.Key))
		sb.WriteString("~")
		sb.WriteString(l.Value.Emit())
	}
	return sb.String()
}
func (Encoder) ID() label.EncoderID {
	return testLabelEncoderID
}
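
Every case in the new selector funnels through the same shape: one New(len(aggPtrs)) allocation services however many aggregators the caller requests. A sketch of a call site (assuming desc is a metric.Descriptor whose name ends in ".sum"; other names select other kinds, and unknown suffixes panic):

	var current, checkpoint export.Aggregator
	test.AggregationSelector().AggregatorFor(&desc, &current, &checkpoint)
	// current and checkpoint are two *sum.Aggregator values carved
	// from a single two-element backing array.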
// LastValueAgg returns a checkpointed lastValue aggregator w/ the specified descriptor and value.
func LastValueAgg(desc *metric.Descriptor, v int64) export.Aggregator {
ctx := context.Background()
gagg := lastvalue.New()
_ = gagg.Update(ctx, metric.NewInt64Number(v), desc)
gagg.Checkpoint(desc)
return gagg
}
// Convenience method for building a test exported lastValue record.
func NewLastValueRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, Resource, LastValueAgg(desc, value))
}
// Convenience method for building a test exported counter record.
func NewCounterRecord(desc *metric.Descriptor, labels *label.Set, value int64) export.Record {
return export.NewRecord(desc, labels, Resource, CounterAgg(desc, value))
}
// CounterAgg returns a checkpointed counter aggregator w/ the specified descriptor and value.
func CounterAgg(desc *metric.Descriptor, v int64) export.Aggregator {
ctx := context.Background()
cagg := sum.New()
_ = cagg.Update(ctx, metric.NewInt64Number(v), desc)
cagg.Checkpoint(desc)
return cagg
}
// AddTo adds a name/label-encoding entry with the lastValue or counter

View File

@@ -26,7 +26,8 @@ import (
func TestStressInt64MinMaxSumCount(t *testing.T) {
desc := metric.NewDescriptor("some_metric", metric.ValueRecorderKind, metric.Int64NumberKind)
mmsc := minmaxsumcount.New(&desc)
alloc := minmaxsumcount.New(2, &desc)
mmsc, ckpt := &alloc[0], &alloc[1]
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -46,12 +47,12 @@ func TestStressInt64MinMaxSumCount(t *testing.T) {
startTime := time.Now()
for time.Since(startTime) < time.Second {
mmsc.Checkpoint(&desc)
_ = mmsc.SynchronizedCopy(ckpt, &desc)
s, _ := mmsc.Sum()
c, _ := mmsc.Count()
min, e1 := mmsc.Min()
max, e2 := mmsc.Max()
s, _ := ckpt.Sum()
c, _ := ckpt.Count()
min, e1 := ckpt.Min()
max, e2 := ckpt.Max()
if c == 0 && (e1 == nil || e2 == nil || s.AsInt64() != 0) {
t.Fail()
}
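
These stress tests pit a writer calling Update against a reader that repeatedly copies and checks invariants on the copy; any torn checkpoint, such as a nonzero sum with a zero count, fails the test. A condensed sketch of the harness shape (illustrative; the real test also checks the Min and Max errors):

	go func() { // writer
		for ctx.Err() == nil {
			_ = mmsc.Update(ctx, metric.NewInt64Number(1), &desc)
		}
	}()

	for time.Since(start) < time.Second { // reader
		_ = mmsc.SynchronizedCopy(ckpt, &desc)
		c, _ := ckpt.Count()
		s, _ := ckpt.Sum()
		if c == 0 && s.AsInt64() != 0 {
			t.Fail() // checkpoint must be internally consistent
		}
	}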

View File

@@ -116,10 +116,11 @@ type (
// inst is a pointer to the corresponding instrument.
inst *syncInstrument
// recorder implements the actual RecordOne() API,
// current implements the actual RecordOne() API,
// depending on the type of aggregation. If nil, the
// metric was disabled by the exporter.
recorder export.Aggregator
current export.Aggregator
checkpoint export.Aggregator
}
instrument struct {
@@ -137,7 +138,7 @@ type (
labeledRecorder struct {
observedEpoch int64
labels *label.Set
recorder export.Aggregator
observed export.Aggregator
}
)
@@ -185,14 +186,15 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator {
if lrec.observedEpoch == a.meter.currentEpoch {
// last value wins for Observers, so if we see the same labels
// in the current epoch, we replace the old recorder
lrec.recorder = a.meter.integrator.AggregatorFor(&a.descriptor)
a.meter.integrator.AggregatorFor(&a.descriptor, &lrec.observed)
} else {
lrec.observedEpoch = a.meter.currentEpoch
}
a.recorders[labels.Equivalent()] = lrec
return lrec.recorder
return lrec.observed
}
rec := a.meter.integrator.AggregatorFor(&a.descriptor)
var rec export.Aggregator
a.meter.integrator.AggregatorFor(&a.descriptor, &rec)
if a.recorders == nil {
a.recorders = make(map[label.Distinct]*labeledRecorder)
}
@@ -200,7 +202,7 @@ func (a *asyncInstrument) getRecorder(labels *label.Set) export.Aggregator {
// asyncInstrument for the labelset for good. This is intentional,
// but will be revisited later.
a.recorders[labels.Equivalent()] = &labeledRecorder{
recorder: rec,
observed: rec,
labels: labels,
observedEpoch: a.meter.currentEpoch,
}
@@ -252,7 +254,8 @@ func (s *syncInstrument) acquireHandle(kvs []kv.KeyValue, labelPtr *label.Set) *
}
rec.refMapped = refcountMapped{value: 2}
rec.inst = s
rec.recorder = s.meter.integrator.AggregatorFor(&s.descriptor)
s.meter.integrator.AggregatorFor(&s.descriptor, &rec.current, &rec.checkpoint)
for {
// Load/Store: there's a memory allocation to place `mk` into
@@ -432,7 +435,21 @@ func (m *Accumulator) observeAsyncInstruments(ctx context.Context) int {
}
func (m *Accumulator) checkpointRecord(r *record) int {
return m.checkpoint(&r.inst.descriptor, r.recorder, r.labels)
if r.current == nil {
return 0
}
err := r.current.SynchronizedCopy(r.checkpoint, &r.inst.descriptor)
if err != nil {
global.Handle(err)
return 0
}
exportRecord := export.NewRecord(&r.inst.descriptor, r.labels, m.resource, r.checkpoint)
err = m.integrator.Process(exportRecord)
if err != nil {
global.Handle(err)
}
return 1
}
func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
@@ -444,7 +461,14 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
lrec := lrec
epochDiff := m.currentEpoch - lrec.observedEpoch
if epochDiff == 0 {
checkpointed += m.checkpoint(&a.descriptor, lrec.recorder, lrec.labels)
if lrec.observed != nil {
exportRecord := export.NewRecord(&a.descriptor, lrec.labels, m.resource, lrec.observed)
err := m.integrator.Process(exportRecord)
if err != nil {
global.Handle(err)
}
checkpointed++
}
} else if epochDiff > 1 {
// This is the second collection cycle with no
// observations for this labelset. Remove the
@@ -458,20 +482,6 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int {
return checkpointed
}
func (m *Accumulator) checkpoint(descriptor *metric.Descriptor, recorder export.Aggregator, labels *label.Set) int {
if recorder == nil {
return 0
}
recorder.Checkpoint(descriptor)
exportRecord := export.NewRecord(descriptor, labels, m.resource, recorder)
err := m.integrator.Process(exportRecord)
if err != nil {
global.Handle(err)
}
return 1
}
// RecordBatch enters a batch of metric events.
func (m *Accumulator) RecordBatch(ctx context.Context, kvs []kv.KeyValue, measurements ...api.Measurement) {
// Labels will be computed the first time acquireHandle is
@@ -498,7 +508,7 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []kv.KeyValue, measur
// RecordOne implements api.SyncImpl.
func (r *record) RecordOne(ctx context.Context, number api.Number) {
if r.recorder == nil {
if r.current == nil {
// The instrument is disabled according to the AggregationSelector.
return
}
@@ -506,7 +516,7 @@ func (r *record) RecordOne(ctx context.Context, number api.Number) {
global.Handle(err)
return
}
if err := r.recorder.Update(ctx, number, &r.inst.descriptor); err != nil {
if err := r.current.Update(ctx, number, &r.inst.descriptor); err != nil {
global.Handle(err)
return
}

View File

@@ -79,38 +79,57 @@ func NewWithHistogramDistribution(boundaries []float64) export.AggregationSelect
return selectorHistogram{boundaries: boundaries}
}
func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
return minmaxsumcount.New(descriptor)
default:
return sum.New()
func sumAggs(aggPtrs []*export.Aggregator) {
aggs := sum.New(len(aggPtrs))
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
}
func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
func (selectorInexpensive) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
return ddsketch.New(descriptor, s.config)
aggs := minmaxsumcount.New(len(aggPtrs), descriptor)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
default:
return sum.New()
sumAggs(aggPtrs)
}
}
func (selectorExact) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
func (s selectorSketch) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
return array.New()
aggs := ddsketch.New(len(aggPtrs), descriptor, s.config)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
default:
return sum.New()
sumAggs(aggPtrs)
}
}
func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
func (selectorExact) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
return histogram.New(descriptor, s.boundaries)
aggs := array.New(len(aggPtrs))
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
default:
return sum.New()
sumAggs(aggPtrs)
}
}
func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor, aggPtrs ...*export.Aggregator) {
switch descriptor.MetricKind() {
case metric.ValueObserverKind, metric.ValueRecorderKind:
aggs := histogram.New(len(aggPtrs), descriptor, s.boundaries)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
default:
sumAggs(aggPtrs)
}
}
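From the caller's side, the new variadic signature fills every supplied slot from a single backing allocation. A minimal sketch using the exported simple selectors; the descriptor is hypothetical:

desc := metric.NewDescriptor("request.duration", metric.ValueRecorderKind, metric.Float64NumberKind)
selector := simple.NewWithHistogramDistribution(nil)
var current, checkpoint export.Aggregator
// Both point into one histogram allocation of length two.
selector.AggregatorFor(&desc, &current, &checkpoint)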

View File

@@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/array"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
@@ -34,30 +35,36 @@ var (
testValueObserverDesc = metric.NewDescriptor("valueobserver", metric.ValueObserverKind, metric.Int64NumberKind)
)
func oneAgg(sel export.AggregationSelector, desc *metric.Descriptor) export.Aggregator {
var agg export.Aggregator
sel.AggregatorFor(desc, &agg)
return agg
}
func TestInexpensiveDistribution(t *testing.T) {
inex := simple.NewWithInexpensiveDistribution()
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testValueRecorderDesc).(*minmaxsumcount.Aggregator) })
require.NotPanics(t, func() { _ = inex.AggregatorFor(&testValueObserverDesc).(*minmaxsumcount.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(inex, &testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(inex, &testValueRecorderDesc).(*minmaxsumcount.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(inex, &testValueObserverDesc).(*minmaxsumcount.Aggregator) })
}
func TestSketchDistribution(t *testing.T) {
sk := simple.NewWithSketchDistribution(ddsketch.NewDefaultConfig())
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testValueRecorderDesc).(*ddsketch.Aggregator) })
require.NotPanics(t, func() { _ = sk.AggregatorFor(&testValueObserverDesc).(*ddsketch.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(sk, &testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(sk, &testValueRecorderDesc).(*ddsketch.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(sk, &testValueObserverDesc).(*ddsketch.Aggregator) })
}
func TestExactDistribution(t *testing.T) {
ex := simple.NewWithExactDistribution()
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueRecorderDesc).(*array.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueObserverDesc).(*array.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(ex, &testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(ex, &testValueRecorderDesc).(*array.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(ex, &testValueObserverDesc).(*array.Aggregator) })
}
func TestHistogramDistribution(t *testing.T) {
ex := simple.NewWithHistogramDistribution(nil)
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueRecorderDesc).(*histogram.Aggregator) })
require.NotPanics(t, func() { _ = ex.AggregatorFor(&testValueObserverDesc).(*histogram.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(ex, &testCounterDesc).(*sum.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(ex, &testValueRecorderDesc).(*histogram.Aggregator) })
require.NotPanics(t, func() { _ = oneAgg(ex, &testValueObserverDesc).(*histogram.Aggregator) })
}

View File

@@ -36,8 +36,7 @@ import (
api "go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
"go.opentelemetry.io/otel/sdk/metric/integrator/test"
)
const (
@@ -59,6 +58,8 @@ type (
impl testImpl
T *testing.T
export.AggregationSelector
lock sync.Mutex
lused map[string]bool
@@ -244,18 +245,6 @@ func (f *testFixture) preCollect() {
f.dupCheck = map[testKey]int{}
}
func (*testFixture) AggregatorFor(descriptor *metric.Descriptor) export.Aggregator {
name := descriptor.Name()
switch {
case strings.HasSuffix(name, "counter"):
return sum.New()
case strings.HasSuffix(name, "lastvalue"):
return lastvalue.New()
default:
panic("Not implemented for this test")
}
}
func (*testFixture) CheckpointSet() export.CheckpointSet {
return nil
}
@@ -301,9 +290,10 @@ func stressTest(t *testing.T, impl testImpl) {
ctx := context.Background()
t.Parallel()
fixture := &testFixture{
T: t,
impl: impl,
lused: map[string]bool{},
T: t,
impl: impl,
lused: map[string]bool{},
AggregationSelector: test.AggregationSelector(),
}
cc := concurrency()
sdk := NewAccumulator(fixture)
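Embedding export.AggregationSelector lets the fixture inherit AggregatorFor rather than reimplement it per test. A sketch of the same pattern; the type name is hypothetical and the remaining integrator methods are elided:

type stressFixture struct {
	export.AggregationSelector // provides AggregatorFor
	// Process, CheckpointSet, etc. implemented as the test requires.
}

func newStressFixture() *stressFixture {
	return &stressFixture{AggregationSelector: test.AggregationSelector()}
}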
@@ -353,7 +343,7 @@ func float64sEqual(a, b api.Number) bool {
func intCounterTestImpl() testImpl {
return testImpl{
newInstrument: func(meter api.Meter, name string) SyncImpler {
return Must(meter).NewInt64Counter(name + ".counter")
return Must(meter).NewInt64Counter(name + ".sum")
},
getUpdateValue: func() api.Number {
for {
@@ -391,7 +381,7 @@ func TestStressInt64Counter(t *testing.T) {
func floatCounterTestImpl() testImpl {
return testImpl{
newInstrument: func(meter api.Meter, name string) SyncImpler {
return Must(meter).NewFloat64Counter(name + ".counter")
return Must(meter).NewFloat64Counter(name + ".sum")
},
getUpdateValue: func() api.Number {
for {