1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-01-26 03:52:03 +02:00

Move exemplar types to non-internal package (#5747)

Part of https://github.com/open-telemetry/opentelemetry-go/issues/5249

This makes all existing types designed to implement the public Exemplar
API public by moving most of `internal/exemplar` to `exemplar`. The only
types that are not being made public are `exemplar.Drop`, and
`exemplar.FilteredReservoir`. Those types are moved to
`internal/aggregate`, and are renamed to `DropReservoir` and
`FilteredExemplarReservoir`.

The following types are made public:

* `exemplar.Exemplar`
* `exemplar.Filter`
* `exemplar.SampledFilter`
* `exemplar.AlwaysOnFilter`
* `exemplar.HistogramReservoir`
* `exemplar.FixedSizeReservoir`
* `exemplar.Reservoir`
* `exemplar.Value`
* `exemplar.ValueType`
This commit is contained in:
David Ashpole 2024-09-26 16:25:05 -04:00 committed by GitHub
parent 6edc7a63df
commit 481f4983f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 214 additions and 172 deletions

View File

@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased]
### Added
- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `SampledFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747)
### Changed
- Enable exemplars by default in `go.opentelemetry.io/otel/sdk/metric`. Exemplars can be disabled by setting `OTEL_METRICS_EXEMPLAR_FILTER=always_off` (#5778)

View File

@ -8,7 +8,8 @@ import (
"runtime"
"slices"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)
// reservoirFunc returns the appropriately configured exemplar reservoir
@ -18,7 +19,7 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
@ -28,7 +29,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
case "always_on":
filter = exemplar.AlwaysOnFilter
case "always_off":
return exemplar.Drop
return aggregate.DropReservoir
case "trace_based":
fallthrough
default:
@ -41,9 +42,9 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.FilteredReservoir[N] {
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds))
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
}
@ -71,7 +72,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
}
}
return func() exemplar.FilteredReservoir[N] {
return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n))
return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
}

View File

@ -0,0 +1,3 @@
# Metric SDK Exemplars
[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/sdk/metric/exemplar)](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/exemplar)

View File

@ -3,4 +3,4 @@
// Package exemplar provides an implementation of the OpenTelemetry exemplar
// reservoir to be used in metric collection pipelines.
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

View File

@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"time"

View File

@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"

View File

@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"

View File

@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
@ -12,15 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)
// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
return newRandRes(newStorage(k))
// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or less measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
// sample all additional measurement with a decreasing probability.
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
return newFixedSizeReservoir(newStorage(k))
}
type randRes struct {
var _ Reservoir = &FixedSizeReservoir{}
// FixedSizeReservoir is a [Reservoir] that samples at most k exemplars. If
// there are k or less measurements made, the Reservoir will sample each one.
// If there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
type FixedSizeReservoir struct {
*storage
// count is the number of measurement seen.
@ -39,8 +45,8 @@ type randRes struct {
rng *rand.Rand
}
func newRandRes(s *storage) *randRes {
r := &randRes{
func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
r := &FixedSizeReservoir{
storage: s,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
@ -50,7 +56,7 @@ func newRandRes(s *storage) *randRes {
// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (r *randRes) randomFloat64() float64 {
func (r *FixedSizeReservoir) randomFloat64() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
@ -75,7 +81,18 @@ func (r *randRes) randomFloat64() float64 {
return f
}
func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
@ -131,7 +148,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
}
// reset resets r to the initial state.
func (r *randRes) reset() {
func (r *FixedSizeReservoir) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
@ -153,7 +170,7 @@ func (r *randRes) reset() {
// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *randRes) advance() {
func (r *FixedSizeReservoir) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
@ -180,7 +197,10 @@ func (r *randRes) advance() {
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}
func (r *randRes) Collect(dest *[]Exemplar) {
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new

View File

@ -14,17 +14,17 @@ import (
"github.com/stretchr/testify/assert"
)
func TestFixedSize(t *testing.T) {
func TestNewFixedSizeReservoir(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) {
return FixedSize(n), n
return NewFixedSizeReservoir(n), n
}))
t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) {
return FixedSize(n), n
return NewFixedSizeReservoir(n), n
}))
}
func TestFixedSizeSamplingCorrectness(t *testing.T) {
func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
intensity := 0.1
sampleSize := 1000
@ -38,13 +38,13 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
// Sort to test position bias.
slices.Sort(data)
r := FixedSize(sampleSize)
r := NewFixedSizeReservoir(sampleSize)
for _, value := range data {
r.Offer(context.Background(), staticTime, NewValue(value), nil)
}
var sum float64
for _, m := range r.(*randRes).store {
for _, m := range r.store {
sum += m.Value.Float64()
}
mean := sum / float64(sampleSize)

View File

@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
"slices"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
)
// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are define by bounds.
//
// The passed bounds will be sorted by this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
}
}
var _ Reservoir = &HistogramReservoir{}
// HistogramReservoir is a [Reservoir] that samples the last measurement that
// falls within a histogram bucket. The histogram bucket upper-boundaries are
// define by bounds.
type HistogramReservoir struct {
*storage
// bounds are bucket bounds in ascending order.
bounds []float64
}
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
var x float64
switch v.Type() {
case Int64ValueType:
x = float64(v.Int64())
case Float64ValueType:
x = v.Float64()
default:
panic("unknown value type")
}
r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
}

View File

@ -8,10 +8,10 @@ import "testing"
func TestHist(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
return Histogram(bounds), len(bounds)
return NewHistogramReservoir(bounds), len(bounds)
}))
t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
return Histogram(bounds), len(bounds)
return NewHistogramReservoir(bounds), len(bounds)
}))
}

View File

@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"

View File

@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import (
"context"
@ -35,7 +35,7 @@ func (r *storage) Collect(dest *[]Exemplar) {
continue
}
m.Exemplar(&(*dest)[n])
m.exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
@ -66,8 +66,8 @@ func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []at
}
}
// Exemplar returns m as an [Exemplar].
func (m measurement) Exemplar(dest *Exemplar) {
// exemplar returns m as an [Exemplar].
func (m measurement) exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value

View File

@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
import "math"

View File

@ -8,7 +8,6 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@ -38,8 +37,8 @@ type Builder[N int64 | float64] struct {
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.FilteredReservoir[N]
// DropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@ -50,12 +49,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}
func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
return exemplar.Drop
return DropReservoir
}
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

View File

@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
@ -73,8 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}
func dropExemplars[N int64 | float64]() exemplar.FilteredReservoir[N] {
return exemplar.Drop[N]()
func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
return DropReservoir[N]()
}
func TestBuilderFilter(t *testing.T) {

View File

@ -1,16 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// Drop returns a [FilteredReservoir] that drops all measurements it is offered.
func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} }
// DropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func DropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
type dropRes[N int64 | float64] struct{}
@ -18,6 +19,6 @@ type dropRes[N int64 | float64] struct{}
func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
// Collect resets dest. No exemplars will ever be returned.
func (r *dropRes[N]) Collect(dest *[]Exemplar) {
func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
*dest = (*dest)[:0]
}

View File

@ -1,12 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar
package aggregate
import (
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
func TestDrop(t *testing.T) {
@ -15,9 +17,9 @@ func TestDrop(t *testing.T) {
}
func testDropFiltered[N int64 | float64](t *testing.T) {
r := Drop[N]()
r := DropReservoir[N]()
var dest []Exemplar
var dest []exemplar.Exemplar
r.Collect(&dest)
assert.Empty(t, dest, "non-sampled context should not be offered")

View File

@ -6,7 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
import (
"sync"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

View File

@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

View File

@ -12,7 +12,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@ -31,7 +30,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
count uint64
min N
@ -284,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
@ -307,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex

View File

@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter
reservoir exemplar.Reservoir
}
// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
}
}
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
}
}
func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }

View File

@ -11,13 +11,12 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type buckets[N int64 | float64] struct {
attrs attribute.Set
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
counts []uint64
count uint64
@ -48,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
@ -109,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,

View File

@ -9,7 +9,6 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@ -17,10 +16,10 @@ import (
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
}
func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
@ -33,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReserv
type lastValue[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
@ -115,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] {
func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}

View File

@ -9,25 +9,24 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type sumValue[N int64 | float64] struct {
n N
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
attrs attribute.Set
}
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N]
}
func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] {
func newValueMap[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
@ -55,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] {
func newSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@ -144,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// newPrecomputedSum returns an aggregator that summarizes a set of
// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] {
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,

View File

@ -1,49 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
)
// FilteredReservoir wraps a [Reservoir] with a filter.
type FilteredReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]Exemplar)
}
// filteredReservoir handles the pre-sampled exemplar of measurements made.
type filteredReservoir[N int64 | float64] struct {
filter Filter
reservoir Reservoir
}
// NewFilteredReservoir creates a [FilteredReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredReservoir[N int64 | float64](f Filter, r Reservoir) FilteredReservoir[N] {
return &filteredReservoir[N]{
filter: f,
reservoir: r,
}
}
func (f *filteredReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, time.Now(), NewValue(val), attr)
}
}
func (f *filteredReservoir[N]) Collect(dest *[]Exemplar) { f.reservoir.Collect(dest) }

View File

@ -1,46 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"slices"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
)
// Histogram returns a [Reservoir] that samples the last measurement that falls
// within a histogram bucket. The histogram bucket upper-boundaries are define
// by bounds.
//
// The passed bounds will be sorted by this function.
func Histogram(bounds []float64) Reservoir {
slices.Sort(bounds)
return &histRes{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
}
}
type histRes struct {
*storage
// bounds are bucket bounds in ascending order.
bounds []float64
}
func (r *histRes) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
var x float64
switch v.Type() {
case Int64ValueType:
x = float64(v.Int64())
case Float64ValueType:
x = v.Float64()
default:
panic("unknown value type")
}
r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
}