1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-10-31 00:07:40 +02:00

Implement WithExplicitBucketBoundaries option in the metric SDK (#4605)

This commit is contained in:
David Ashpole
2023-10-31 03:41:27 -04:00
committed by GitHub
parent 5ec67e83df
commit b2bb2ad00f
6 changed files with 229 additions and 34 deletions

View File

@@ -21,6 +21,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add `Version` function in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4660)
- Add Summary, SummaryDataPoint, and QuantileValue to `go.opentelemetry.io/sdk/metric/metricdata`. (#4622)
- `go.opentelemetry.io/otel/bridge/opencensus.NewMetricProducer` now supports exemplars from OpenCensus. (#4585)
- Add support for `WithExplicitBucketBoundaries` in `go.opentelemetry.io/otel/sdk/metric`. (#4605)
### Deprecated

View File

@@ -95,9 +95,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou
// distribution of int64 measurements during a computational operation.
func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
cfg := metric.NewInt64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := int64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
@@ -188,9 +187,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow
// distribution of float64 measurements during a computational operation.
func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
cfg := metric.NewFloat64HistogramConfig(options...)
const kind = InstrumentKindHistogram
p := float64InstProvider{m}
i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit())
i, err := p.lookupHistogram(name, cfg)
if err != nil {
return i, err
}
@@ -456,12 +454,36 @@ func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]ag
return p.int64Resolver.Aggregators(inst)
}
func (p int64InstProvider) histogramAggs(name string, cfg metric.Int64HistogramConfig) ([]aggregate.Measure[int64], error) {
boundaries := cfg.ExplicitBucketBoundaries()
aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err()
if aggError != nil {
// If boundaries are invalid, ignore them.
boundaries = nil
}
inst := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
measures, err := p.int64Resolver.HistogramAggregators(inst, boundaries)
return measures, errors.Join(aggError, err)
}
// lookup returns the resolved instrumentImpl.
func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &int64Inst{measures: aggs}, err
}
// lookupHistogram returns the resolved instrumentImpl.
func (p int64InstProvider) lookupHistogram(name string, cfg metric.Int64HistogramConfig) (*int64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &int64Inst{measures: aggs}, err
}
// float64InstProvider provides float64 OpenTelemetry instruments.
type float64InstProvider struct{ *meter }
@@ -476,12 +498,36 @@ func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]
return p.float64Resolver.Aggregators(inst)
}
func (p float64InstProvider) histogramAggs(name string, cfg metric.Float64HistogramConfig) ([]aggregate.Measure[float64], error) {
boundaries := cfg.ExplicitBucketBoundaries()
aggError := AggregationExplicitBucketHistogram{Boundaries: boundaries}.err()
if aggError != nil {
// If boundaries are invalid, ignore them.
boundaries = nil
}
inst := Instrument{
Name: name,
Description: cfg.Description(),
Unit: cfg.Unit(),
Kind: InstrumentKindHistogram,
Scope: p.scope,
}
measures, err := p.float64Resolver.HistogramAggregators(inst, boundaries)
return measures, errors.Join(aggError, err)
}
// lookup returns the resolved instrumentImpl.
func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) {
aggs, err := p.aggs(kind, name, desc, u)
return &float64Inst{measures: aggs}, err
}
// lookupHistogram returns the resolved instrumentImpl.
func (p float64InstProvider) lookupHistogram(name string, cfg metric.Float64HistogramConfig) (*float64Inst, error) {
aggs, err := p.histogramAggs(name, cfg)
return &float64Inst{measures: aggs}, err
}
type int64ObservProvider struct{ *meter }
func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) {

View File

@@ -16,6 +16,7 @@ package metric
import (
"context"
"errors"
"fmt"
"strings"
"sync"
@@ -550,6 +551,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) {
wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName),
},
{
name: "Int64Histogram with invalid buckets",
fn: func(t *testing.T, m metric.Meter) error {
i, err := m.Int64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5))
assert.NotNil(t, i)
return err
},
wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})),
},
{
name: "Int64ObservableCounter with no validation issues",
@@ -670,6 +682,17 @@ func TestMeterCreatesInstrumentsValidations(t *testing.T) {
wantErr: fmt.Errorf("%w: _: must start with a letter", ErrInstrumentName),
},
{
name: "Float64Histogram with invalid buckets",
fn: func(t *testing.T, m metric.Meter) error {
i, err := m.Float64Histogram("histogram", metric.WithExplicitBucketBoundaries(-1, 1, -5))
assert.NotNil(t, i)
return err
},
wantErr: errors.Join(fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, []float64{-1, 1, -5})),
},
{
name: "Float64ObservableCounter with no validation issues",
@@ -1970,3 +1993,63 @@ func TestMalformedSelectors(t *testing.T) {
})
}
}
func TestHistogramBucketPrecedenceOrdering(t *testing.T) {
defaultBuckets := []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}
aggregationSelector := func(InstrumentKind) Aggregation {
return AggregationExplicitBucketHistogram{Boundaries: []float64{0, 1, 2, 3, 4, 5}}
}
for _, tt := range []struct {
desc string
reader Reader
views []View
histogramOpts []metric.Float64HistogramOption
expectedBucketBoundaries []float64
}{
{
desc: "default",
reader: NewManualReader(),
expectedBucketBoundaries: defaultBuckets,
},
{
desc: "custom reader aggregation overrides default",
reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
expectedBucketBoundaries: []float64{0, 1, 2, 3, 4, 5},
},
{
desc: "overridden by histogram option",
reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
histogramOpts: []metric.Float64HistogramOption{
metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10),
},
expectedBucketBoundaries: []float64{0, 2, 4, 6, 8, 10},
},
{
desc: "overridden by view",
reader: NewManualReader(WithAggregationSelector(aggregationSelector)),
histogramOpts: []metric.Float64HistogramOption{
metric.WithExplicitBucketBoundaries(0, 2, 4, 6, 8, 10),
},
views: []View{NewView(Instrument{Name: "*"}, Stream{
Aggregation: AggregationExplicitBucketHistogram{Boundaries: []float64{0, 3, 6, 9, 12, 15}},
})},
expectedBucketBoundaries: []float64{0, 3, 6, 9, 12, 15},
},
} {
t.Run(tt.desc, func(t *testing.T) {
meter := NewMeterProvider(WithView(tt.views...), WithReader(tt.reader)).Meter("TestHistogramBucketPrecedenceOrdering")
sfHistogram, err := meter.Float64Histogram("sync.float64.histogram", tt.histogramOpts...)
require.NoError(t, err)
sfHistogram.Record(context.Background(), 1)
var rm metricdata.ResourceMetrics
err = tt.reader.Collect(context.Background(), &rm)
require.NoError(t, err)
require.Len(t, rm.ScopeMetrics, 1)
require.Len(t, rm.ScopeMetrics[0].Metrics, 1)
gotHist, ok := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Histogram[float64])
require.True(t, ok)
require.Len(t, gotHist.DataPoints, 1)
assert.Equal(t, tt.expectedBucketBoundaries, gotHist.DataPoints[0].Bounds)
})
}
}

