You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-09-16 09:26:25 +02:00
Remove metric MinMaxSumCount kind aggregation (#2423)
* remove metric MinMaxSumCount kind aggregation * remove related files * fix unittest * fix changelog * remove reference the removed aggregation type Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
@@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
|
||||
- Remove the metric Processor's ability to convert cumulative to delta aggregation temporality. (#2350)
|
||||
- Remove the metric Bound Instruments interface and implementations. (#2399)
|
||||
- Remove the metric MinMaxSumCount kind aggregation and the corresponding OTLP export path. (#2423)
|
||||
|
||||
## [1.2.0] - 2021-11-12
|
||||
|
||||
|
@@ -42,8 +42,6 @@ replace go.opentelemetry.io/otel/example/otel-collector => ../../../example/otel
|
||||
|
||||
replace go.opentelemetry.io/otel/example/passthrough => ../../../example/passthrough
|
||||
|
||||
replace go.opentelemetry.io/otel/example/prom-collector => ../../../example/prom-collector
|
||||
|
||||
replace go.opentelemetry.io/otel/example/prometheus => ../../../example/prometheus
|
||||
|
||||
replace go.opentelemetry.io/otel/example/zipkin => ../../../example/zipkin
|
||||
|
@@ -239,13 +239,6 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.Metric, error) {
|
||||
func Record(temporalitySelector aggregation.TemporalitySelector, r export.Record) (*metricpb.Metric, error) {
|
||||
agg := r.Aggregation()
|
||||
switch agg.Kind() {
|
||||
case aggregation.MinMaxSumCountKind:
|
||||
mmsc, ok := agg.(aggregation.MinMaxSumCount)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg)
|
||||
}
|
||||
return minMaxSumCount(r, mmsc)
|
||||
|
||||
case aggregation.HistogramKind:
|
||||
h, ok := agg.(aggregation.Histogram)
|
||||
if !ok {
|
||||
@@ -390,64 +383,6 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, tem
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// minMaxSumCountValue returns the values of the MinMaxSumCount Aggregator
|
||||
// as discrete values.
|
||||
func minMaxSumCountValues(a aggregation.MinMaxSumCount) (min, max, sum number.Number, count uint64, err error) {
|
||||
if min, err = a.Min(); err != nil {
|
||||
return
|
||||
}
|
||||
if max, err = a.Max(); err != nil {
|
||||
return
|
||||
}
|
||||
if sum, err = a.Sum(); err != nil {
|
||||
return
|
||||
}
|
||||
if count, err = a.Count(); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// minMaxSumCount transforms a MinMaxSumCount Aggregator into an OTLP Metric.
|
||||
func minMaxSumCount(record export.Record, a aggregation.MinMaxSumCount) (*metricpb.Metric, error) {
|
||||
desc := record.Descriptor()
|
||||
labels := record.Labels()
|
||||
min, max, sum, count, err := minMaxSumCountValues(a)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := &metricpb.Metric{
|
||||
Name: desc.Name(),
|
||||
Description: desc.Description(),
|
||||
Unit: string(desc.Unit()),
|
||||
Data: &metricpb.Metric_Summary{
|
||||
Summary: &metricpb.Summary{
|
||||
DataPoints: []*metricpb.SummaryDataPoint{
|
||||
{
|
||||
Sum: sum.CoerceToFloat64(desc.NumberKind()),
|
||||
Attributes: Iterator(labels.Iter()),
|
||||
StartTimeUnixNano: toNanos(record.StartTime()),
|
||||
TimeUnixNano: toNanos(record.EndTime()),
|
||||
Count: uint64(count),
|
||||
QuantileValues: []*metricpb.SummaryDataPoint_ValueAtQuantile{
|
||||
{
|
||||
Quantile: 0.0,
|
||||
Value: min.CoerceToFloat64(desc.NumberKind()),
|
||||
},
|
||||
{
|
||||
Quantile: 1.0,
|
||||
Value: max.CoerceToFloat64(desc.NumberKind()),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []uint64, err error) {
|
||||
var buckets aggregation.Buckets
|
||||
if buckets, err = a.Histogram(); err != nil {
|
||||
|
@@ -31,7 +31,6 @@ import (
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
|
||||
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
|
||||
@@ -96,84 +95,6 @@ func TestStringKeyValues(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountValue(t *testing.T) {
|
||||
mmscs := minmaxsumcount.New(2, &sdkapi.Descriptor{})
|
||||
mmsc, ckpt := &mmscs[0], &mmscs[1]
|
||||
|
||||
assert.NoError(t, mmsc.Update(context.Background(), 1, &sdkapi.Descriptor{}))
|
||||
assert.NoError(t, mmsc.Update(context.Background(), 10, &sdkapi.Descriptor{}))
|
||||
|
||||
// Prior to checkpointing ErrNoData should be returned.
|
||||
_, _, _, _, err := minMaxSumCountValues(ckpt)
|
||||
assert.EqualError(t, err, aggregation.ErrNoData.Error())
|
||||
|
||||
// Checkpoint to set non-zero values
|
||||
require.NoError(t, mmsc.SynchronizedMove(ckpt, &sdkapi.Descriptor{}))
|
||||
min, max, sum, count, err := minMaxSumCountValues(ckpt)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, min, number.NewInt64Number(1))
|
||||
assert.Equal(t, max, number.NewInt64Number(10))
|
||||
assert.Equal(t, sum, number.NewInt64Number(11))
|
||||
assert.Equal(t, count, uint64(2))
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountDatapoints(t *testing.T) {
|
||||
desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Int64Kind)
|
||||
labels := attribute.NewSet(attribute.String("one", "1"))
|
||||
mmscs := minmaxsumcount.New(2, &sdkapi.Descriptor{})
|
||||
mmsc, ckpt := &mmscs[0], &mmscs[1]
|
||||
|
||||
assert.NoError(t, mmsc.Update(context.Background(), 1, &desc))
|
||||
assert.NoError(t, mmsc.Update(context.Background(), 10, &desc))
|
||||
require.NoError(t, mmsc.SynchronizedMove(ckpt, &desc))
|
||||
expected := []*metricpb.SummaryDataPoint{
|
||||
{
|
||||
Count: 2,
|
||||
Sum: 11,
|
||||
StartTimeUnixNano: uint64(intervalStart.UnixNano()),
|
||||
TimeUnixNano: uint64(intervalEnd.UnixNano()),
|
||||
Attributes: []*commonpb.KeyValue{
|
||||
{
|
||||
Key: "one",
|
||||
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1"}},
|
||||
},
|
||||
},
|
||||
QuantileValues: []*metricpb.SummaryDataPoint_ValueAtQuantile{
|
||||
{
|
||||
Quantile: 0.0,
|
||||
Value: 1.0,
|
||||
},
|
||||
{
|
||||
Quantile: 1.0,
|
||||
Value: 10.0,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd)
|
||||
m, err := minMaxSumCount(record, ckpt)
|
||||
if assert.NoError(t, err) {
|
||||
assert.Nil(t, m.GetGauge())
|
||||
assert.Nil(t, m.GetSum())
|
||||
assert.Nil(t, m.GetHistogram())
|
||||
assert.Equal(t, expected, m.GetSummary().DataPoints)
|
||||
assert.Nil(t, m.GetIntGauge()) // nolint
|
||||
assert.Nil(t, m.GetIntSum()) // nolint
|
||||
assert.Nil(t, m.GetIntHistogram()) // nolint
|
||||
}
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountPropagatesErrors(t *testing.T) {
|
||||
// ErrNoData should be returned by both the Min and Max values of
|
||||
// a MinMaxSumCount Aggregator. Use this fact to check the error is
|
||||
// correctly returned.
|
||||
mmsc := &minmaxsumcount.New(1, &sdkapi.Descriptor{})[0]
|
||||
_, _, _, _, err := minMaxSumCountValues(mmsc)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, aggregation.ErrNoData, err)
|
||||
}
|
||||
|
||||
func TestSumIntDataPoints(t *testing.T) {
|
||||
desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Int64Kind)
|
||||
labels := attribute.NewSet(attribute.String("one", "1"))
|
||||
@@ -335,10 +256,6 @@ type testErrLastValue struct {
|
||||
err error
|
||||
}
|
||||
|
||||
type testErrMinMaxSumCount struct {
|
||||
testErrSum
|
||||
}
|
||||
|
||||
func (te *testErrLastValue) LastValue() (number.Number, time.Time, error) {
|
||||
return 0, time.Time{}, te.err
|
||||
}
|
||||
@@ -353,23 +270,10 @@ func (te *testErrSum) Kind() aggregation.Kind {
|
||||
return aggregation.SumKind
|
||||
}
|
||||
|
||||
func (te *testErrMinMaxSumCount) Min() (number.Number, error) {
|
||||
return 0, te.err
|
||||
}
|
||||
|
||||
func (te *testErrMinMaxSumCount) Max() (number.Number, error) {
|
||||
return 0, te.err
|
||||
}
|
||||
|
||||
func (te *testErrMinMaxSumCount) Count() (uint64, error) {
|
||||
return 0, te.err
|
||||
}
|
||||
|
||||
var _ export.Aggregator = &testAgg{}
|
||||
var _ aggregation.Aggregation = &testAgg{}
|
||||
var _ aggregation.Sum = &testErrSum{}
|
||||
var _ aggregation.LastValue = &testErrLastValue{}
|
||||
var _ aggregation.MinMaxSumCount = &testErrMinMaxSumCount{}
|
||||
|
||||
func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
|
||||
makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) {
|
||||
@@ -393,12 +297,6 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, mpb)
|
||||
require.True(t, errors.Is(err, ErrIncompatibleAgg))
|
||||
|
||||
mpb, err = makeMpb(aggregation.MinMaxSumCountKind, &lastvalue.New(1)[0])
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, mpb)
|
||||
require.True(t, errors.Is(err, ErrIncompatibleAgg))
|
||||
}
|
||||
|
||||
func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
|
||||
@@ -421,10 +319,4 @@ func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
require.Nil(t, mpb)
|
||||
require.True(t, errors.Is(err, errEx))
|
||||
|
||||
mpb, err = makeMpb(aggregation.MinMaxSumCountKind, &testErrMinMaxSumCount{testErrSum{errEx}})
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, mpb)
|
||||
require.True(t, errors.Is(err, errEx))
|
||||
}
|
||||
|
@@ -54,8 +54,6 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter
|
||||
instruments := map[string]data{
|
||||
"test-int64-counter": {sdkapi.CounterInstrumentKind, number.Int64Kind, 1},
|
||||
"test-float64-counter": {sdkapi.CounterInstrumentKind, number.Float64Kind, 1},
|
||||
"test-int64-histogram": {sdkapi.HistogramInstrumentKind, number.Int64Kind, 2},
|
||||
"test-float64-histogram": {sdkapi.HistogramInstrumentKind, number.Float64Kind, 2},
|
||||
"test-int64-gaugeobserver": {sdkapi.GaugeObserverInstrumentKind, number.Int64Kind, 3},
|
||||
"test-float64-gaugeobserver": {sdkapi.GaugeObserverInstrumentKind, number.Float64Kind, 3},
|
||||
}
|
||||
|
@@ -37,8 +37,6 @@ var _ exportmetric.Exporter = &metricExporter{}
|
||||
|
||||
type line struct {
|
||||
Name string `json:"Name"`
|
||||
Min interface{} `json:"Min,omitempty"`
|
||||
Max interface{} `json:"Max,omitempty"`
|
||||
Sum interface{} `json:"Sum,omitempty"`
|
||||
Count interface{} `json:"Count,omitempty"`
|
||||
LastValue interface{} `json:"Last,omitempty"`
|
||||
@@ -83,26 +81,6 @@ func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reade
|
||||
return err
|
||||
}
|
||||
expose.Sum = value.AsInterface(kind)
|
||||
}
|
||||
|
||||
if mmsc, ok := agg.(aggregation.MinMaxSumCount); ok {
|
||||
count, err := mmsc.Count()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expose.Count = count
|
||||
|
||||
max, err := mmsc.Max()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expose.Max = max.AsInterface(kind)
|
||||
|
||||
min, err := mmsc.Min()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
expose.Min = min.AsInterface(kind)
|
||||
} else if lv, ok := agg.(aggregation.LastValue); ok {
|
||||
value, timestamp, err := lv.LastValue()
|
||||
if err != nil {
|
||||
|
@@ -156,18 +156,6 @@ func TestStdoutLastValueFormat(t *testing.T) {
|
||||
require.Equal(t, `[{"Name":"name.lastvalue{R=V,instrumentation.name=test,A=B,C=D}","Last":123.456}]`, fix.Output())
|
||||
}
|
||||
|
||||
func TestStdoutMinMaxSumCount(t *testing.T) {
|
||||
fix := newFixture(t)
|
||||
|
||||
counter := metric.Must(fix.meter).NewFloat64Counter("name.minmaxsumcount")
|
||||
counter.Add(fix.ctx, 123.456, attribute.String("A", "B"), attribute.String("C", "D"))
|
||||
counter.Add(fix.ctx, 876.543, attribute.String("A", "B"), attribute.String("C", "D"))
|
||||
|
||||
require.NoError(t, fix.cont.Stop(fix.ctx))
|
||||
|
||||
require.Equal(t, `[{"Name":"name.minmaxsumcount{R=V,instrumentation.name=test,A=B,C=D}","Min":123.456,"Max":876.543,"Sum":999.999,"Count":2}]`, fix.Output())
|
||||
}
|
||||
|
||||
func TestStdoutHistogramFormat(t *testing.T) {
|
||||
fix := newFixture(t, stdoutmetric.WithPrettyPrint())
|
||||
|
||||
@@ -201,7 +189,6 @@ func TestStdoutNoData(t *testing.T) {
|
||||
}
|
||||
|
||||
runTwoAggs("lastvalue")
|
||||
runTwoAggs("minmaxsumcount")
|
||||
}
|
||||
|
||||
func TestStdoutResource(t *testing.T) {
|
||||
|
@@ -84,15 +84,6 @@ type (
|
||||
Sum() (number.Number, error)
|
||||
Histogram() (Buckets, error)
|
||||
}
|
||||
|
||||
// MinMaxSumCount supports the Min, Max, Sum, and Count interfaces.
|
||||
MinMaxSumCount interface {
|
||||
Aggregation
|
||||
Min() (number.Number, error)
|
||||
Max() (number.Number, error)
|
||||
Sum() (number.Number, error)
|
||||
Count() (uint64, error)
|
||||
}
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -106,18 +97,16 @@ type (
|
||||
// deciding how to expose metric data. This enables
|
||||
// user-supplied Aggregators to replace builtin Aggregators.
|
||||
//
|
||||
// For example, test for a Distribution before testing for a
|
||||
// MinMaxSumCount, test for a Histogram before testing for a
|
||||
// For example, test for a Histogram before testing for a
|
||||
// Sum, and so on.
|
||||
Kind string
|
||||
)
|
||||
|
||||
// Kind description constants.
|
||||
const (
|
||||
SumKind Kind = "Sum"
|
||||
MinMaxSumCountKind Kind = "MinMaxSumCount"
|
||||
HistogramKind Kind = "Histogram"
|
||||
LastValueKind Kind = "Lastvalue"
|
||||
SumKind Kind = "Sum"
|
||||
HistogramKind Kind = "Histogram"
|
||||
LastValueKind Kind = "Lastvalue"
|
||||
)
|
||||
|
||||
// Sentinel errors for Aggregation interface.
|
||||
|
@@ -139,8 +139,7 @@ type CheckpointerFactory interface {
|
||||
//
|
||||
// Note that any Aggregator may be attached to any instrument--this is
|
||||
// the result of the OpenTelemetry API/SDK separation. It is possible
|
||||
// to attach a Sum aggregator to a Histogram instrument or a
|
||||
// MinMaxSumCount aggregator to a Counter instrument.
|
||||
// to attach a Sum aggregator to a Histogram instrument.
|
||||
type Aggregator interface {
|
||||
// Aggregation returns an Aggregation interface to access the
|
||||
// current state of this Aggregator. The caller is
|
||||
|
@@ -1,165 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package minmaxsumcount // import "go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator"
|
||||
)
|
||||
|
||||
type (
|
||||
// Aggregator aggregates events that form a distribution,
|
||||
// keeping only the min, max, sum, and count.
|
||||
Aggregator struct {
|
||||
lock sync.Mutex
|
||||
kind number.Kind
|
||||
state
|
||||
}
|
||||
|
||||
state struct {
|
||||
sum number.Number
|
||||
min number.Number
|
||||
max number.Number
|
||||
count uint64
|
||||
}
|
||||
)
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregation.MinMaxSumCount = &Aggregator{}
|
||||
|
||||
// New returns a new aggregator for computing the min, max, sum, and
|
||||
// count.
|
||||
//
|
||||
// This type uses a mutex for Update() and SynchronizedMove() concurrency.
|
||||
func New(cnt int, desc *sdkapi.Descriptor) []Aggregator {
|
||||
kind := desc.NumberKind()
|
||||
aggs := make([]Aggregator, cnt)
|
||||
for i := range aggs {
|
||||
aggs[i] = Aggregator{
|
||||
kind: kind,
|
||||
state: emptyState(kind),
|
||||
}
|
||||
}
|
||||
return aggs
|
||||
}
|
||||
|
||||
// Aggregation returns an interface for reading the state of this aggregator.
|
||||
func (c *Aggregator) Aggregation() aggregation.Aggregation {
|
||||
return c
|
||||
}
|
||||
|
||||
// Kind returns aggregation.MinMaxSumCountKind.
|
||||
func (c *Aggregator) Kind() aggregation.Kind {
|
||||
return aggregation.MinMaxSumCountKind
|
||||
}
|
||||
|
||||
// Sum returns the sum of values in the checkpoint.
|
||||
func (c *Aggregator) Sum() (number.Number, error) {
|
||||
return c.sum, nil
|
||||
}
|
||||
|
||||
// Count returns the number of values in the checkpoint.
|
||||
func (c *Aggregator) Count() (uint64, error) {
|
||||
return c.count, nil
|
||||
}
|
||||
|
||||
// Min returns the minimum value in the checkpoint.
|
||||
// The error value aggregation.ErrNoData will be returned
|
||||
// if there were no measurements recorded during the checkpoint.
|
||||
func (c *Aggregator) Min() (number.Number, error) {
|
||||
if c.count == 0 {
|
||||
return 0, aggregation.ErrNoData
|
||||
}
|
||||
return c.min, nil
|
||||
}
|
||||
|
||||
// Max returns the maximum value in the checkpoint.
|
||||
// The error value aggregation.ErrNoData will be returned
|
||||
// if there were no measurements recorded during the checkpoint.
|
||||
func (c *Aggregator) Max() (number.Number, error) {
|
||||
if c.count == 0 {
|
||||
return 0, aggregation.ErrNoData
|
||||
}
|
||||
return c.max, nil
|
||||
}
|
||||
|
||||
// SynchronizedMove saves the current state into oa and resets the current state to
|
||||
// the empty set.
|
||||
func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *sdkapi.Descriptor) error {
|
||||
o, _ := oa.(*Aggregator)
|
||||
|
||||
if oa != nil && o == nil {
|
||||
return aggregator.NewInconsistentAggregatorError(c, oa)
|
||||
}
|
||||
c.lock.Lock()
|
||||
if o != nil {
|
||||
o.state = c.state
|
||||
}
|
||||
c.state = emptyState(c.kind)
|
||||
c.lock.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func emptyState(kind number.Kind) state {
|
||||
return state{
|
||||
count: 0,
|
||||
sum: 0,
|
||||
min: kind.Maximum(),
|
||||
max: kind.Minimum(),
|
||||
}
|
||||
}
|
||||
|
||||
// Update adds the recorded measurement to the current data set.
|
||||
func (c *Aggregator) Update(_ context.Context, number number.Number, desc *sdkapi.Descriptor) error {
|
||||
kind := desc.NumberKind()
|
||||
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
c.count++
|
||||
c.sum.AddNumber(kind, number)
|
||||
if number.CompareNumber(kind, c.min) < 0 {
|
||||
c.min = number
|
||||
}
|
||||
if number.CompareNumber(kind, c.max) > 0 {
|
||||
c.max = number
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Merge combines two data sets into one.
|
||||
func (c *Aggregator) Merge(oa export.Aggregator, desc *sdkapi.Descriptor) error {
|
||||
o, _ := oa.(*Aggregator)
|
||||
if o == nil {
|
||||
return aggregator.NewInconsistentAggregatorError(c, oa)
|
||||
}
|
||||
|
||||
c.count += o.count
|
||||
c.sum.AddNumber(desc.NumberKind(), o.sum)
|
||||
|
||||
if c.min.CompareNumber(desc.NumberKind(), o.min) > 0 {
|
||||
c.min.SetNumber(o.min)
|
||||
}
|
||||
if c.max.CompareNumber(desc.NumberKind(), o.max) < 0 {
|
||||
c.max.SetNumber(o.max)
|
||||
}
|
||||
return nil
|
||||
}
|
@@ -1,248 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package minmaxsumcount
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/aggregatortest"
|
||||
)
|
||||
|
||||
const count = 100
|
||||
|
||||
type policy struct {
|
||||
name string
|
||||
absolute bool
|
||||
sign func() int
|
||||
}
|
||||
|
||||
var (
|
||||
positiveOnly = policy{
|
||||
name: "absolute",
|
||||
absolute: true,
|
||||
sign: func() int { return +1 },
|
||||
}
|
||||
negativeOnly = policy{
|
||||
name: "negative",
|
||||
absolute: false,
|
||||
sign: func() int { return -1 },
|
||||
}
|
||||
positiveAndNegative = policy{
|
||||
name: "positiveAndNegative",
|
||||
absolute: false,
|
||||
sign: func() int {
|
||||
if rand.Uint32() > math.MaxUint32/2 {
|
||||
return -1
|
||||
}
|
||||
return 1
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func TestMinMaxSumCountAbsolute(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
minMaxSumCount(t, profile, positiveOnly)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountNegativeOnly(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
minMaxSumCount(t, profile, negativeOnly)
|
||||
})
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountPositiveAndNegative(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
minMaxSumCount(t, profile, positiveAndNegative)
|
||||
})
|
||||
}
|
||||
|
||||
func new2(desc *sdkapi.Descriptor) (_, _ *Aggregator) {
|
||||
alloc := New(2, desc)
|
||||
return &alloc[0], &alloc[1]
|
||||
}
|
||||
|
||||
func new4(desc *sdkapi.Descriptor) (_, _, _, _ *Aggregator) {
|
||||
alloc := New(4, desc)
|
||||
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
|
||||
}
|
||||
|
||||
func checkZero(t *testing.T, agg *Aggregator, desc *sdkapi.Descriptor) {
|
||||
kind := desc.NumberKind()
|
||||
|
||||
sum, err := agg.Sum()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, kind.Zero(), sum)
|
||||
|
||||
count, err := agg.Count()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(0), count)
|
||||
|
||||
max, err := agg.Max()
|
||||
require.True(t, errors.Is(err, aggregation.ErrNoData))
|
||||
require.Equal(t, kind.Zero(), max)
|
||||
|
||||
min, err := agg.Min()
|
||||
require.True(t, errors.Is(err, aggregation.ErrNoData))
|
||||
require.Equal(t, kind.Zero(), min)
|
||||
}
|
||||
|
||||
// Validates min, max, sum and count for a given profile and policy
|
||||
func minMaxSumCount(t *testing.T, profile aggregatortest.Profile, policy policy) {
|
||||
descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, profile.NumberKind)
|
||||
|
||||
agg, ckpt := new2(descriptor)
|
||||
|
||||
all := aggregatortest.NewNumbers(profile.NumberKind)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(policy.sign())
|
||||
all.Append(x)
|
||||
aggregatortest.CheckedUpdate(t, agg, x, descriptor)
|
||||
}
|
||||
|
||||
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||
|
||||
checkZero(t, agg, descriptor)
|
||||
|
||||
all.Sort()
|
||||
|
||||
aggSum, err := ckpt.Sum()
|
||||
require.Nil(t, err)
|
||||
allSum := all.Sum()
|
||||
require.InEpsilon(t,
|
||||
(&allSum).CoerceToFloat64(profile.NumberKind),
|
||||
aggSum.CoerceToFloat64(profile.NumberKind),
|
||||
0.000000001,
|
||||
"Same sum - "+policy.name)
|
||||
|
||||
count, err := ckpt.Count()
|
||||
require.Equal(t, all.Count(), count, "Same count -"+policy.name)
|
||||
require.Nil(t, err)
|
||||
|
||||
min, err := ckpt.Min()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all.Min(),
|
||||
min,
|
||||
"Same min -"+policy.name)
|
||||
|
||||
max, err := ckpt.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all.Max(),
|
||||
max,
|
||||
"Same max -"+policy.name)
|
||||
}
|
||||
|
||||
func TestMinMaxSumCountMerge(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, profile.NumberKind)
|
||||
|
||||
agg1, agg2, ckpt1, ckpt2 := new4(descriptor)
|
||||
|
||||
all := aggregatortest.NewNumbers(profile.NumberKind)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all.Append(x)
|
||||
aggregatortest.CheckedUpdate(t, agg1, x, descriptor)
|
||||
}
|
||||
for i := 0; i < count; i++ {
|
||||
x := profile.Random(+1)
|
||||
all.Append(x)
|
||||
aggregatortest.CheckedUpdate(t, agg2, x, descriptor)
|
||||
}
|
||||
|
||||
require.NoError(t, agg1.SynchronizedMove(ckpt1, descriptor))
|
||||
require.NoError(t, agg2.SynchronizedMove(ckpt2, descriptor))
|
||||
|
||||
checkZero(t, agg1, descriptor)
|
||||
checkZero(t, agg2, descriptor)
|
||||
|
||||
aggregatortest.CheckedMerge(t, ckpt1, ckpt2, descriptor)
|
||||
|
||||
all.Sort()
|
||||
|
||||
aggSum, err := ckpt1.Sum()
|
||||
require.Nil(t, err)
|
||||
allSum := all.Sum()
|
||||
require.InEpsilon(t,
|
||||
(&allSum).CoerceToFloat64(profile.NumberKind),
|
||||
aggSum.CoerceToFloat64(profile.NumberKind),
|
||||
0.000000001,
|
||||
"Same sum - absolute")
|
||||
|
||||
count, err := ckpt1.Count()
|
||||
require.Equal(t, all.Count(), count, "Same count - absolute")
|
||||
require.Nil(t, err)
|
||||
|
||||
min, err := ckpt1.Min()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all.Min(),
|
||||
min,
|
||||
"Same min - absolute")
|
||||
|
||||
max, err := ckpt1.Max()
|
||||
require.Nil(t, err)
|
||||
require.Equal(t,
|
||||
all.Max(),
|
||||
max,
|
||||
"Same max - absolute")
|
||||
})
|
||||
}
|
||||
|
||||
func TestMaxSumCountNotSet(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
descriptor := aggregatortest.NewAggregatorTest(sdkapi.HistogramInstrumentKind, profile.NumberKind)
|
||||
|
||||
alloc := New(2, descriptor)
|
||||
agg, ckpt := &alloc[0], &alloc[1]
|
||||
|
||||
require.NoError(t, agg.SynchronizedMove(ckpt, descriptor))
|
||||
|
||||
asum, err := ckpt.Sum()
|
||||
require.Equal(t, number.Number(0), asum, "Empty checkpoint sum = 0")
|
||||
require.Nil(t, err)
|
||||
|
||||
count, err := ckpt.Count()
|
||||
require.Equal(t, uint64(0), count, "Empty checkpoint count = 0")
|
||||
require.Nil(t, err)
|
||||
|
||||
max, err := ckpt.Max()
|
||||
require.Equal(t, aggregation.ErrNoData, err)
|
||||
require.Equal(t, number.Number(0), max)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSynchronizedMoveReset(t *testing.T) {
|
||||
aggregatortest.SynchronizedMoveResetTest(
|
||||
t,
|
||||
sdkapi.HistogramInstrumentKind,
|
||||
func(desc *sdkapi.Descriptor) export.Aggregator {
|
||||
return &New(1, desc)[0]
|
||||
},
|
||||
)
|
||||
}
|
@@ -303,16 +303,6 @@ func BenchmarkGaugeObserverObservationFloat64(b *testing.B) {
|
||||
fix.accumulator.Collect(ctx)
|
||||
}
|
||||
|
||||
// MaxSumCount
|
||||
|
||||
func BenchmarkInt64MaxSumCountAdd(b *testing.B) {
|
||||
benchmarkInt64HistogramAdd(b, "int64.minmaxsumcount")
|
||||
}
|
||||
|
||||
func BenchmarkFloat64MaxSumCountAdd(b *testing.B) {
|
||||
benchmarkFloat64HistogramAdd(b, "float64.minmaxsumcount")
|
||||
}
|
||||
|
||||
// Exact
|
||||
|
||||
func BenchmarkInt64ExactAdd(b *testing.B) {
|
||||
|
@@ -1,81 +0,0 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/metric/metrictest"
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
"go.opentelemetry.io/otel/metric/sdkapi"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
)
|
||||
|
||||
func TestStressInt64MinMaxSumCount(t *testing.T) {
|
||||
desc := metrictest.NewDescriptor("some_metric", sdkapi.HistogramInstrumentKind, number.Int64Kind)
|
||||
alloc := minmaxsumcount.New(2, &desc)
|
||||
mmsc, ckpt := &alloc[0], &alloc[1]
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
rnd := rand.New(rand.NewSource(time.Now().Unix()))
|
||||
v := rnd.Int63() % 103
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
_ = mmsc.Update(ctx, number.NewInt64Number(v), &desc)
|
||||
}
|
||||
v++
|
||||
}
|
||||
}()
|
||||
|
||||
startTime := time.Now()
|
||||
for time.Since(startTime) < time.Second {
|
||||
_ = mmsc.SynchronizedMove(ckpt, &desc)
|
||||
|
||||
s, _ := ckpt.Sum()
|
||||
c, _ := ckpt.Count()
|
||||
min, e1 := ckpt.Min()
|
||||
max, e2 := ckpt.Max()
|
||||
if c == 0 && (e1 == nil || e2 == nil || s.AsInt64() != 0) {
|
||||
t.Fail()
|
||||
}
|
||||
if c != 0 {
|
||||
if e1 != nil || e2 != nil {
|
||||
t.Fail()
|
||||
}
|
||||
lo, hi, sum := min.AsInt64(), max.AsInt64(), s.AsInt64()
|
||||
|
||||
if uint64(hi-lo)+1 != c {
|
||||
t.Fail()
|
||||
}
|
||||
if c == 1 {
|
||||
if lo != hi || lo != sum {
|
||||
t.Fail()
|
||||
}
|
||||
} else {
|
||||
if hi*(hi+1)/2-(lo-1)*lo/2 != sum {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -80,7 +80,6 @@ func TestProcessor(t *testing.T) {
|
||||
t.Run(nc.kind.String(), func(t *testing.T) {
|
||||
for _, ac := range []aggregatorCase{
|
||||
{kind: aggregation.SumKind},
|
||||
{kind: aggregation.MinMaxSumCountKind},
|
||||
{kind: aggregation.HistogramKind},
|
||||
{kind: aggregation.LastValueKind},
|
||||
} {
|
||||
|
@@ -28,7 +28,6 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
@@ -191,11 +190,6 @@ func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...
|
||||
for i := range aggPtrs {
|
||||
*aggPtrs[i] = &aggs[i]
|
||||
}
|
||||
case strings.HasSuffix(desc.Name(), ".minmaxsumcount"):
|
||||
aggs := minmaxsumcount.New(len(aggPtrs), desc)
|
||||
for i := range aggPtrs {
|
||||
*aggPtrs[i] = &aggs[i]
|
||||
}
|
||||
case strings.HasSuffix(desc.Name(), ".lastvalue"):
|
||||
aggs := lastvalue.New(len(aggPtrs))
|
||||
for i := range aggPtrs {
|
||||
|
@@ -19,7 +19,6 @@ import (
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
)
|
||||
|
||||
@@ -70,7 +69,7 @@ func (selectorInexpensive) AggregatorFor(descriptor *sdkapi.Descriptor, aggPtrs
|
||||
case sdkapi.GaugeObserverInstrumentKind:
|
||||
lastValueAggs(aggPtrs)
|
||||
case sdkapi.HistogramInstrumentKind:
|
||||
aggs := minmaxsumcount.New(len(aggPtrs), descriptor)
|
||||
aggs := sum.New(len(aggPtrs))
|
||||
for i := range aggPtrs {
|
||||
*aggPtrs[i] = &aggs[i]
|
||||
}
|
||||
|
@@ -25,7 +25,6 @@ import (
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
)
|
||||
@@ -55,7 +54,7 @@ func testFixedSelectors(t *testing.T, sel export.AggregatorSelector) {
|
||||
|
||||
func TestInexpensiveDistribution(t *testing.T) {
|
||||
inex := simple.NewWithInexpensiveDistribution()
|
||||
require.IsType(t, (*minmaxsumcount.Aggregator)(nil), oneAgg(inex, &testHistogramDesc))
|
||||
require.IsType(t, (*sum.Aggregator)(nil), oneAgg(inex, &testHistogramDesc))
|
||||
testFixedSelectors(t, inex)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user