
Replace internal aggregate Aggregator with Measure/ComputeAggregation and a Builder (#4304)

Tyler Yahn
2023-07-17 07:15:50 -07:00
committed by GitHub
parent fdbcb9ac28
commit d18f20179e
18 changed files with 576 additions and 362 deletions

View File

@@ -194,7 +194,7 @@ type streamID struct {
 }

 type int64Inst struct {
-	aggregators []aggregate.Aggregator[int64]
+	measures []aggregate.Measure[int64]

 	embedded.Int64Counter
 	embedded.Int64UpDownCounter
@@ -219,13 +219,13 @@ func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
 	if err := ctx.Err(); err != nil {
 		return
 	}
-	for _, agg := range i.aggregators {
-		agg.Aggregate(val, s)
+	for _, in := range i.measures {
+		in(ctx, val, s)
 	}
 }

 type float64Inst struct {
-	aggregators []aggregate.Aggregator[float64]
+	measures []aggregate.Measure[float64]

 	embedded.Float64Counter
 	embedded.Float64UpDownCounter
@@ -250,8 +250,8 @@ func (i *float64Inst) aggregate(ctx context.Context, val float64, s attribute.Se
 	if err := ctx.Err(); err != nil {
 		return
 	}
-	for _, agg := range i.aggregators {
-		agg.Aggregate(val, s)
+	for _, in := range i.measures {
+		in(ctx, val, s)
 	}
 }
@@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{}
 var _ metric.Float64ObservableUpDownCounter = float64Observable{}
 var _ metric.Float64ObservableGauge = float64Observable{}

-func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[float64]) float64Observable {
+func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
 	return float64Observable{
-		observable: newObservable(scope, kind, name, desc, u, agg),
+		observable: newObservable(scope, kind, name, desc, u, meas),
 	}
 }
@@ -296,9 +296,9 @@ var _ metric.Int64ObservableCounter = int64Observable{}
 var _ metric.Int64ObservableUpDownCounter = int64Observable{}
 var _ metric.Int64ObservableGauge = int64Observable{}

-func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[int64]) int64Observable {
+func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
 	return int64Observable{
-		observable: newObservable(scope, kind, name, desc, u, agg),
+		observable: newObservable(scope, kind, name, desc, u, meas),
 	}
 }
@@ -306,10 +306,10 @@ type observable[N int64 | float64] struct {
 	metric.Observable
 	observablID[N]

-	aggregators []aggregate.Aggregator[N]
+	measures []aggregate.Measure[N]
 }

-func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[N]) *observable[N] {
+func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
 	return &observable[N]{
 		observablID: observablID[N]{
 			name: name,
@@ -318,14 +318,14 @@ func newObservable[N int64 | float64](scope instrumentation.Scope, kind Instrume
 			unit:  u,
 			scope: scope,
 		},
-		aggregators: agg,
+		measures: meas,
 	}
 }

 // observe records the val for the set of attrs.
 func (o *observable[N]) observe(val N, s attribute.Set) {
-	for _, agg := range o.aggregators {
-		agg.Aggregate(val, s)
+	for _, in := range o.measures {
+		in(context.Background(), val, s)
 	}
 }
@@ -336,7 +336,7 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument")
 // no-op because it does not have any aggregators. Also, an error is returned
 // if scope defines a Meter other than the one o was created by.
 func (o *observable[N]) registerable(scope instrumentation.Scope) error {
-	if len(o.aggregators) == 0 {
+	if len(o.measures) == 0 {
 		return errEmptyAgg
 	}
 	if scope != o.scope {
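The shape of the change above: each instrument now holds aggregate function inputs (Measure values) instead of Aggregator interfaces, and recording is a plain function call that also carries the context. A self-contained sketch of that fan-out, with toy types standing in for the SDK's internals:

package main

import (
	"context"
	"fmt"
)

// measure mirrors the shape of aggregate.Measure[N]: recording is now a
// plain function call instead of an interface method.
type measure[N int64 | float64] func(context.Context, N)

// inst mirrors int64Inst: one recorded value fans out to every measure.
type inst[N int64 | float64] struct{ measures []measure[N] }

func (i inst[N]) record(ctx context.Context, v N) {
	for _, in := range i.measures {
		in(ctx, v)
	}
}

func main() {
	var total int64
	i := inst[int64]{measures: []measure[int64]{
		func(_ context.Context, v int64) { total += v }, // toy "sum" input
	}}
	i.record(context.Background(), 5)
	fmt.Println(total) // 5
}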

View File

@@ -20,6 +20,7 @@ import (
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

 func BenchmarkInstrument(b *testing.B) {
@@ -32,11 +33,21 @@ func BenchmarkInstrument(b *testing.B) {
 	}

 	b.Run("instrumentImpl/aggregate", func(b *testing.B) {
-		inst := int64Inst{aggregators: []aggregate.Aggregator[int64]{
-			aggregate.NewLastValue[int64](),
-			aggregate.NewCumulativeSum[int64](true),
-			aggregate.NewDeltaSum[int64](true),
-		}}
+		build := aggregate.Builder[int64]{}
+		var meas []aggregate.Measure[int64]
+
+		in, _ := build.LastValue()
+		meas = append(meas, in)
+
+		build.Temporality = metricdata.CumulativeTemporality
+		in, _ = build.Sum(true)
+		meas = append(meas, in)
+
+		build.Temporality = metricdata.DeltaTemporality
+		in, _ = build.Sum(true)
+		meas = append(meas, in)
+
+		inst := int64Inst{measures: meas}
 		ctx := context.Background()

 		b.ReportAllocs()
@@ -47,11 +58,21 @@ func BenchmarkInstrument(b *testing.B) {
 	})

 	b.Run("observable/observe", func(b *testing.B) {
-		o := observable[int64]{aggregators: []aggregate.Aggregator[int64]{
-			aggregate.NewLastValue[int64](),
-			aggregate.NewCumulativeSum[int64](true),
-			aggregate.NewDeltaSum[int64](true),
-		}}
+		build := aggregate.Builder[int64]{}
+		var meas []aggregate.Measure[int64]
+
+		in, _ := build.LastValue()
+		meas = append(meas, in)
+
+		build.Temporality = metricdata.CumulativeTemporality
+		in, _ = build.Sum(true)
+		meas = append(meas, in)
+
+		build.Temporality = metricdata.DeltaTemporality
+		in, _ = build.Sum(true)
+		meas = append(meas, in)
+
+		o := observable[int64]{measures: meas}

 		b.ReportAllocs()
 		b.ResetTimer()

View File

@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/aggregation"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// Measure receives measurements to be aggregated.
type Measure[N int64 | float64] func(context.Context, N, attribute.Set)

// ComputeAggregation stores the aggregate of measurements into dest and
// returns the number of aggregate data-points output.
type ComputeAggregation func(dest *metricdata.Aggregation) int

// Builder builds an aggregate function.
type Builder[N int64 | float64] struct {
	// Temporality is the temporality used for the returned aggregate function.
	//
	// If this is not provided a default of cumulative will be used (except for
	// the last-value aggregate function where delta is the only appropriate
	// temporality).
	Temporality metricdata.Temporality
	// Filter is the attribute filter the aggregate function will use on the
	// input of measurements.
	Filter attribute.Filter
}

func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
	if b.Filter != nil {
		agg = newFilter[N](agg, b.Filter)
	}
	return func(_ context.Context, n N, a attribute.Set) {
		agg.Aggregate(n, a)
	}
}

// LastValue returns a last-value aggregate function input and output.
//
// The Builder.Temporality is ignored and delta is always used.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
	// Delta temporality is the only temporality that makes semantic sense for
	// a last-value aggregate.
	lv := newLastValue[N]()

	return b.input(lv), func(dest *metricdata.Aggregation) int {
		// TODO (#4220): optimize memory reuse here.
		*dest = lv.Aggregation()

		gData, _ := (*dest).(metricdata.Gauge[N])
		return len(gData.DataPoints)
	}
}

// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
	var s aggregator[N]
	switch b.Temporality {
	case metricdata.DeltaTemporality:
		s = newPrecomputedDeltaSum[N](monotonic)
	default:
		s = newPrecomputedCumulativeSum[N](monotonic)
	}

	return b.input(s), func(dest *metricdata.Aggregation) int {
		// TODO (#4220): optimize memory reuse here.
		*dest = s.Aggregation()

		sData, _ := (*dest).(metricdata.Sum[N])
		return len(sData.DataPoints)
	}
}

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
	var s aggregator[N]
	switch b.Temporality {
	case metricdata.DeltaTemporality:
		s = newDeltaSum[N](monotonic)
	default:
		s = newCumulativeSum[N](monotonic)
	}

	return b.input(s), func(dest *metricdata.Aggregation) int {
		// TODO (#4220): optimize memory reuse here.
		*dest = s.Aggregation()

		sData, _ := (*dest).(metricdata.Sum[N])
		return len(sData.DataPoints)
	}
}

// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) {
	var h aggregator[N]
	switch b.Temporality {
	case metricdata.DeltaTemporality:
		h = newDeltaHistogram[N](cfg)
	default:
		h = newCumulativeHistogram[N](cfg)
	}

	return b.input(h), func(dest *metricdata.Aggregation) int {
		// TODO (#4220): optimize memory reuse here.
		*dest = h.Aggregation()

		hData, _ := (*dest).(metricdata.Histogram[N])
		return len(hData.DataPoints)
	}
}
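Roughly how the new input/output pair is meant to be wired together. This is illustrative only: the aggregate package is internal to the SDK, so only in-tree code (such as a hypothetical package-external test like the one sketched here) could compile it.

package aggregate_test

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func Example_builder() {
	build := aggregate.Builder[int64]{Temporality: metricdata.DeltaTemporality}

	// in records measurements; out writes the resulting aggregation into
	// dest and reports how many data points were produced.
	in, out := build.Sum(true)

	attrs := attribute.NewSet(attribute.String("k", "v"))
	in(context.Background(), 3, attrs)
	in(context.Background(), 7, attrs)

	var dest metricdata.Aggregation
	if n := out(&dest); n > 0 {
		sum := dest.(metricdata.Sum[int64])
		fmt.Println(sum.DataPoints[0].Value)
	}
	// Output: 10
}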

View File

@@ -25,11 +25,11 @@ import (
 // override the default time.Now function.
 var now = time.Now

-// Aggregator forms an aggregation from a collection of recorded measurements.
+// aggregator forms an aggregation from a collection of recorded measurements.
 //
 // Aggregators need to be comparable so they can be de-duplicated by the SDK
 // when it creates them for multiple views.
-type Aggregator[N int64 | float64] interface {
+type aggregator[N int64 | float64] interface {
 	// Aggregate records the measurement, scoped by attr, and aggregates it
 	// into an aggregation.
 	Aggregate(measurement N, attr attribute.Set)
@@ -45,7 +45,7 @@ type precomputeAggregator[N int64 | float64] interface {
 	// The Aggregate method of the embedded Aggregator is used to record
 	// pre-computed measurements, scoped by attributes that have not been
 	// filtered by an attribute filter.
-	Aggregator[N]
+	aggregator[N]

 	// aggregateFiltered records measurements scoped by attributes that have
 	// been filtered by an attribute filter.

View File

@@ -37,7 +37,7 @@ func (p *meter) Int64Counter(string, ...metric.Int64CounterOption) (metric.Int64
 	// temporality to use based on the Reader and View configuration. Assume
 	// here these are determined to be a cumulative sum.

-	aggregator := NewCumulativeSum[int64](true)
+	aggregator := newCumulativeSum[int64](true)
 	count := inst{aggregateFunc: aggregator.Aggregate}

 	p.aggregations = append(p.aggregations, aggregator.Aggregation())
@@ -54,7 +54,7 @@ func (p *meter) Int64UpDownCounter(string, ...metric.Int64UpDownCounterOption) (
 	// configuration. Assume here these are determined to be a last-value
 	// aggregation (the temporality does not affect the produced aggregations).

-	aggregator := NewLastValue[int64]()
+	aggregator := newLastValue[int64]()
 	upDownCount := inst{aggregateFunc: aggregator.Aggregate}

 	p.aggregations = append(p.aggregations, aggregator.Aggregation())
@@ -71,7 +71,7 @@ func (p *meter) Int64Histogram(string, ...metric.Int64HistogramOption) (metric.I
 	// Assume here these are determined to be a delta explicit-bucket
 	// histogram.

-	aggregator := NewDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{
+	aggregator := newDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{
 		Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000},
 		NoMinMax:   false,
 	})

View File

@@ -84,12 +84,12 @@ type aggregatorTester[N int64 | float64] struct {
 	CycleN int
 }

-func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
+func (at *aggregatorTester[N]) Run(a aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
 	m := at.MeasurementN * at.GoroutineN
 	return func(t *testing.T) {
 		t.Run("Comparable", func(t *testing.T) {
 			assert.NotPanics(t, func() {
-				_ = map[Aggregator[N]]struct{}{a: {}}
+				_ = map[aggregator[N]]struct{}{a: {}}
 			})
 		})
@@ -117,7 +117,7 @@ func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expect
 var bmarkResults metricdata.Aggregation

-func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggregator[N], count int) {
+func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() aggregator[N], count int) {
 	attrs := make([]attribute.Set, count)
 	for i := range attrs {
 		attrs[i] = attribute.NewSet(attribute.Int("value", i))
@@ -137,7 +137,7 @@ func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggreg
 	})

 	b.Run("Aggregations", func(b *testing.B) {
-		aggs := make([]Aggregator[N], b.N)
+		aggs := make([]aggregator[N], b.N)
 		for n := range aggs {
 			a := factory()
 			for _, attr := range attrs {
@@ -155,7 +155,7 @@ func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggreg
 	})
 }

-func benchmarkAggregator[N int64 | float64](factory func() Aggregator[N]) func(*testing.B) {
+func benchmarkAggregator[N int64 | float64](factory func() aggregator[N]) func(*testing.B) {
 	counts := []int{1, 10, 100}
 	return func(b *testing.B) {
 		for _, n := range counts {

View File

@@ -19,18 +19,21 @@ import (
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 )

-// NewFilter returns an Aggregator that wraps an agg with an attribute
+// newFilter returns an Aggregator that wraps an agg with an attribute
 // filtering function. Both pre-computed and non-pre-computed Aggregators can
 // be passed for agg. An appropriate Aggregator will be returned for the
 // detected type.
-func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggregator[N] {
+func newFilter[N int64 | float64](agg aggregator[N], fn attribute.Filter) aggregator[N] {
 	if fn == nil {
 		return agg
 	}
 	if fa, ok := agg.(precomputeAggregator[N]); ok {
 		return newPrecomputedFilter(fa, fn)
 	}
-	return newFilter(agg, fn)
+	return &filter[N]{
+		filter:     fn,
+		aggregator: agg,
+	}
 }

 // filter wraps an aggregator with an attribute filter. All recorded
@@ -41,19 +44,7 @@ func NewFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) Aggreg
 // precomputedFilter instead.
 type filter[N int64 | float64] struct {
 	filter     attribute.Filter
-	aggregator Aggregator[N]
-}
-
-// newFilter returns an filter Aggregator that wraps agg with the attribute
-// filter fn.
-//
-// This should not be used to wrap a pre-computed Aggregator. Use a
-// precomputedFilter instead.
-func newFilter[N int64 | float64](agg Aggregator[N], fn attribute.Filter) *filter[N] {
-	return &filter[N]{
-		filter:     fn,
-		aggregator: agg,
-	}
+	aggregator aggregator[N]
 }

 // Aggregate records the measurement, scoped by attr, and aggregates it
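With NewFilter unexported, Builder.Filter is now the only way this wrapping is reached. The attribute.Filter contract itself is public; a small sketch of what such a filter does to a Set, using only the public attribute package:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	// Keep only the "host" attribute; drop everything else.
	var keepHost attribute.Filter = func(kv attribute.KeyValue) bool {
		return kv.Key == "host"
	}

	set := attribute.NewSet(
		attribute.String("host", "a"),
		attribute.String("user", "alice"),
	)
	filtered, dropped := set.Filter(keepHost)
	fmt.Println(filtered.Len(), len(dropped)) // 1 1
}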
// Aggregate records the measurement, scoped by attr, and aggregates it // Aggregate records the measurement, scoped by attr, and aggregates it

View File

@@ -54,13 +54,13 @@ func (a *testStableAggregator[N]) Aggregation() metricdata.Aggregation {
 	}
 }

-func testNewFilterNoFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) {
-	filter := NewFilter(agg, nil)
+func testNewFilterNoFilter[N int64 | float64](t *testing.T, agg aggregator[N]) {
+	filter := newFilter(agg, nil)
 	assert.Equal(t, agg, filter)
 }

-func testNewFilter[N int64 | float64](t *testing.T, agg Aggregator[N]) {
-	f := NewFilter(agg, testAttributeFilter)
+func testNewFilter[N int64 | float64](t *testing.T, agg aggregator[N]) {
+	f := newFilter(agg, testAttributeFilter)
 	require.IsType(t, &filter[N]{}, f)
 	filt := f.(*filter[N])
 	assert.Equal(t, agg, filt.aggregator)
@@ -147,7 +147,7 @@ func testFilterAggregate[N int64 | float64](t *testing.T) {
 	for _, tt := range testCases {
 		t.Run(tt.name, func(t *testing.T) {
-			f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
+			f := newFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
 			for _, set := range tt.inputAttr {
 				f.Aggregate(1, set)
 			}
@@ -167,7 +167,7 @@ func TestFilterAggregate(t *testing.T) {
 }

 func testFilterConcurrent[N int64 | float64](t *testing.T) {
-	f := NewFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
+	f := newFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
@@ -205,7 +205,7 @@ func TestPrecomputedFilter(t *testing.T) {
 func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) {
 	return func(t *testing.T) {
 		agg := newTestFilterAgg[N]()
-		f := NewFilter[N](agg, testAttributeFilter)
+		f := newFilter[N](agg, testAttributeFilter)
 		require.IsType(t, &precomputedFilter[N]{}, f)

 		var (

View File

@@ -100,14 +100,14 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
 	b.bin(idx, value)
 }

-// NewDeltaHistogram returns an Aggregator that summarizes a set of
+// newDeltaHistogram returns an Aggregator that summarizes a set of
 // measurements as a histogram. Each histogram is scoped by attributes and
 // the aggregation cycle the measurements were made in.
 //
 // Each aggregation cycle is treated independently. When the returned
 // Aggregator's Aggregations method is called it will reset all histogram
 // counts to zero.
-func NewDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
+func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] {
 	return &deltaHistogram[N]{
 		histValues: newHistValues[N](cfg.Boundaries),
 		noMinMax:   cfg.NoMinMax,
@@ -164,13 +164,13 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
 	return h
 }

-// NewCumulativeHistogram returns an Aggregator that summarizes a set of
+// newCumulativeHistogram returns an Aggregator that summarizes a set of
 // measurements as a histogram. Each histogram is scoped by attributes.
 //
 // Each aggregation cycle builds from the previous, the histogram counts are
 // the bucketed counts of all values aggregated since the returned Aggregator
 // was created.
-func NewCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) Aggregator[N] {
+func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] {
 	return &cumulativeHistogram[N]{
 		histValues: newHistValues[N](cfg.Boundaries),
 		noMinMax:   cfg.NoMinMax,

View File

@@ -50,9 +50,9 @@ func testHistogram[N int64 | float64](t *testing.T) {
 	incr := monoIncr[N]()
 	eFunc := deltaHistExpecter[N](incr)
-	t.Run("Delta", tester.Run(NewDeltaHistogram[N](histConf), incr, eFunc))
+	t.Run("Delta", tester.Run(newDeltaHistogram[N](histConf), incr, eFunc))
 	eFunc = cumuHistExpecter[N](incr)
-	t.Run("Cumulative", tester.Run(NewCumulativeHistogram[N](histConf), incr, eFunc))
+	t.Run("Cumulative", tester.Run(newCumulativeHistogram[N](histConf), incr, eFunc))
 }

 func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
@@ -122,7 +122,7 @@ func testBucketsBin[N int64 | float64]() func(t *testing.T) {
 	}
 }

-func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) Aggregator[N], getBounds func(Aggregator[N]) []float64) func(t *testing.T) {
+func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) {
 	b := []float64{0, 1, 2}
 	cpB := make([]float64, len(b))
 	copy(cpB, b)
@@ -143,16 +143,16 @@ func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBu
 func TestHistogramImmutableBounds(t *testing.T) {
 	t.Run("Delta", testHistImmutableBounds(
-		NewDeltaHistogram[int64],
-		func(a Aggregator[int64]) []float64 {
+		newDeltaHistogram[int64],
+		func(a aggregator[int64]) []float64 {
 			deltaH := a.(*deltaHistogram[int64])
 			return deltaH.bounds
 		},
 	))

 	t.Run("Cumulative", testHistImmutableBounds(
-		NewCumulativeHistogram[int64],
-		func(a Aggregator[int64]) []float64 {
+		newCumulativeHistogram[int64],
+		func(a aggregator[int64]) []float64 {
 			cumuH := a.(*cumulativeHistogram[int64])
 			return cumuH.bounds
 		},
@@ -160,7 +160,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
 }

 func TestCumulativeHistogramImutableCounts(t *testing.T) {
-	a := NewCumulativeHistogram[int64](histConf)
+	a := newCumulativeHistogram[int64](histConf)
 	a.Aggregate(5, alice)
 	hdp := a.Aggregation().(metricdata.Histogram[int64]).DataPoints[0]
@@ -176,7 +176,7 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) {
 func TestDeltaHistogramReset(t *testing.T) {
 	t.Cleanup(mockTime(now))

-	a := NewDeltaHistogram[int64](histConf)
+	a := newDeltaHistogram[int64](histConf)
 	assert.Nil(t, a.Aggregation())

 	a.Aggregate(1, alice)
@@ -195,10 +195,10 @@ func TestDeltaHistogramReset(t *testing.T) {
 }

 func TestEmptyHistogramNilAggregation(t *testing.T) {
-	assert.Nil(t, NewCumulativeHistogram[int64](histConf).Aggregation())
-	assert.Nil(t, NewCumulativeHistogram[float64](histConf).Aggregation())
-	assert.Nil(t, NewDeltaHistogram[int64](histConf).Aggregation())
-	assert.Nil(t, NewDeltaHistogram[float64](histConf).Aggregation())
+	assert.Nil(t, newCumulativeHistogram[int64](histConf).Aggregation())
+	assert.Nil(t, newCumulativeHistogram[float64](histConf).Aggregation())
+	assert.Nil(t, newDeltaHistogram[int64](histConf).Aggregation())
+	assert.Nil(t, newDeltaHistogram[float64](histConf).Aggregation())
 }

 func BenchmarkHistogram(b *testing.B) {
@@ -207,8 +207,8 @@ func BenchmarkHistogram(b *testing.B) {
 }

 func benchmarkHistogram[N int64 | float64](b *testing.B) {
-	factory := func() Aggregator[N] { return NewDeltaHistogram[N](histConf) }
+	factory := func() aggregator[N] { return newDeltaHistogram[N](histConf) }
 	b.Run("Delta", benchmarkAggregator(factory))
-	factory = func() Aggregator[N] { return NewCumulativeHistogram[N](histConf) }
+	factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf) }
 	b.Run("Cumulative", benchmarkAggregator(factory))
 }

View File

@@ -35,9 +35,9 @@ type lastValue[N int64 | float64] struct {
 	values map[attribute.Set]datapoint[N]
 }

-// NewLastValue returns an Aggregator that summarizes a set of measurements as
+// newLastValue returns an Aggregator that summarizes a set of measurements as
 // the last one made.
-func NewLastValue[N int64 | float64]() Aggregator[N] {
+func newLastValue[N int64 | float64]() aggregator[N] {
 	return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
 }

View File

@@ -47,13 +47,13 @@ func testLastValue[N int64 | float64]() func(*testing.T) {
 		return func(int) metricdata.Aggregation { return gauge }
 	}
 	incr := monoIncr[N]()
-	return tester.Run(NewLastValue[N](), incr, eFunc(incr))
+	return tester.Run(newLastValue[N](), incr, eFunc(incr))
 }

 func testLastValueReset[N int64 | float64](t *testing.T) {
 	t.Cleanup(mockTime(now))

-	a := NewLastValue[N]()
+	a := newLastValue[N]()
 	assert.Nil(t, a.Aggregation())

 	a.Aggregate(1, alice)
@@ -86,11 +86,11 @@ func TestLastValueReset(t *testing.T) {
 }

 func TestEmptyLastValueNilAggregation(t *testing.T) {
-	assert.Nil(t, NewLastValue[int64]().Aggregation())
-	assert.Nil(t, NewLastValue[float64]().Aggregation())
+	assert.Nil(t, newLastValue[int64]().Aggregation())
+	assert.Nil(t, newLastValue[float64]().Aggregation())
 }

 func BenchmarkLastValue(b *testing.B) {
-	b.Run("Int64", benchmarkAggregator(NewLastValue[int64]))
-	b.Run("Float64", benchmarkAggregator(NewLastValue[float64]))
+	b.Run("Int64", benchmarkAggregator(newLastValue[int64]))
+	b.Run("Float64", benchmarkAggregator(newLastValue[float64]))
 }

View File

@@ -38,7 +38,7 @@ func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
 	s.Unlock()
 }

-// NewDeltaSum returns an Aggregator that summarizes a set of measurements as
+// newDeltaSum returns an Aggregator that summarizes a set of measurements as
 // their arithmetic sum. Each sum is scoped by attributes and the aggregation
 // cycle the measurements were made in.
 //
@@ -48,11 +48,7 @@ func (s *valueMap[N]) Aggregate(value N, attr attribute.Set) {
 //
 // Each aggregation cycle is treated independently. When the returned
 // Aggregator's Aggregation method is called it will reset all sums to zero.
-func NewDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
-	return newDeltaSum[N](monotonic)
-}
-
-func newDeltaSum[N int64 | float64](monotonic bool) *deltaSum[N] {
+func newDeltaSum[N int64 | float64](monotonic bool) aggregator[N] {
 	return &deltaSum[N]{
 		valueMap:  newValueMap[N](),
 		monotonic: monotonic,
@@ -98,7 +94,7 @@ func (s *deltaSum[N]) Aggregation() metricdata.Aggregation {
 	return out
 }

-// NewCumulativeSum returns an Aggregator that summarizes a set of
+// newCumulativeSum returns an Aggregator that summarizes a set of
 // measurements as their arithmetic sum. Each sum is scoped by attributes and
 // the aggregation cycle the measurements were made in.
 //
@@ -108,11 +104,7 @@ func (s *deltaSum[N]) Aggregation() metricdata.Aggregation {
 //
 // Each aggregation cycle is treated independently. When the returned
 // Aggregator's Aggregation method is called it will reset all sums to zero.
-func NewCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
-	return newCumulativeSum[N](monotonic)
-}
-
-func newCumulativeSum[N int64 | float64](monotonic bool) *cumulativeSum[N] {
+func newCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] {
 	return &cumulativeSum[N]{
 		valueMap:  newValueMap[N](),
 		monotonic: monotonic,
@@ -215,7 +207,7 @@ func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { //
 	s.Unlock()
 }

-// NewPrecomputedDeltaSum returns an Aggregator that summarizes a set of
+// newPrecomputedDeltaSum returns an Aggregator that summarizes a set of
 // pre-computed sums. Each sum is scoped by attributes and the aggregation
 // cycle the measurements were made in.
 //
@@ -224,7 +216,7 @@ func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { //
 // value is accurate. It is up to the caller to ensure it.
 //
 // The output Aggregation will report recorded values as delta temporality.
-func NewPrecomputedDeltaSum[N int64 | float64](monotonic bool) Aggregator[N] {
+func newPrecomputedDeltaSum[N int64 | float64](monotonic bool) aggregator[N] {
 	return &precomputedDeltaSum[N]{
 		precomputedMap: newPrecomputedMap[N](),
 		reported:       make(map[attribute.Set]N),
@@ -290,7 +282,7 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
 	return out
 }

-// NewPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
+// newPrecomputedCumulativeSum returns an Aggregator that summarizes a set of
 // pre-computed sums. Each sum is scoped by attributes and the aggregation
 // cycle the measurements were made in.
 //
@@ -300,7 +292,7 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation {
 //
 // The output Aggregation will report recorded values as cumulative
 // temporality.
-func NewPrecomputedCumulativeSum[N int64 | float64](monotonic bool) Aggregator[N] {
+func newPrecomputedCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] {
 	return &precomputedCumulativeSum[N]{
 		precomputedMap: newPrecomputedMap[N](),
 		monotonic:      monotonic,

View File

@@ -41,41 +41,41 @@ func testSum[N int64 | float64](t *testing.T) {
 	t.Run("Delta", func(t *testing.T) {
 		incr, mono := monoIncr[N](), true
 		eFunc := deltaExpecter[N](incr, mono)
-		t.Run("Monotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))
+		t.Run("Monotonic", tester.Run(newDeltaSum[N](mono), incr, eFunc))

 		incr, mono = nonMonoIncr[N](), false
 		eFunc = deltaExpecter[N](incr, mono)
-		t.Run("NonMonotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))
+		t.Run("NonMonotonic", tester.Run(newDeltaSum[N](mono), incr, eFunc))
 	})

 	t.Run("Cumulative", func(t *testing.T) {
 		incr, mono := monoIncr[N](), true
 		eFunc := cumuExpecter[N](incr, mono)
-		t.Run("Monotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
+		t.Run("Monotonic", tester.Run(newCumulativeSum[N](mono), incr, eFunc))

 		incr, mono = nonMonoIncr[N](), false
 		eFunc = cumuExpecter[N](incr, mono)
-		t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
+		t.Run("NonMonotonic", tester.Run(newCumulativeSum[N](mono), incr, eFunc))
 	})

 	t.Run("PreComputedDelta", func(t *testing.T) {
 		incr, mono := monoIncr[N](), true
 		eFunc := preDeltaExpecter[N](incr, mono)
-		t.Run("Monotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))
+		t.Run("Monotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc))

 		incr, mono = nonMonoIncr[N](), false
 		eFunc = preDeltaExpecter[N](incr, mono)
-		t.Run("NonMonotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))
+		t.Run("NonMonotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc))
 	})

 	t.Run("PreComputedCumulative", func(t *testing.T) {
 		incr, mono := monoIncr[N](), true
 		eFunc := preCumuExpecter[N](incr, mono)
-		t.Run("Monotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))
+		t.Run("Monotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc))

 		incr, mono = nonMonoIncr[N](), false
 		eFunc = preCumuExpecter[N](incr, mono)
-		t.Run("NonMonotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))
+		t.Run("NonMonotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc))
 	})
 }
@@ -141,7 +141,7 @@ func point[N int64 | float64](a attribute.Set, v N) metricdata.DataPoint[N] {
 func testDeltaSumReset[N int64 | float64](t *testing.T) {
 	t.Cleanup(mockTime(now))

-	a := NewDeltaSum[N](false)
+	a := newDeltaSum[N](false)
 	assert.Nil(t, a.Aggregation())

 	a.Aggregate(1, alice)
@@ -166,7 +166,7 @@ func TestDeltaSumReset(t *testing.T) {
 func TestPreComputedDeltaSum(t *testing.T) {
 	var mono bool
-	agg := NewPrecomputedDeltaSum[int64](mono)
+	agg := newPrecomputedDeltaSum[int64](mono)
 	require.Implements(t, (*precomputeAggregator[int64])(nil), agg)

 	attrs := attribute.NewSet(attribute.String("key", "val"))
@@ -231,7 +231,7 @@ func TestPreComputedDeltaSum(t *testing.T) {
 func TestPreComputedCumulativeSum(t *testing.T) {
 	var mono bool
-	agg := NewPrecomputedCumulativeSum[int64](mono)
+	agg := newPrecomputedCumulativeSum[int64](mono)
 	require.Implements(t, (*precomputeAggregator[int64])(nil), agg)

 	attrs := attribute.NewSet(attribute.String("key", "val"))
@@ -282,22 +282,22 @@ func TestPreComputedCumulativeSum(t *testing.T) {
 }

 func TestEmptySumNilAggregation(t *testing.T) {
-	assert.Nil(t, NewCumulativeSum[int64](true).Aggregation())
-	assert.Nil(t, NewCumulativeSum[int64](false).Aggregation())
-	assert.Nil(t, NewCumulativeSum[float64](true).Aggregation())
-	assert.Nil(t, NewCumulativeSum[float64](false).Aggregation())
-	assert.Nil(t, NewDeltaSum[int64](true).Aggregation())
-	assert.Nil(t, NewDeltaSum[int64](false).Aggregation())
-	assert.Nil(t, NewDeltaSum[float64](true).Aggregation())
-	assert.Nil(t, NewDeltaSum[float64](false).Aggregation())
-	assert.Nil(t, NewPrecomputedCumulativeSum[int64](true).Aggregation())
-	assert.Nil(t, NewPrecomputedCumulativeSum[int64](false).Aggregation())
-	assert.Nil(t, NewPrecomputedCumulativeSum[float64](true).Aggregation())
-	assert.Nil(t, NewPrecomputedCumulativeSum[float64](false).Aggregation())
-	assert.Nil(t, NewPrecomputedDeltaSum[int64](true).Aggregation())
-	assert.Nil(t, NewPrecomputedDeltaSum[int64](false).Aggregation())
-	assert.Nil(t, NewPrecomputedDeltaSum[float64](true).Aggregation())
-	assert.Nil(t, NewPrecomputedDeltaSum[float64](false).Aggregation())
+	assert.Nil(t, newCumulativeSum[int64](true).Aggregation())
+	assert.Nil(t, newCumulativeSum[int64](false).Aggregation())
+	assert.Nil(t, newCumulativeSum[float64](true).Aggregation())
+	assert.Nil(t, newCumulativeSum[float64](false).Aggregation())
+	assert.Nil(t, newDeltaSum[int64](true).Aggregation())
+	assert.Nil(t, newDeltaSum[int64](false).Aggregation())
+	assert.Nil(t, newDeltaSum[float64](true).Aggregation())
+	assert.Nil(t, newDeltaSum[float64](false).Aggregation())
+	assert.Nil(t, newPrecomputedCumulativeSum[int64](true).Aggregation())
+	assert.Nil(t, newPrecomputedCumulativeSum[int64](false).Aggregation())
+	assert.Nil(t, newPrecomputedCumulativeSum[float64](true).Aggregation())
+	assert.Nil(t, newPrecomputedCumulativeSum[float64](false).Aggregation())
+	assert.Nil(t, newPrecomputedDeltaSum[int64](true).Aggregation())
+	assert.Nil(t, newPrecomputedDeltaSum[int64](false).Aggregation())
+	assert.Nil(t, newPrecomputedDeltaSum[float64](true).Aggregation())
+	assert.Nil(t, newPrecomputedDeltaSum[float64](false).Aggregation())
 }

 func BenchmarkSum(b *testing.B) {
@@ -309,8 +309,8 @@ func benchmarkSum[N int64 | float64](b *testing.B) {
 	// The monotonic argument is only used to annotate the Sum returned from
 	// the Aggregation method. It should not have an effect on operational
 	// performance, therefore, only monotonic=false is benchmarked here.
-	factory := func() Aggregator[N] { return NewDeltaSum[N](false) }
+	factory := func() aggregator[N] { return newDeltaSum[N](false) }
 	b.Run("Delta", benchmarkAggregator(factory))
-	factory = func() Aggregator[N] { return NewCumulativeSum[N](false) }
+	factory = func() aggregator[N] { return newCumulativeSum[N](false) }
 	b.Run("Cumulative", benchmarkAggregator(factory))
 }

View File

@@ -451,7 +451,7 @@ func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string,
 	return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)}
 }

-func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Aggregator[int64], error) {
+func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) {
 	inst := Instrument{
 		Name:        name,
 		Description: desc,
@@ -465,7 +465,7 @@ func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]a
 // lookup returns the resolved instrumentImpl.
 func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
 	aggs, err := p.aggs(kind, name, desc, u)
-	return &int64Inst{aggregators: aggs}, err
+	return &int64Inst{measures: aggs}, err
 }

 // float64InstProvider provides float64 OpenTelemetry instruments.
@@ -479,7 +479,7 @@ func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[strin
 	return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)}
 }

-func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Aggregator[float64], error) {
+func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) {
 	inst := Instrument{
 		Name:        name,
 		Description: desc,
@@ -493,7 +493,7 @@ func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([
 // lookup returns the resolved instrumentImpl.
 func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
 	aggs, err := p.aggs(kind, name, desc, u)
-	return &float64Inst{aggregators: aggs}, err
+	return &float64Inst{measures: aggs}, err
 }

 type int64ObservProvider struct{ *int64InstProvider }
@@ -504,7 +504,7 @@ func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (
 }

 func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) {
-	if inst.observable == nil || len(inst.aggregators) == 0 {
+	if inst.observable == nil || len(inst.measures) == 0 {
 		// Drop aggregator.
 		return
 	}
@@ -537,7 +537,7 @@ func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string)
 }

 func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) {
-	if inst.observable == nil || len(inst.aggregators) == 0 {
+	if inst.observable == nil || len(inst.measures) == 0 {
 		// Drop aggregator.
 		return
 	}

View File

@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"strings"
 	"sync"
+	"sync/atomic"

 	"go.opentelemetry.io/otel/internal/global"
 	"go.opentelemetry.io/otel/metric"
@@ -37,20 +38,15 @@ var (
 	errCreatingAggregators     = errors.New("could not create all aggregators")
 	errIncompatibleAggregation = errors.New("incompatible aggregation")
 	errUnknownAggregation      = errors.New("unrecognized aggregation")
-	errUnknownTemporality      = errors.New("unrecognized temporality")
 )

-type aggregator interface {
-	Aggregation() metricdata.Aggregation
-}
-
 // instrumentSync is a synchronization point between a pipeline and an
-// instrument's Aggregators.
+// instrument's aggregate function.
 type instrumentSync struct {
 	name        string
 	description string
 	unit        string
-	aggregator  aggregator
+	compAgg     aggregate.ComputeAggregation
 }

 func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
@@ -68,8 +64,9 @@ func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline
 // pipeline connects all of the instruments created by a meter provider to a Reader.
 // This is the object that will be `Reader.register()` when a meter provider is created.
 //
-// As instruments are created the instrument should be checked if it exists in the
-// views of a the Reader, and if so each aggregator should be added to the pipeline.
+// As instruments are created the instrument should be checked if it exists in
+// the views of the Reader, and if so each aggregate function should be added
+// to the pipeline.
 type pipeline struct {
 	resource *resource.Resource
@@ -161,8 +158,8 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
 		rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
 		j := 0
 		for _, inst := range instruments {
-			data := inst.aggregator.Aggregation()
-			if data != nil {
+			data := rm.ScopeMetrics[i].Metrics[j].Data
+			if n := inst.compAgg(&data); n > 0 {
 				rm.ScopeMetrics[i].Metrics[j].Name = inst.name
 				rm.ScopeMetrics[i].Metrics[j].Description = inst.description
 				rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
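The produce loop now hands the reader's existing metricdata slot to the instrument's ComputeAggregation, so the output function can write into memory the caller already owns (the TODO (#4220) notes in the new aggregate.go track finishing that reuse). A toy sketch of the write-into-dest contract, with illustrative types that are not the SDK's:

package main

import "fmt"

// computeAggregation mirrors aggregate.ComputeAggregation: write into dest,
// return the number of data points produced (0 means "nothing to report").
type computeAggregation func(dest *[]int64) int

func main() {
	vals := []int64{1, 2, 3}
	var comp computeAggregation = func(dest *[]int64) int {
		*dest = append((*dest)[:0], vals...) // reuse dest's backing array
		return len(*dest)
	}

	var reused []int64 // lives across "collection cycles"
	for cycle := 0; cycle < 2; cycle++ {
		if n := comp(&reused); n > 0 {
			fmt.Println(cycle, reused)
		}
	}
}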
@@ -185,11 +182,11 @@ func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics)
 // inserter facilitates inserting of new instruments from a single scope into a
 // pipeline.
 type inserter[N int64 | float64] struct {
-	// aggregators is a cache that holds Aggregators inserted into the
-	// underlying reader pipeline. This cache ensures no duplicate Aggregators
-	// are inserted into the reader pipeline and if a new request during an
-	// instrument creation asks for the same Aggregator the same instance is
-	// returned.
+	// aggregators is a cache that holds aggregate function inputs whose
+	// outputs have been inserted into the underlying reader pipeline. This
+	// cache ensures no duplicate aggregate functions are inserted into the
+	// reader pipeline and if a new request during an instrument creation asks
+	// for the same aggregate function input the same instance is returned.
 	aggregators *cache[streamID, aggVal[N]]

 	// views is a cache that holds instrument identifiers for all the
@@ -215,35 +212,34 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *i
 // Instrument inserts the instrument inst with instUnit into a pipeline. All
 // views the pipeline contains are matched against, and any matching view that
-// creates a unique Aggregator will be inserted into the pipeline and included
-// in the returned slice.
+// creates a unique aggregate function will have its output inserted into the
+// pipeline and its input included in the returned slice.
 //
-// The returned Aggregators are ensured to be deduplicated and unique. If
-// another view in another pipeline that is cached by this inserter's cache has
-// already inserted the same Aggregator for the same instrument, that
-// Aggregator instance is returned.
+// The returned aggregate function inputs are ensured to be deduplicated and
+// unique. If another view in another pipeline that is cached by this
+// inserter's cache has already inserted the same aggregate function for the
+// same instrument, that function's input instance is returned.
 //
 // If another instrument has already been inserted by this inserter, or any
 // other using the same cache, and it conflicts with the instrument being
-// inserted in this call, an Aggregator matching the arguments will still be
-// returned but an Info level log message will also be logged to the OTel
-// global logger.
+// inserted in this call, an aggregate function input matching the arguments
+// will still be returned but an Info level log message will also be logged to
+// the OTel global logger.
 //
-// If the passed instrument would result in an incompatible Aggregator, an
-// error is returned and that Aggregator is not inserted or returned.
+// If the passed instrument would result in an incompatible aggregate function,
+// an error is returned and that aggregate function output is not inserted nor
+// is its input returned.
 //
 // If an instrument is determined to use a Drop aggregation, that instrument is
 // not inserted nor returned.
-func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Aggregator[N], error) {
+func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error) {
 	var (
-		matched bool
-		aggs    []aggregate.Aggregator[N]
+		matched  bool
+		measures []aggregate.Measure[N]
 	)

 	errs := &multierror{wrapped: errCreatingAggregators}
-	// The cache will return the same Aggregator instance. Use this fact to
-	// compare pointer addresses to deduplicate Aggregators.
-	seen := make(map[aggregate.Aggregator[N]]struct{})
+	seen := make(map[uint64]struct{})
 	for _, v := range i.pipeline.views {
 		stream, match := v(inst)
 		if !match {
@@ -251,23 +247,23 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Aggregator[N], er
 		}
 		matched = true

-		agg, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
+		in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
 		if err != nil {
 			errs.append(err)
 		}
-		if agg == nil { // Drop aggregator.
+		if in == nil { // Drop aggregation.
 			continue
 		}
-		if _, ok := seen[agg]; ok {
-			// This aggregator has already been added.
+		if _, ok := seen[id]; ok {
+			// This aggregate function has already been added.
 			continue
 		}
-		seen[agg] = struct{}{}
-		aggs = append(aggs, agg)
+		seen[id] = struct{}{}
+		measures = append(measures, in)
 	}

 	if matched {
-		return aggs, errs.errorOrNil()
+		return measures, errs.errorOrNil()
 	}

 	// Apply implicit default view if no explicit matched.
@@ -276,37 +272,41 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Aggregator[N], er
 		Description: inst.Description,
 		Unit:        inst.Unit,
 	}
-	agg, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
+	in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
 	if err != nil {
 		errs.append(err)
 	}
-	if agg != nil {
+	if in != nil {
 		// Ensured to have not seen given matched was false.
-		aggs = append(aggs, agg)
+		measures = append(measures, in)
 	}
-	return aggs, errs.errorOrNil()
+	return measures, errs.errorOrNil()
 }
var aggIDCount uint64
// aggVal is the cached value in an aggregators cache. // aggVal is the cached value in an aggregators cache.
type aggVal[N int64 | float64] struct { type aggVal[N int64 | float64] struct {
Aggregator aggregate.Aggregator[N] ID uint64
Err error Measure aggregate.Measure[N]
Err error
} }
// cachedAggregator returns the appropriate Aggregator for an instrument // cachedAggregator returns the appropriate aggregate input and output
// configuration. If the exact instrument has been created within the // functions for an instrument configuration. If the exact instrument has been
// inst.Scope, that Aggregator instance will be returned. Otherwise, a new // created within the inst.Scope, those aggregate function instances will be
// computed Aggregator will be cached and returned. // returned. Otherwise, new computed aggregate functions will be cached and
// returned.
// //
// If the instrument configuration conflicts with an instrument that has // If the instrument configuration conflicts with an instrument that has
// already been created (e.g. description, unit, data type) a warning will be // already been created (e.g. description, unit, data type) a warning will be
// logged at the "Info" level with the global OTel logger. A valid new // logged at the "Info" level with the global OTel logger. Valid new aggregate
// Aggregator for the instrument configuration will still be returned without // functions for the instrument configuration will still be returned without an
// an error. // error.
// //
// If the instrument defines an unknown or incompatible aggregation, an error // If the instrument defines an unknown or incompatible aggregation, an error
// is returned. // is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (aggregate.Aggregator[N], error) { func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) { switch stream.Aggregation.(type) {
case nil, aggregation.Default: case nil, aggregation.Default:
// Undefined, nil, means to use the default from the reader. // Undefined, nil, means to use the default from the reader.
@ -314,7 +314,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (aggregate.Aggregator[N], error) {
} }
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil { if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
return nil, fmt.Errorf( return nil, 0, fmt.Errorf(
"creating aggregator with instrumentKind: %d, aggregation %v: %w", "creating aggregator with instrumentKind: %d, aggregation %v: %w",
kind, stream.Aggregation, err, kind, stream.Aggregation, err,
) )
@ -325,26 +325,27 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (aggregate.Aggregator[N], error) {
// still be applied and a warning should be logged. // still be applied and a warning should be logged.
i.logConflict(id) i.logConflict(id)
cv := i.aggregators.Lookup(id, func() aggVal[N] { cv := i.aggregators.Lookup(id, func() aggVal[N] {
agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic) b := aggregate.Builder[N]{Temporality: id.Temporality}
if err != nil {
return aggVal[N]{nil, err}
}
if agg == nil { // Drop aggregator.
return aggVal[N]{nil, nil}
}
if len(stream.AllowAttributeKeys) > 0 { if len(stream.AllowAttributeKeys) > 0 {
agg = aggregate.NewFilter(agg, stream.attributeFilter()) b.Filter = stream.attributeFilter()
}
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
if err != nil {
return aggVal[N]{0, nil, err}
}
if in == nil { // Drop aggregator.
return aggVal[N]{0, nil, nil}
} }
i.pipeline.addSync(scope, instrumentSync{ i.pipeline.addSync(scope, instrumentSync{
name: stream.Name, name: stream.Name,
description: stream.Description, description: stream.Description,
unit: stream.Unit, unit: stream.Unit,
aggregator: agg, compAgg: out,
}) })
return aggVal[N]{agg, err} id := atomic.AddUint64(&aggIDCount, 1)
return aggVal[N]{id, in, err}
}) })
return cv.Aggregator, cv.Err return cv.Measure, cv.ID, cv.Err
} }
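Taken together, the Lookup closure above configures an aggregate.Builder and registers only the output half (compAgg) on the pipeline; the input half is returned to the instrument. A condensed sketch of the resulting API, assuming a cumulative monotonic sum (this compiles only inside the SDK module, since the aggregate package is internal):

	b := aggregate.Builder[int64]{Temporality: metricdata.CumulativeTemporality}
	// b.Filter may be set from a view's allowed attribute keys.
	in, out := b.Sum(true) // Measure[int64] and ComputeAggregation

	in(context.Background(), 5, *attribute.EmptySet()) // hot path: record
	var dest metricdata.Aggregation
	n := out(&dest) // collection: writes a metricdata.Sum[int64] into dest
	_ = n           // number of produced data points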
// logConflict validates if an instrument with the same name as id has already // logConflict validates if an instrument with the same name as id has already
@ -386,52 +387,37 @@ func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
return id return id
} }
// aggregator returns a new Aggregator matching agg, kind, temporality, and // aggregateFunc returns new aggregate functions matching agg and kind, with
// monotonic. If the agg is unknown or temporality is invalid, an error is // sum monotonicity derived from kind. If the agg is unknown, an error is
// returned. // returned.
func (i *inserter[N]) aggregator(agg aggregation.Aggregation, kind InstrumentKind, temporality metricdata.Temporality, monotonic bool) (aggregate.Aggregator[N], error) { func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) {
switch a := agg.(type) { switch a := agg.(type) {
case aggregation.Default: case aggregation.Default:
return i.aggregator(DefaultAggregationSelector(kind), kind, temporality, monotonic) return i.aggregateFunc(b, DefaultAggregationSelector(kind), kind)
case aggregation.Drop: case aggregation.Drop:
return nil, nil // Return nil in and out to signify the drop aggregator.
case aggregation.LastValue: case aggregation.LastValue:
return aggregate.NewLastValue[N](), nil meas, comp = b.LastValue()
case aggregation.Sum: case aggregation.Sum:
switch kind { switch kind {
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter: case InstrumentKindObservableCounter:
// Observable counters and up-down-counters are defined to record meas, comp = b.PrecomputedSum(true)
// the absolute value of the count: case InstrumentKindObservableUpDownCounter:
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/metrics/api.md#asynchronous-counter-creation meas, comp = b.PrecomputedSum(false)
switch temporality { case InstrumentKindCounter, InstrumentKindHistogram:
case metricdata.CumulativeTemporality: meas, comp = b.Sum(true)
return aggregate.NewPrecomputedCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return aggregate.NewPrecomputedDeltaSum[N](monotonic), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
}
switch temporality {
case metricdata.CumulativeTemporality:
return aggregate.NewCumulativeSum[N](monotonic), nil
case metricdata.DeltaTemporality:
return aggregate.NewDeltaSum[N](monotonic), nil
default: default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality) // InstrumentKindUpDownCounter, InstrumentKindObservableGauge, and
// instrumentKindUndefined or other invalid instrument kinds.
meas, comp = b.Sum(false)
} }
case aggregation.ExplicitBucketHistogram: case aggregation.ExplicitBucketHistogram:
switch temporality { meas, comp = b.ExplicitBucketHistogram(a)
case metricdata.CumulativeTemporality: default:
return aggregate.NewCumulativeHistogram[N](a), nil err = errUnknownAggregation
case metricdata.DeltaTemporality:
return aggregate.NewDeltaHistogram[N](a), nil
default:
return nil, fmt.Errorf("%w: %s(%d)", errUnknownTemporality, temporality.String(), temporality)
}
} }
return nil, errUnknownAggregation
return meas, comp, err
} }
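Callers of aggregateFunc must handle three outcomes: a non-nil error for an unknown aggregation, a nil Measure signaling aggregation.Drop, and a valid function pair otherwise. A sketch of the caller side, mirroring cachedAggregator and Instrument above:

	in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
	switch {
	case err != nil:
		// unknown or incompatible aggregation: surface the error
	case in == nil:
		// aggregation.Drop: record nothing, export nothing
	default:
		_ = out // register out with the pipeline; hand in back to the instrument
	}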
// isAggregatorCompatible checks if the aggregation can be used by the instrument. // isAggregatorCompatible checks if the aggregation can be used by the instrument.
@ -520,9 +506,9 @@ func (u unregisterFuncs) Unregister() error {
return nil return nil
} }
// resolver facilitates resolving Aggregators an instrument needs to aggregate // resolver resolves the aggregate functions an instrument needs to record
// measurements with while updating all pipelines that need to pull from those // measurements with, while updating all pipelines that need to pull from
// aggregations. // those aggregations.
type resolver[N int64 | float64] struct { type resolver[N int64 | float64] struct {
inserters []*inserter[N] inserters []*inserter[N]
} }
@ -537,18 +523,18 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
// Aggregators returns the Aggregators that must be updated by the instrument // Aggregators returns the Aggregators that must be updated by the instrument
// defined by id. // defined by id.
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Aggregator[N], error) { func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
var aggs []aggregate.Aggregator[N] var measures []aggregate.Measure[N]
errs := &multierror{} errs := &multierror{}
for _, i := range r.inserters { for _, i := range r.inserters {
a, err := i.Instrument(id) in, err := i.Instrument(id)
if err != nil { if err != nil {
errs.append(err) errs.append(err)
} }
aggs = append(aggs, a...) measures = append(measures, in...)
} }
return aggs, errs.errorOrNil() return measures, errs.errorOrNil()
} }
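The resolver is what fans a single instrument out across readers: each Reader gets its own pipeline and inserter, and the Measures from all of them are concatenated so one recorded value updates every reader's aggregation. A sketch of the wiring (the pipes value and the Instrument literal are hypothetical; function names follow the code above):

	var vc cache[string, streamID]
	r := newResolver[int64](pipes, &vc) // pipes: one pipeline per Reader
	measures, err := r.Aggregators(Instrument{
		Name: "requests", // hypothetical instrument
		Kind: InstrumentKindCounter,
	})
	if err != nil {
		// a view produced an invalid aggregation for this instrument
	}
	// every measurement is then written through each returned Measure
	_ = measures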
type multierror struct { type multierror struct {
View File
@ -15,6 +15,7 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric" package metric // import "go.opentelemetry.io/otel/sdk/metric"
import ( import (
"context"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -27,6 +28,8 @@ import (
"go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
@ -43,19 +46,112 @@ func (invalidAggregation) Err() error {
return nil return nil
} }
func requireN[N int64 | float64](t *testing.T, n int, m []aggregate.Measure[N], comps []aggregate.ComputeAggregation, err error) {
t.Helper()
assert.NoError(t, err)
require.Len(t, m, n)
require.Len(t, comps, n)
}
func assertSum[N int64 | float64](n int, temp metricdata.Temporality, mono bool, v [2]N) func(*testing.T, []aggregate.Measure[N], []aggregate.ComputeAggregation, error) {
return func(t *testing.T, meas []aggregate.Measure[N], comps []aggregate.ComputeAggregation, err error) {
t.Helper()
requireN[N](t, n, meas, comps, err)
for m := 0; m < n; m++ {
t.Logf("input/output number: %d", m)
in, out := meas[m], comps[m]
in(context.Background(), 1, *attribute.EmptySet())
var got metricdata.Aggregation
assert.Equal(t, 1, out(&got), "1 data-point expected")
metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[N]{
Temporality: temp,
IsMonotonic: mono,
DataPoints: []metricdata.DataPoint[N]{{Value: v[0]}},
}, got, metricdatatest.IgnoreTimestamp())
in(context.Background(), 3, *attribute.EmptySet())
assert.Equal(t, 1, out(&got), "1 data-point expected")
metricdatatest.AssertAggregationsEqual(t, metricdata.Sum[N]{
Temporality: temp,
IsMonotonic: mono,
DataPoints: []metricdata.DataPoint[N]{{Value: v[1]}},
}, got, metricdatatest.IgnoreTimestamp())
}
}
}
func assertHist[N int64 | float64](temp metricdata.Temporality) func(*testing.T, []aggregate.Measure[N], []aggregate.ComputeAggregation, error) {
return func(t *testing.T, meas []aggregate.Measure[N], comps []aggregate.ComputeAggregation, err error) {
t.Helper()
requireN[N](t, 1, meas, comps, err)
in, out := meas[0], comps[0]
in(context.Background(), 1, *attribute.EmptySet())
var got metricdata.Aggregation
assert.Equal(t, 1, out(&got), "1 data-point expected")
buckets := []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
n := 1
metricdatatest.AssertAggregationsEqual(t, metricdata.Histogram[N]{
Temporality: temp,
DataPoints: []metricdata.HistogramDataPoint[N]{{
Count: uint64(n),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: buckets,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](1),
Sum: N(n),
}},
}, got, metricdatatest.IgnoreTimestamp())
in(context.Background(), 1, *attribute.EmptySet())
if temp == metricdata.CumulativeTemporality {
buckets[1] = 2
n = 2
}
assert.Equal(t, 1, out(&got), "1 data-point expected")
metricdatatest.AssertAggregationsEqual(t, metricdata.Histogram[N]{
Temporality: temp,
DataPoints: []metricdata.HistogramDataPoint[N]{{
Count: uint64(n),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: buckets,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](1),
Sum: N(n),
}},
}, got, metricdatatest.IgnoreTimestamp())
}
}
func assertLastValue[N int64 | float64](t *testing.T, meas []aggregate.Measure[N], comps []aggregate.ComputeAggregation, err error) {
t.Helper()
requireN[N](t, 1, meas, comps, err)
in, out := meas[0], comps[0]
in(context.Background(), 10, *attribute.EmptySet())
in(context.Background(), 1, *attribute.EmptySet())
var got metricdata.Aggregation
assert.Equal(t, 1, out(&got), "1 data-point expected")
metricdatatest.AssertAggregationsEqual(t, metricdata.Gauge[N]{
DataPoints: []metricdata.DataPoint[N]{{Value: 1}},
}, got, metricdatatest.IgnoreTimestamp())
}
func testCreateAggregators[N int64 | float64](t *testing.T) { func testCreateAggregators[N int64 | float64](t *testing.T) {
changeAggView := NewView( changeAggView := NewView(
Instrument{Name: "foo"}, Instrument{Name: "foo"},
Stream{Aggregation: aggregation.ExplicitBucketHistogram{}}, Stream{Aggregation: aggregation.ExplicitBucketHistogram{
) Boundaries: []float64{0, 100},
renameView := NewView( NoMinMax: true,
Instrument{Name: "foo"}, }},
Stream{Name: "bar"},
)
defaultAggView := NewView(
Instrument{Name: "foo"},
Stream{Aggregation: aggregation.Default{}},
) )
renameView := NewView(Instrument{Name: "foo"}, Stream{Name: "bar"})
invalidAggView := NewView( invalidAggView := NewView(
Instrument{Name: "foo"}, Instrument{Name: "foo"},
Stream{Aggregation: invalidAggregation{}}, Stream{Aggregation: invalidAggregation{}},
@ -76,194 +172,195 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
reader Reader reader Reader
views []View views []View
inst Instrument inst Instrument
wantKind aggregate.Aggregator[N] //Aggregators should match len and types validate func(*testing.T, []aggregate.Measure[N], []aggregate.ComputeAggregation, error)
wantLen int
wantErr error
}{ }{
{ {
name: "drop should return 0 aggregators", name: "Default/Drop",
reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })),
views: []View{defaultView},
inst: instruments[InstrumentKindCounter], inst: instruments[InstrumentKindCounter],
validate: func(t *testing.T, meas []aggregate.Measure[N], comps []aggregate.ComputeAggregation, err error) {
t.Helper()
assert.NoError(t, err)
assert.Len(t, meas, 0)
assert.Len(t, comps, 0)
},
}, },
{ {
name: "default agg should use reader", name: "Default/Delta/Sum/NonMonotonic",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []View{defaultAggView},
inst: instruments[InstrumentKindUpDownCounter], inst: instruments[InstrumentKindUpDownCounter],
wantKind: aggregate.NewDeltaSum[N](false), validate: assertSum[N](1, metricdata.DeltaTemporality, false, [2]N{1, 3}),
wantLen: 1,
}, },
{ {
name: "default agg should use reader", name: "Default/Delta/ExplicitBucketHistogram",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []View{defaultAggView},
inst: instruments[InstrumentKindHistogram], inst: instruments[InstrumentKindHistogram],
wantKind: aggregate.NewDeltaHistogram[N](aggregation.ExplicitBucketHistogram{}), validate: assertHist[N](metricdata.DeltaTemporality),
wantLen: 1,
}, },
{ {
name: "default agg should use reader", name: "Default/Delta/PrecomputedSum/Monotonic",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []View{defaultAggView},
inst: instruments[InstrumentKindObservableCounter], inst: instruments[InstrumentKindObservableCounter],
wantKind: aggregate.NewPrecomputedDeltaSum[N](true), validate: assertSum[N](1, metricdata.DeltaTemporality, true, [2]N{1, 2}),
wantLen: 1,
}, },
{ {
name: "default agg should use reader", name: "Default/Delta/PrecomputedSum/NonMonotonic",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []View{defaultAggView},
inst: instruments[InstrumentKindObservableUpDownCounter], inst: instruments[InstrumentKindObservableUpDownCounter],
wantKind: aggregate.NewPrecomputedDeltaSum[N](false), validate: assertSum[N](1, metricdata.DeltaTemporality, false, [2]N{1, 2}),
wantLen: 1,
}, },
{ {
name: "default agg should use reader", name: "Default/Delta/Gauge",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []View{defaultAggView},
inst: instruments[InstrumentKindObservableGauge], inst: instruments[InstrumentKindObservableGauge],
wantKind: aggregate.NewLastValue[N](), validate: assertLastValue[N],
wantLen: 1,
}, },
{ {
name: "default agg should use reader", name: "Default/Delta/Sum/Monotonic",
reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)), reader: NewManualReader(WithTemporalitySelector(deltaTemporalitySelector)),
views: []View{defaultAggView},
inst: instruments[InstrumentKindCounter], inst: instruments[InstrumentKindCounter],
wantKind: aggregate.NewDeltaSum[N](true), validate: assertSum[N](1, metricdata.DeltaTemporality, true, [2]N{1, 3}),
wantLen: 1,
}, },
{ {
name: "reader should set default agg", name: "Default/Cumulative/Sum/NonMonotonic",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{defaultView},
inst: instruments[InstrumentKindUpDownCounter], inst: instruments[InstrumentKindUpDownCounter],
wantKind: aggregate.NewCumulativeSum[N](false), validate: assertSum[N](1, metricdata.CumulativeTemporality, false, [2]N{1, 4}),
wantLen: 1,
}, },
{ {
name: "reader should set default agg", name: "Default/Cumulative/ExplicitBucketHistogram",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{defaultView},
inst: instruments[InstrumentKindHistogram], inst: instruments[InstrumentKindHistogram],
wantKind: aggregate.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), validate: assertHist[N](metricdata.CumulativeTemporality),
wantLen: 1,
}, },
{ {
name: "reader should set default agg", name: "Default/Cumulative/PrecomputedSum/Monotonic",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{defaultView},
inst: instruments[InstrumentKindObservableCounter], inst: instruments[InstrumentKindObservableCounter],
wantKind: aggregate.NewPrecomputedCumulativeSum[N](true), validate: assertSum[N](1, metricdata.CumulativeTemporality, true, [2]N{1, 3}),
wantLen: 1,
}, },
{ {
name: "reader should set default agg", name: "Default/Cumulative/PrecomputedSum/NonMonotonic",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{defaultView},
inst: instruments[InstrumentKindObservableUpDownCounter], inst: instruments[InstrumentKindObservableUpDownCounter],
wantKind: aggregate.NewPrecomputedCumulativeSum[N](false), validate: assertSum[N](1, metricdata.CumulativeTemporality, false, [2]N{1, 3}),
wantLen: 1,
}, },
{ {
name: "reader should set default agg", name: "Default/Cumulative/Gauge",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{defaultView},
inst: instruments[InstrumentKindObservableGauge], inst: instruments[InstrumentKindObservableGauge],
wantKind: aggregate.NewLastValue[N](), validate: assertLastValue[N],
wantLen: 1,
}, },
{ {
name: "reader should set default agg", name: "Default/Cumulative/Sum/Monotonic",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{defaultView},
inst: instruments[InstrumentKindCounter], inst: instruments[InstrumentKindCounter],
wantKind: aggregate.NewCumulativeSum[N](true), validate: assertSum[N](1, metricdata.CumulativeTemporality, true, [2]N{1, 4}),
wantLen: 1,
}, },
{ {
name: "view should overwrite reader", name: "ViewHasPrecedence",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{changeAggView}, views: []View{changeAggView},
inst: instruments[InstrumentKindCounter], inst: instruments[InstrumentKindCounter],
wantKind: aggregate.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), validate: func(t *testing.T, meas []aggregate.Measure[N], comps []aggregate.ComputeAggregation, err error) {
wantLen: 1, t.Helper()
requireN[N](t, 1, meas, comps, err)
in, out := meas[0], comps[0]
in(context.Background(), 1, *attribute.EmptySet())
var got metricdata.Aggregation
assert.Equal(t, 1, out(&got), "1 data-point expected")
metricdatatest.AssertAggregationsEqual(t, metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{{
Count: 1,
Bounds: []float64{0, 100},
BucketCounts: []uint64{0, 1, 0},
Sum: 1,
}},
}, got, metricdatatest.IgnoreTimestamp())
in(context.Background(), 1, *attribute.EmptySet())
assert.Equal(t, 1, out(&got), "1 data-point expected")
metricdatatest.AssertAggregationsEqual(t, metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{{
Count: 2,
Bounds: []float64{0, 100},
BucketCounts: []uint64{0, 2, 0},
Sum: 2,
}},
}, got, metricdatatest.IgnoreTimestamp())
},
}, },
{ {
name: "multiple views should create multiple aggregators", name: "MultipleViews",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{defaultView, renameView}, views: []View{defaultView, renameView},
inst: instruments[InstrumentKindCounter], inst: instruments[InstrumentKindCounter],
wantKind: aggregate.NewCumulativeSum[N](true), validate: assertSum[N](2, metricdata.CumulativeTemporality, true, [2]N{1, 4}),
wantLen: 2,
}, },
{ {
name: "reader with default aggregation should figure out a Counter", name: "Reader/Default/Cumulative/Sum/Monotonic",
reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
views: []View{defaultView},
inst: instruments[InstrumentKindCounter], inst: instruments[InstrumentKindCounter],
wantKind: aggregate.NewCumulativeSum[N](true), validate: assertSum[N](1, metricdata.CumulativeTemporality, true, [2]N{1, 4}),
wantLen: 1,
}, },
{ {
name: "reader with default aggregation should figure out an UpDownCounter", name: "Reader/Default/Cumulative/Sum/NonMonotonic",
reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
views: []View{defaultView},
inst: instruments[InstrumentKindUpDownCounter], inst: instruments[InstrumentKindUpDownCounter],
wantKind: aggregate.NewCumulativeSum[N](true), validate: assertSum[N](1, metricdata.CumulativeTemporality, false, [2]N{1, 4}),
wantLen: 1,
}, },
{ {
name: "reader with default aggregation should figure out an Histogram", name: "Reader/Default/Cumulative/ExplicitBucketHistogram",
reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
views: []View{defaultView},
inst: instruments[InstrumentKindHistogram], inst: instruments[InstrumentKindHistogram],
wantKind: aggregate.NewCumulativeHistogram[N](aggregation.ExplicitBucketHistogram{}), validate: assertHist[N](metricdata.CumulativeTemporality),
wantLen: 1,
}, },
{ {
name: "reader with default aggregation should figure out an ObservableCounter", name: "Reader/Default/Cumulative/PrecomputedSum/Monotonic",
reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
views: []View{defaultView},
inst: instruments[InstrumentKindObservableCounter], inst: instruments[InstrumentKindObservableCounter],
wantKind: aggregate.NewPrecomputedCumulativeSum[N](true), validate: assertSum[N](1, metricdata.CumulativeTemporality, true, [2]N{1, 3}),
wantLen: 1,
}, },
{ {
name: "reader with default aggregation should figure out an ObservableUpDownCounter", name: "Reader/Default/Cumulative/PrecomputedSum/NonMonotonic",
reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
views: []View{defaultView},
inst: instruments[InstrumentKindObservableUpDownCounter], inst: instruments[InstrumentKindObservableUpDownCounter],
wantKind: aggregate.NewPrecomputedCumulativeSum[N](true), validate: assertSum[N](1, metricdata.CumulativeTemporality, false, [2]N{1, 3}),
wantLen: 1,
}, },
{ {
name: "reader with default aggregation should figure out an ObservableGauge", name: "Reader/Default/Gauge",
reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })),
views: []View{defaultView},
inst: instruments[InstrumentKindObservableGauge], inst: instruments[InstrumentKindObservableGauge],
wantKind: aggregate.NewLastValue[N](), validate: assertLastValue[N],
wantLen: 1,
}, },
{ {
name: "view with invalid aggregation should error", name: "InvalidAggregation",
reader: NewManualReader(), reader: NewManualReader(),
views: []View{invalidAggView}, views: []View{invalidAggView},
inst: instruments[InstrumentKindCounter], inst: instruments[InstrumentKindCounter],
wantErr: errCreatingAggregators, validate: func(t *testing.T, _ []aggregate.Measure[N], _ []aggregate.ComputeAggregation, err error) {
assert.ErrorIs(t, err, errCreatingAggregators)
},
}, },
} }
for _, tt := range testcases { for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
var c cache[string, streamID] var c cache[string, streamID]
i := newInserter[N](newPipeline(nil, tt.reader, tt.views), &c) p := newPipeline(nil, tt.reader, tt.views)
got, err := i.Instrument(tt.inst) i := newInserter[N](p, &c)
assert.ErrorIs(t, err, tt.wantErr) input, err := i.Instrument(tt.inst)
require.Len(t, got, tt.wantLen) var comps []aggregate.ComputeAggregation
for _, agg := range got { for _, instSyncs := range p.aggregations {
assert.IsType(t, tt.wantKind, agg) for _, i := range instSyncs {
comps = append(comps, i.compAgg)
}
} }
tt.validate(t, input, comps, err)
}) })
} }
} }
View File
@ -30,13 +30,13 @@ import (
"go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/resource"
) )
type testSumAggregator struct{} func testSumAggregateOutput(dest *metricdata.Aggregation) int {
*dest = metricdata.Sum[int64]{
func (testSumAggregator) Aggregation() metricdata.Aggregation {
return metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality, Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false, IsMonotonic: false,
DataPoints: []metricdata.DataPoint[int64]{}} DataPoints: []metricdata.DataPoint[int64]{{Value: 1}},
}
return 1
} }
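The replacement helper also pins down the ComputeAggregation contract the new tests rely on: the function writes its result into *dest and returns the number of data points produced. A sketch of a collection-side caller (skipping export on zero points is an assumption of this sketch, not something the diff itself shows):

	var dest metricdata.Aggregation
	if n := comp(&dest); n > 0 {
		// export dest; n is the data-point count
	}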
func TestNewPipeline(t *testing.T) { func TestNewPipeline(t *testing.T) {
@ -48,7 +48,7 @@ func TestNewPipeline(t *testing.T) {
assert.Equal(t, resource.Empty(), output.Resource) assert.Equal(t, resource.Empty(), output.Resource)
assert.Len(t, output.ScopeMetrics, 0) assert.Len(t, output.ScopeMetrics, 0)
iSync := instrumentSync{"name", "desc", "1", testSumAggregator{}} iSync := instrumentSync{"name", "desc", "1", testSumAggregateOutput}
assert.NotPanics(t, func() { assert.NotPanics(t, func() {
pipe.addSync(instrumentation.Scope{}, iSync) pipe.addSync(instrumentation.Scope{}, iSync)
}) })
@ -92,7 +92,7 @@ func TestPipelineConcurrency(t *testing.T) {
go func(n int) { go func(n int) {
defer wg.Done() defer wg.Done()
name := fmt.Sprintf("name %d", n) name := fmt.Sprintf("name %d", n)
sync := instrumentSync{name, "desc", "1", testSumAggregator{}} sync := instrumentSync{name, "desc", "1", testSumAggregateOutput}
pipe.addSync(instrumentation.Scope{}, sync) pipe.addSync(instrumentation.Scope{}, sync)
}(i) }(i)
@ -142,8 +142,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
got, err := i.Instrument(inst) got, err := i.Instrument(inst)
require.NoError(t, err) require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied") assert.Len(t, got, 1, "default view not applied")
for _, a := range got { for _, in := range got {
a.Aggregate(1, *attribute.EmptySet()) in(context.Background(), 1, *attribute.EmptySet())
} }
out := metricdata.ResourceMetrics{} out := metricdata.ResourceMetrics{}