View File

@@ -231,7 +231,7 @@ func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *ins
//
// If an instrument is determined to use a Drop aggregation, that instrument is
// not inserted nor returned.
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error) {
func (i *inserter[N]) Instrument(inst Instrument, readerAggregation Aggregation) ([]aggregate.Measure[N], error) {
var (
matched bool
measures []aggregate.Measure[N]
@@ -245,8 +245,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
continue
}
matched = true
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
@@ -271,7 +270,7 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error
Description: inst.Description,
Unit: inst.Unit,
}
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation)
if err != nil {
errs.append(err)
}
@@ -291,6 +290,31 @@ type aggVal[N int64 | float64] struct {
Err error
}
// readerDefaultAggregation returns the default aggregation for the instrument
// kind based on the reader's aggregation preferences. This is used unless the
// aggregation is overridden with a view.
func (i *inserter[N]) readerDefaultAggregation(kind InstrumentKind) Aggregation {
aggregation := i.pipeline.reader.aggregation(kind)
switch aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
aggregation = aggregation.copy()
if err := aggregation.err(); err != nil {
orig := aggregation
aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", aggregation,
)
}
}
return aggregation
}
// cachedAggregator returns the appropriate aggregate input and output
// functions for an instrument configuration. If the exact instrument has been
// created within the inst.Scope, those aggregate function instances will be
@@ -305,29 +329,14 @@ type aggVal[N int64 | float64] struct {
//
// If the instrument defines an unknown or incompatible aggregation, an error
// is returned.
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream, readerAggregation Aggregation) (meas aggregate.Measure[N], aggID uint64, err error) {
switch stream.Aggregation.(type) {
case nil:
// Undefined, nil, means to use the default from the reader.
stream.Aggregation = i.pipeline.reader.aggregation(kind)
switch stream.Aggregation.(type) {
case nil, AggregationDefault:
// If the reader returns default or nil use the default selector.
stream.Aggregation = DefaultAggregationSelector(kind)
default:
// Deep copy and validate before using.
stream.Aggregation = stream.Aggregation.copy()
if err := stream.Aggregation.err(); err != nil {
orig := stream.Aggregation
stream.Aggregation = DefaultAggregationSelector(kind)
global.Error(
err, "using default aggregation instead",
"aggregation", orig,
"replacement", stream.Aggregation,
)
}
}
// The aggregation was not overridden with a view. Use the aggregation
// provided by the reader.
stream.Aggregation = readerAggregation
case AggregationDefault:
// The view explicitly requested the default aggregation.
stream.Aggregation = DefaultAggregationSelector(kind)
}
@@ -596,7 +605,29 @@ func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error)
errs := &multierror{}
for _, i := range r.inserters {
in, err := i.Instrument(id)
in, err := i.Instrument(id, i.readerDefaultAggregation(id.Kind))
if err != nil {
errs.append(err)
}
measures = append(measures, in...)
}
return measures, errs.errorOrNil()
}
// HistogramAggregators returns the histogram Aggregators that must be updated by the instrument
// defined by key. If boundaries were provided on instrument instantiation, those take precedence
// over boundaries provided by the reader.
func (r resolver[N]) HistogramAggregators(id Instrument, boundaries []float64) ([]aggregate.Measure[N], error) {
var measures []aggregate.Measure[N]
errs := &multierror{}
for _, i := range r.inserters {
agg := i.readerDefaultAggregation(id.Kind)
if histAgg, ok := agg.(AggregationExplicitBucketHistogram); ok && len(boundaries) > 0 {
histAgg.Boundaries = boundaries
agg = histAgg
}
in, err := i.Instrument(id, agg)
if err != nil {
errs.append(err)
}

View File

@@ -351,7 +351,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views)
i := newInserter[N](p, &c)
input, err := i.Instrument(tt.inst)
readerAggregation := i.readerDefaultAggregation(tt.inst.Kind)
input, err := i.Instrument(tt.inst, readerAggregation)
var comps []aggregate.ComputeAggregation
for _, instSyncs := range p.aggregations {
for _, i := range instSyncs {
@@ -375,7 +376,8 @@ func testInvalidInstrumentShouldPanic[N int64 | float64]() {
Name: "foo",
Kind: InstrumentKind(255),
}
_, _ = i.Instrument(inst)
readerAggregation := i.readerDefaultAggregation(inst.Kind)
_, _ = i.Instrument(inst, readerAggregation)
}
func TestInvalidInstrumentShouldPanic(t *testing.T) {
@@ -460,6 +462,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {
p := newPipelines(resource.Empty(), tt.readers, tt.views)
testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount)
testPipelineRegistryResolveFloatHistogramAggregators(t, p, tt.wantCount)
})
}
}
@@ -484,6 +488,26 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want
require.Len(t, aggs, wantCount)
}
func testPipelineRegistryResolveIntHistogramAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, instID]
r := newResolver[int64](p, &c)
aggs, err := r.HistogramAggregators(inst, []float64{1, 2, 3})
assert.NoError(t, err)
require.Len(t, aggs, wantCount)
}
func testPipelineRegistryResolveFloatHistogramAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, instID]
r := newResolver[float64](p, &c)
aggs, err := r.HistogramAggregators(inst, []float64{1, 2, 3})
assert.NoError(t, err)
require.Len(t, aggs, wantCount)
}
func TestPipelineRegistryResource(t *testing.T) {
v := NewView(Instrument{Name: "bar"}, Stream{Name: "foo"})
readers := []Reader{NewManualReader()}
@@ -513,6 +537,14 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
floatAggs, err := rf.Aggregators(inst)
assert.Error(t, err)
assert.Len(t, floatAggs, 0)
intAggs, err = ri.HistogramAggregators(inst, []float64{1, 2, 3})
assert.Error(t, err)
assert.Len(t, intAggs, 0)
floatAggs, err = rf.HistogramAggregators(inst, []float64{1, 2, 3})
assert.Error(t, err)
assert.Len(t, floatAggs, 0)
}
type logCounter struct {

View File

@@ -146,7 +146,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
var c cache[string, instID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
readerAggregation := i.readerDefaultAggregation(inst.Kind)
got, err := i.Instrument(inst, readerAggregation)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")
for _, in := range got {
@@ -372,7 +373,8 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
pipe := newPipeline(nil, NewManualReader(), nil)
i := newInserter[int64](pipe, &vc)
_, origID, err := i.cachedAggregator(scope, kind, stream)
readerAggregation := i.readerDefaultAggregation(kind)
_, origID, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
require.NoError(t, err)
require.Len(t, pipe.aggregations, 1)
@@ -382,7 +384,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) {
require.Equal(t, name, iSync[0].name)
stream.Name = "RequestCount"
_, id, err := i.cachedAggregator(scope, kind, stream)
_, id, err := i.cachedAggregator(scope, kind, stream, readerAggregation)
require.NoError(t, err)
assert.Equal(t, origID, id, "multiple aggregators for equivalent name")