mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-03-03 14:52:56 +02:00
Histogram aggregator functional options (#1434)
* Add a Config/Option for histogram * Just one option here * Test fixes * Support and test int64 histograms * Changelog * Lint * Un-export three things.
This commit is contained in:
parent
0df8cd620c
commit
c56227771d
@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Unify endpoint API that related to OTel exporter. (#1401)
|
||||
- Optimize metric histogram aggregator to re-use its slice of buckets. (#1435)
|
||||
- Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430)
|
||||
- Histogram aggregator accepts functional options, uses default boundaries if none given. (#1434)
|
||||
- `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432)
|
||||
- Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420)
|
||||
- The `TraceContext` propagator now correctly propagates `TraceState` through the `SpanContext`. (#1447)
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
|
||||
"go.opentelemetry.io/otel/label"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
@ -59,9 +60,11 @@ func initMeter() {
|
||||
|
||||
cont := controller.New(
|
||||
processor.New(
|
||||
simple.NewWithHistogramDistribution([]float64{
|
||||
simple.NewWithHistogramDistribution(
|
||||
histogram.WithExplicitBoundaries([]float64{
|
||||
0.001, 0.01, 0.1, 1, 10, 100, 1000,
|
||||
}),
|
||||
),
|
||||
otlpExporter, // otlpExporter is an ExportKindSelector
|
||||
processor.WithMemory(true),
|
||||
),
|
||||
|
@ -32,9 +32,10 @@ import (
|
||||
"go.opentelemetry.io/otel/metric/number"
|
||||
export "go.opentelemetry.io/otel/sdk/export/metric"
|
||||
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
|
||||
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
|
||||
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
|
||||
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
|
||||
)
|
||||
|
||||
// Exporter supports Prometheus pulls. It does not implement the
|
||||
@ -150,7 +151,9 @@ func InstallNewPipeline(config Config, options ...controller.Option) (*Exporter,
|
||||
func defaultController(config Config, options ...controller.Option) *controller.Controller {
|
||||
return controller.New(
|
||||
processor.New(
|
||||
simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
|
||||
selector.NewWithHistogramDistribution(
|
||||
histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries),
|
||||
),
|
||||
export.CumulativeExportKindSelector(),
|
||||
processor.WithMemory(true),
|
||||
),
|
||||
|
@ -756,7 +756,7 @@ func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record,
|
||||
if r.iKind.Adding() {
|
||||
agg, ckpt = metrictest.Unslice2(sum.New(2))
|
||||
} else {
|
||||
agg, ckpt = metrictest.Unslice2(histogram.New(2, &desc, testHistogramBoundaries))
|
||||
agg, ckpt = metrictest.Unslice2(histogram.New(2, &desc, histogram.WithExplicitBoundaries(testHistogramBoundaries)))
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
@ -39,7 +39,7 @@ func benchmarkHistogramSearchFloat64(b *testing.B, size int) {
|
||||
values[i] = rand.Float64() * inputRange
|
||||
}
|
||||
desc := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, number.Float64Kind)
|
||||
agg := &histogram.New(1, desc, boundaries)[0]
|
||||
agg := &histogram.New(1, desc, histogram.WithExplicitBoundaries(boundaries))[0]
|
||||
ctx := context.Background()
|
||||
|
||||
b.ReportAllocs()
|
||||
@ -90,7 +90,7 @@ func benchmarkHistogramSearchInt64(b *testing.B, size int) {
|
||||
values[i] = int64(rand.Float64() * inputRange)
|
||||
}
|
||||
desc := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, number.Int64Kind)
|
||||
agg := &histogram.New(1, desc, boundaries)[0]
|
||||
agg := &histogram.New(1, desc, histogram.WithExplicitBoundaries(boundaries))[0]
|
||||
ctx := context.Background()
|
||||
|
||||
b.ReportAllocs()
|
||||
|
@ -41,6 +41,19 @@ type (
|
||||
state *state
|
||||
}
|
||||
|
||||
// config describes how the histogram is aggregated.
|
||||
config struct {
|
||||
// explicitBoundaries support arbitrary bucketing schemes. This
|
||||
// is the general case.
|
||||
explicitBoundaries []float64
|
||||
}
|
||||
|
||||
// Option configures a histogram config.
|
||||
Option interface {
|
||||
// apply sets one or more config fields.
|
||||
apply(*config)
|
||||
}
|
||||
|
||||
// state represents the state of a histogram, consisting of
|
||||
// the sum and counts for all observed values and
|
||||
// the less than equal bucket count for the pre-determined boundaries.
|
||||
@ -51,6 +64,39 @@ type (
|
||||
}
|
||||
)
|
||||
|
||||
// WithExplicitBoundaries sets the ExplicitBoundaries configuration option of a config.
|
||||
func WithExplicitBoundaries(explicitBoundaries []float64) Option {
|
||||
return explicitBoundariesOption{explicitBoundaries}
|
||||
}
|
||||
|
||||
type explicitBoundariesOption struct {
|
||||
boundaries []float64
|
||||
}
|
||||
|
||||
func (o explicitBoundariesOption) apply(config *config) {
|
||||
config.explicitBoundaries = o.boundaries
|
||||
}
|
||||
|
||||
// defaultExplicitBoundaries have been copied from prometheus.DefBuckets.
|
||||
//
|
||||
// Note we anticipate the use of a high-precision histogram sketch as
|
||||
// the standard histogram aggregator for OTLP export.
|
||||
// (https://github.com/open-telemetry/opentelemetry-specification/issues/982).
|
||||
var defaultFloat64ExplicitBoundaries = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
|
||||
|
||||
// defaultInt64ExplicitBoundaryMultiplier determines the default
|
||||
// integer histogram boundaries.
|
||||
const defaultInt64ExplicitBoundaryMultiplier = 1e6
|
||||
|
||||
// defaultInt64ExplicitBoundaries applies a multiplier to the default
|
||||
// float64 boundaries: [ 5K, 10K, 25K, ..., 2.5M, 5M, 10M ]
|
||||
var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) {
|
||||
for _, f := range bounds {
|
||||
asint = append(asint, defaultInt64ExplicitBoundaryMultiplier*f)
|
||||
}
|
||||
return
|
||||
}(defaultFloat64ExplicitBoundaries)
|
||||
|
||||
var _ export.Aggregator = &Aggregator{}
|
||||
var _ aggregation.Sum = &Aggregator{}
|
||||
var _ aggregation.Count = &Aggregator{}
|
||||
@ -64,14 +110,26 @@ var _ aggregation.Histogram = &Aggregator{}
|
||||
// Note that this aggregator maintains each value using independent
|
||||
// atomic operations, which introduces the possibility that
|
||||
// checkpoints are inconsistent.
|
||||
func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator {
|
||||
func New(cnt int, desc *metric.Descriptor, opts ...Option) []Aggregator {
|
||||
var cfg config
|
||||
|
||||
if desc.NumberKind() == number.Int64Kind {
|
||||
cfg.explicitBoundaries = defaultInt64ExplicitBoundaries
|
||||
} else {
|
||||
cfg.explicitBoundaries = defaultFloat64ExplicitBoundaries
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt.apply(&cfg)
|
||||
}
|
||||
|
||||
aggs := make([]Aggregator, cnt)
|
||||
|
||||
// Boundaries MUST be ordered otherwise the histogram could not
|
||||
// be properly computed.
|
||||
sortedBoundaries := make([]float64, len(boundaries))
|
||||
sortedBoundaries := make([]float64, len(cfg.explicitBoundaries))
|
||||
|
||||
copy(sortedBoundaries, boundaries)
|
||||
copy(sortedBoundaries, cfg.explicitBoundaries)
|
||||
sort.Float64s(sortedBoundaries)
|
||||
|
||||
for i := range aggs {
|
||||
|
@ -15,6 +15,7 @@
|
||||
package histogram_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sort"
|
||||
@ -59,16 +60,16 @@ var (
|
||||
},
|
||||
}
|
||||
|
||||
boundaries = []float64{500, 250, 750}
|
||||
testBoundaries = []float64{500, 250, 750}
|
||||
)
|
||||
|
||||
func new2(desc *metric.Descriptor) (_, _ *histogram.Aggregator) {
|
||||
alloc := histogram.New(2, desc, boundaries)
|
||||
func new2(desc *metric.Descriptor, options ...histogram.Option) (_, _ *histogram.Aggregator) {
|
||||
alloc := histogram.New(2, desc, options...)
|
||||
return &alloc[0], &alloc[1]
|
||||
}
|
||||
|
||||
func new4(desc *metric.Descriptor) (_, _, _, _ *histogram.Aggregator) {
|
||||
alloc := histogram.New(4, desc, boundaries)
|
||||
func new4(desc *metric.Descriptor, options ...histogram.Option) (_, _, _, _ *histogram.Aggregator) {
|
||||
alloc := histogram.New(4, desc, options...)
|
||||
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
|
||||
}
|
||||
|
||||
@ -84,11 +85,10 @@ func checkZero(t *testing.T, agg *histogram.Aggregator, desc *metric.Descriptor)
|
||||
buckets, err := agg.Histogram()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
require.Equal(t, len(buckets.Counts), len(testBoundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
|
||||
for i, bCount := range buckets.Counts {
|
||||
require.Equal(t, uint64(0), uint64(bCount), "Bucket #%d must have 0 observed values", i)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestHistogramAbsolute(t *testing.T) {
|
||||
@ -113,7 +113,7 @@ func TestHistogramPositiveAndNegative(t *testing.T) {
|
||||
func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy) {
|
||||
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)
|
||||
|
||||
agg, ckpt := new2(descriptor)
|
||||
agg, ckpt := new2(descriptor, histogram.WithExplicitBoundaries(testBoundaries))
|
||||
|
||||
// This needs to repeat at least 3 times to uncover a failure to reset
|
||||
// for the overall sum and count fields, since the third time through
|
||||
@ -139,12 +139,12 @@ func TestHistogramInitial(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)
|
||||
|
||||
agg := &histogram.New(1, descriptor, boundaries)[0]
|
||||
agg := &histogram.New(1, descriptor, histogram.WithExplicitBoundaries(testBoundaries))[0]
|
||||
buckets, err := agg.Histogram()
|
||||
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(buckets.Counts), len(boundaries)+1)
|
||||
require.Equal(t, len(buckets.Boundaries), len(boundaries))
|
||||
require.Equal(t, len(buckets.Counts), len(testBoundaries)+1)
|
||||
require.Equal(t, len(buckets.Boundaries), len(testBoundaries))
|
||||
})
|
||||
}
|
||||
|
||||
@ -152,7 +152,7 @@ func TestHistogramMerge(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)
|
||||
|
||||
agg1, agg2, ckpt1, ckpt2 := new4(descriptor)
|
||||
agg1, agg2, ckpt1, ckpt2 := new4(descriptor, histogram.WithExplicitBoundaries(testBoundaries))
|
||||
|
||||
all := aggregatortest.NewNumbers(profile.NumberKind)
|
||||
|
||||
@ -180,7 +180,7 @@ func TestHistogramNotSet(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)
|
||||
|
||||
agg, ckpt := new2(descriptor)
|
||||
agg, ckpt := new2(descriptor, histogram.WithExplicitBoundaries(testBoundaries))
|
||||
|
||||
err := agg.SynchronizedMove(ckpt, descriptor)
|
||||
require.NoError(t, err)
|
||||
@ -212,11 +212,12 @@ func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregator
|
||||
buckets, err := agg.Histogram()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(buckets.Counts), len(boundaries)+1,
|
||||
require.Equal(t, len(buckets.Counts), len(testBoundaries)+1,
|
||||
"There should be b + 1 counts, where b is the number of boundaries")
|
||||
|
||||
sortedBoundaries := make([]float64, len(boundaries))
|
||||
copy(sortedBoundaries, boundaries)
|
||||
sortedBoundaries := make([]float64, len(testBoundaries))
|
||||
copy(sortedBoundaries, testBoundaries)
|
||||
|
||||
sort.Float64s(sortedBoundaries)
|
||||
|
||||
require.EqualValues(t, sortedBoundaries, buckets.Boundaries)
|
||||
@ -240,7 +241,56 @@ func TestSynchronizedMoveReset(t *testing.T) {
|
||||
t,
|
||||
metric.ValueRecorderInstrumentKind,
|
||||
func(desc *metric.Descriptor) export.Aggregator {
|
||||
return &histogram.New(1, desc, boundaries)[0]
|
||||
return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0]
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func TestHistogramDefaultBoundaries(t *testing.T) {
|
||||
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
|
||||
ctx := context.Background()
|
||||
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)
|
||||
|
||||
agg, ckpt := new2(descriptor)
|
||||
|
||||
bounds := []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} // len 11
|
||||
values := append(bounds, 100) // len 12
|
||||
expect := []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} // len 12
|
||||
|
||||
for _, value := range values {
|
||||
var num number.Number
|
||||
|
||||
value -= .001 // Avoid exact boundaries
|
||||
|
||||
if descriptor.NumberKind() == number.Int64Kind {
|
||||
value *= 1e6
|
||||
num = number.NewInt64Number(int64(value))
|
||||
} else {
|
||||
num = number.NewFloat64Number(value)
|
||||
}
|
||||
|
||||
require.NoError(t, agg.Update(ctx, num, descriptor))
|
||||
}
|
||||
|
||||
bucks, err := agg.Histogram()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Check for proper lengths, 1 count in each bucket.
|
||||
require.Equal(t, len(values), len(bucks.Counts))
|
||||
require.Equal(t, len(bounds), len(bucks.Boundaries))
|
||||
require.EqualValues(t, expect, bucks.Counts)
|
||||
|
||||
require.Equal(t, expect, bucks.Counts)
|
||||
|
||||
// Move and repeat the test on `ckpt`.
|
||||
err = agg.SynchronizedMove(ckpt, descriptor)
|
||||
require.NoError(t, err)
|
||||
|
||||
bucks, err = ckpt.Histogram()
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, len(values), len(bucks.Counts))
|
||||
require.Equal(t, len(bounds), len(bucks.Boundaries))
|
||||
require.EqualValues(t, expect, bucks.Counts)
|
||||
})
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
func TestStressInt64Histogram(t *testing.T) {
|
||||
desc := metric.NewDescriptor("some_metric", metric.ValueRecorderInstrumentKind, number.Int64Kind)
|
||||
|
||||
alloc := histogram.New(2, &desc, []float64{25, 50, 75})
|
||||
alloc := histogram.New(2, &desc, histogram.WithExplicitBoundaries([]float64{25, 50, 75}))
|
||||
h, ckpt := &alloc[0], &alloc[1]
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(context.Background())
|
||||
|
@ -184,7 +184,7 @@ func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...
|
||||
*aggPtrs[i] = &aggs[i]
|
||||
}
|
||||
case strings.HasSuffix(desc.Name(), ".histogram"):
|
||||
aggs := histogram.New(len(aggPtrs), desc, nil)
|
||||
aggs := histogram.New(len(aggPtrs), desc)
|
||||
for i := range aggPtrs {
|
||||
*aggPtrs[i] = &aggs[i]
|
||||
}
|
||||
|
@ -28,7 +28,7 @@ type (
|
||||
selectorInexpensive struct{}
|
||||
selectorExact struct{}
|
||||
selectorHistogram struct {
|
||||
boundaries []float64
|
||||
options []histogram.Option
|
||||
}
|
||||
)
|
||||
|
||||
@ -59,8 +59,8 @@ func NewWithExactDistribution() export.AggregatorSelector {
|
||||
// NewWithHistogramDistribution returns a simple aggregator selector
|
||||
// that uses histogram aggregators for `ValueRecorder` instruments.
|
||||
// This selector is a good default choice for most metric exporters.
|
||||
func NewWithHistogramDistribution(boundaries []float64) export.AggregatorSelector {
|
||||
return selectorHistogram{boundaries: boundaries}
|
||||
func NewWithHistogramDistribution(options ...histogram.Option) export.AggregatorSelector {
|
||||
return selectorHistogram{options: options}
|
||||
}
|
||||
|
||||
func sumAggs(aggPtrs []*export.Aggregator) {
|
||||
@ -110,7 +110,7 @@ func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor, aggPtrs
|
||||
case metric.ValueObserverInstrumentKind:
|
||||
lastValueAggs(aggPtrs)
|
||||
case metric.ValueRecorderInstrumentKind:
|
||||
aggs := histogram.New(len(aggPtrs), descriptor, s.boundaries)
|
||||
aggs := histogram.New(len(aggPtrs), descriptor, s.options...)
|
||||
for i := range aggPtrs {
|
||||
*aggPtrs[i] = &aggs[i]
|
||||
}
|
||||
|
@ -66,7 +66,7 @@ func TestExactDistribution(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestHistogramDistribution(t *testing.T) {
|
||||
hist := simple.NewWithHistogramDistribution(nil)
|
||||
hist := simple.NewWithHistogramDistribution()
|
||||
require.IsType(t, (*histogram.Aggregator)(nil), oneAgg(hist, &testValueRecorderDesc))
|
||||
testFixedSelectors(t, hist)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user