1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-08-10 22:31:50 +02:00

Move time.Now call into exemplar reservoir to improve performance (#5545)

Part of addressing
https://github.com/open-telemetry/opentelemetry-go/issues/5542.

### Motivation

This removes the `time.Now()` call from filtered-out Exemplars by only
invoking `time.Now()` after the filtering decision is made. This
improvement is especially noticeable for measurements without any
attributes.

```
goos: linux
goarch: amd64
pkg: go.opentelemetry.io/otel/sdk/metric
cpu: AMD EPYC 7B12
                                                                 │   old.txt    │                new.txt                │
                                                                 │    sec/op    │   sec/op     vs base                  │
SyncMeasure/NoView/Int64Counter/Attributes/0-24                    158.20n ± 4%   99.83n ± 1%  -36.90% (p=0.000 n=10)
SyncMeasure/NoView/Int64Counter/Attributes/1-24                     333.3n ± 4%   274.8n ± 1%  -17.55% (p=0.000 n=10)
SyncMeasure/NoView/Int64Counter/Attributes/10-24                    1.640µ ± 1%   1.600µ ± 1%   -2.41% (p=0.000 n=10)
SyncMeasure/NoView/Float64Counter/Attributes/0-24                   159.0n ± 3%   101.3n ± 0%  -36.27% (p=0.000 n=10)
SyncMeasure/NoView/Float64Counter/Attributes/1-24                   340.0n ± 2%   272.0n ± 1%  -20.00% (p=0.000 n=10)
SyncMeasure/NoView/Float64Counter/Attributes/10-24                  1.661µ ± 1%   1.597µ ± 0%   -3.85% (p=0.000 n=10)
SyncMeasure/NoView/Int64UpDownCounter/Attributes/0-24               159.8n ± 1%   103.1n ± 0%  -35.50% (p=0.000 n=10)
SyncMeasure/NoView/Int64UpDownCounter/Attributes/1-24               339.5n ± 1%   273.1n ± 0%  -19.57% (p=0.000 n=10)
SyncMeasure/NoView/Int64UpDownCounter/Attributes/10-24              1.656µ ± 0%   1.589µ ± 0%   -4.05% (p=0.000 n=10)
SyncMeasure/NoView/Float64UpDownCounter/Attributes/0-24             159.3n ± 2%   100.8n ± 0%  -36.74% (p=0.000 n=10)
SyncMeasure/NoView/Float64UpDownCounter/Attributes/1-24             337.9n ± 2%   271.8n ± 1%  -19.55% (p=0.000 n=10)
SyncMeasure/NoView/Float64UpDownCounter/Attributes/10-24            1.657µ ± 0%   1.593µ ± 1%   -3.83% (p=0.000 n=10)
SyncMeasure/NoView/Int64Histogram/Attributes/0-24                  144.65n ± 4%   89.38n ± 0%  -38.21% (p=0.000 n=10)
SyncMeasure/NoView/Int64Histogram/Attributes/1-24                   235.7n ± 2%   183.5n ± 0%  -22.15% (p=0.000 n=10)
SyncMeasure/NoView/Int64Histogram/Attributes/10-24                  900.8n ± 1%   836.8n ± 0%   -7.10% (p=0.000 n=10)
SyncMeasure/NoView/Float64Histogram/Attributes/0-24                145.60n ± 5%   93.48n ± 1%  -35.80% (p=0.000 n=10)
SyncMeasure/NoView/Float64Histogram/Attributes/1-24                 240.9n ± 1%   183.0n ± 0%  -24.06% (p=0.000 n=10)
SyncMeasure/NoView/Float64Histogram/Attributes/10-24                905.6n ± 1%   826.3n ± 0%   -8.76% (p=0.000 n=10)
SyncMeasure/DropView/Int64Counter/Attributes/0-24                   20.33n ± 0%   20.35n ± 0%        ~ (p=0.302 n=10)
SyncMeasure/DropView/Int64Counter/Attributes/1-24                   26.46n ± 0%   26.45n ± 1%        ~ (p=0.868 n=10)
SyncMeasure/DropView/Int64Counter/Attributes/10-24                  26.50n ± 0%   26.47n ± 0%        ~ (p=0.208 n=10)
SyncMeasure/DropView/Float64Counter/Attributes/0-24                 20.34n ± 1%   20.27n ± 0%   -0.34% (p=0.009 n=10)
SyncMeasure/DropView/Float64Counter/Attributes/1-24                 26.55n ± 0%   26.60n ± 1%        ~ (p=0.109 n=10)
SyncMeasure/DropView/Float64Counter/Attributes/10-24                26.59n ± 1%   26.57n ± 1%        ~ (p=0.926 n=10)
SyncMeasure/DropView/Int64UpDownCounter/Attributes/0-24             20.38n ± 1%   20.38n ± 0%        ~ (p=0.725 n=10)
SyncMeasure/DropView/Int64UpDownCounter/Attributes/1-24             26.39n ± 0%   26.44n ± 0%        ~ (p=0.238 n=10)
SyncMeasure/DropView/Int64UpDownCounter/Attributes/10-24            26.52n ± 0%   26.42n ± 0%   -0.36% (p=0.049 n=10)
SyncMeasure/DropView/Float64UpDownCounter/Attributes/0-24           20.30n ± 0%   20.25n ± 0%        ~ (p=0.196 n=10)
SyncMeasure/DropView/Float64UpDownCounter/Attributes/1-24           26.57n ± 0%   26.54n ± 1%        ~ (p=0.540 n=10)
SyncMeasure/DropView/Float64UpDownCounter/Attributes/10-24          26.57n ± 0%   26.51n ± 1%        ~ (p=0.643 n=10)
SyncMeasure/DropView/Int64Histogram/Attributes/0-24                 20.37n ± 0%   20.36n ± 1%        ~ (p=1.000 n=10)
SyncMeasure/DropView/Int64Histogram/Attributes/1-24                 26.41n ± 0%   26.50n ± 0%   +0.32% (p=0.007 n=10)
SyncMeasure/DropView/Int64Histogram/Attributes/10-24                26.44n ± 0%   26.55n ± 1%   +0.42% (p=0.012 n=10)
SyncMeasure/DropView/Float64Histogram/Attributes/0-24               20.30n ± 0%   20.45n ± 0%   +0.74% (p=0.000 n=10)
SyncMeasure/DropView/Float64Histogram/Attributes/1-24               26.52n ± 0%   26.48n ± 0%        ~ (p=0.127 n=10)
SyncMeasure/DropView/Float64Histogram/Attributes/10-24              26.55n ± 0%   26.48n ± 0%   -0.26% (p=0.002 n=10)
SyncMeasure/AttrFilterView/Int64Counter/Attributes/0-24             170.5n ± 2%   110.8n ± 0%  -35.03% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Int64Counter/Attributes/1-24             402.5n ± 1%   331.5n ± 1%  -17.64% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Int64Counter/Attributes/10-24            1.363µ ± 1%   1.281µ ± 1%   -6.02% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Float64Counter/Attributes/0-24           170.6n ± 1%   111.5n ± 1%  -34.64% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Float64Counter/Attributes/1-24           397.1n ± 1%   335.9n ± 0%  -15.41% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Float64Counter/Attributes/10-24          1.371µ ± 1%   1.279µ ± 1%   -6.71% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Int64UpDownCounter/Attributes/0-24       170.1n ± 1%   112.2n ± 0%  -34.09% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Int64UpDownCounter/Attributes/1-24       397.5n ± 1%   330.2n ± 0%  -16.93% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Int64UpDownCounter/Attributes/10-24      1.371µ ± 1%   1.289µ ± 1%   -5.95% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Float64UpDownCounter/Attributes/0-24     171.4n ± 2%   112.9n ± 0%  -34.13% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Float64UpDownCounter/Attributes/1-24     397.0n ± 3%   336.4n ± 0%  -15.24% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Float64UpDownCounter/Attributes/10-24    1.383µ ± 1%   1.305µ ± 1%   -5.61% (p=0.000 n=10)
SyncMeasure/AttrFilterView/Int64Histogram/Attributes/0-24          157.30n ± 2%   98.58n ± 1%  -37.33% (p=0.000 n=6+10)
```

### Changes

* Introduce `exemplar.Filter`, which is a filter function based on the
context. It will not be user-facing, so we can always add other
parameters later if needed.
* Introduce `exemplar.FilteredReservoir`, which is similar to a
reservoir, except it does not receive a timestamp. It gets the current
time after the filter decision has been made. It uses generics to avoid
the call to exemplar.NewValue(), since it is internal-only.
* The `exemplar.Reservoir` is left as-is, so that it can be made public
when exemplars are stable. It still includes a timestamp argument.
* Unit tests are updated to expect a much lower number of calls to
time.Now
* `exemplar.Drop` is now an `exemplar.FilteredReservoir` instead of a
`Reservoir`, since we don't need a Reservoir to store things in if the
measurement is always dropped.

Co-authored-by: Sam Xie <sam@samxie.me>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
This commit is contained in:
David Ashpole
2024-07-01 12:36:11 -04:00
committed by GitHub
parent 1cefb17de7
commit 0485de287e
18 changed files with 264 additions and 232 deletions

View File

@@ -30,6 +30,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Upgrade `go.opentelemetry.io/otel/semconv/v1.25.0` to `go.opentelemetry.io/otel/semconv/v1.26.0` in `go.opentelemetry.io/otel/sdk/resource`. (#5490)
- Upgrade `go.opentelemetry.io/otel/semconv/v1.25.0` to `go.opentelemetry.io/otel/semconv/v1.26.0` in `go.opentelemetry.io/otel/sdk/trace`. (#5490)
- Use non-generic functions in the `Start` method of `"go.opentelemetry.io/otel/sdk/trace".Trace` to reduce memory allocation. (#5497)
- Improve performance of metric instruments in `go.opentelemetry.io/otel/sdk/metric` by removing unnecessary calls to time.Now. (#5545)
### Fixed

View File

@@ -19,67 +19,63 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc(agg Aggregation) func() exemplar.Reservoir {
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
if !x.Exemplars.Enabled() {
return nil
}
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
resF := func() func() exemplar.Reservoir {
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.Reservoir {
bounds := cp
return exemplar.Histogram(bounds)
}
}
var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. number of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}
return func() exemplar.Reservoir {
return exemplar.FixedSize(n)
}
}
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"
var filter exemplar.Filter
switch os.Getenv(filterEnvKey) {
case "always_on":
return resF()
filter = exemplar.AlwaysOnFilter
case "always_off":
return exemplar.Drop
case "trace_based":
fallthrough
default:
newR := resF()
return func() exemplar.Reservoir {
return exemplar.SampledFilter(newR())
filter = exemplar.SampledFilter
}
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.FilteredReservoir[N] {
bounds := cp
return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds))
}
}
var n int
if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
// Base2 Exponential Histogram Aggregation SHOULD use a
// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
// smaller of the maximum number of buckets configured on the
// aggregation or twenty (e.g. min(20, max_buckets)).
n = int(a.MaxSize)
if n > 20 {
n = 20
}
} else {
// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
// This Exemplar reservoir MAY take a configuration parameter for
// the size of the reservoir. If no size configuration is
// provided, the default size MAY be the number of possible
// concurrent threads (e.g. number of CPUs) to help reduce
// contention. Otherwise, a default size of 1 SHOULD be used.
n = runtime.NumCPU()
if n < 1 {
// Should never be the case, but be defensive.
n = 1
}
}
return func() exemplar.FilteredReservoir[N] {
return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n))
}
}

View File

@@ -39,7 +39,7 @@ type Builder[N int64 | float64] struct {
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.Reservoir
ReservoirFunc func() exemplar.FilteredReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -50,7 +50,7 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}
func (b Builder[N]) resFunc() func() exemplar.Reservoir {
func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

View File

@@ -73,8 +73,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}
func dropExemplars[N int64 | float64]() exemplar.Reservoir {
return exemplar.Drop()
func dropExemplars[N int64 | float64]() exemplar.FilteredReservoir[N] {
return exemplar.Drop[N]()
}
func TestBuilderFilter(t *testing.T) {

View File

@@ -31,7 +31,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir
res exemplar.FilteredReservoir[N]
count uint64
min N
@@ -282,7 +282,7 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
@@ -305,7 +305,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int
newRes func() exemplar.Reservoir
newRes func() exemplar.FilteredReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
@@ -319,8 +319,6 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
return
}
t := now()
e.valuesMu.Lock()
defer e.valuesMu.Unlock()
@@ -333,7 +331,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib
e.values[attr.Equivalent()] = v
}
v.record(value)
v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
v.res.Offer(ctx, value, droppedAttr)
}
func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {

View File

@@ -778,7 +778,7 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(9),
Time: y2kPlus(2),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
@@ -832,8 +832,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(10),
Time: y2kPlus(24),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
@@ -850,8 +850,8 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
},
{
Attributes: overflowSet,
StartTime: y2kPlus(10),
Time: y2kPlus(24),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),
@@ -905,7 +905,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(9),
Time: y2kPlus(2),
Count: 7,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
@@ -938,7 +938,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(13),
Time: y2kPlus(3),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
@@ -967,7 +967,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(14),
Time: y2kPlus(4),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
@@ -1004,7 +1004,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(21),
Time: y2kPlus(5),
Count: 10,
Min: metricdata.NewExtrema[N](-1),
Max: metricdata.NewExtrema[N](16),
@@ -1022,7 +1022,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(21),
Time: y2kPlus(5),
Count: 6,
Min: metricdata.NewExtrema[N](1),
Max: metricdata.NewExtrema[N](16),

View File

@@ -17,7 +17,7 @@ import (
type buckets[N int64 | float64] struct {
attrs attribute.Set
res exemplar.Reservoir
res exemplar.FilteredReservoir[N]
counts []uint64
count uint64
@@ -48,13 +48,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64
newRes func() exemplar.Reservoir
newRes func() exemplar.FilteredReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.Reservoir) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
@@ -80,8 +80,6 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, float64(value))
t := now()
s.valuesMu.Lock()
defer s.valuesMu.Unlock()
@@ -106,12 +104,12 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
if !s.noSum {
b.sum(value)
}
b.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
b.res.Offer(ctx, value, droppedAttr)
}
// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.Reservoir) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,

View File

@@ -80,8 +80,8 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 3, y2kPlus(1), y2kPlus(7)),
c.hPt(fltrBob, 10, 2, y2kPlus(1), y2kPlus(7)),
c.hPt(fltrAlice, 2, 3, y2kPlus(1), y2kPlus(2)),
c.hPt(fltrBob, 10, 2, y2kPlus(1), y2kPlus(2)),
},
},
},
@@ -96,8 +96,8 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 10, 1, y2kPlus(7), y2kPlus(10)),
c.hPt(fltrBob, 3, 1, y2kPlus(7), y2kPlus(10)),
c.hPt(fltrAlice, 10, 1, y2kPlus(2), y2kPlus(3)),
c.hPt(fltrBob, 3, 1, y2kPlus(2), y2kPlus(3)),
},
},
},
@@ -126,9 +126,9 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.DeltaTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 1, 1, y2kPlus(11), y2kPlus(16)),
c.hPt(fltrBob, 1, 1, y2kPlus(11), y2kPlus(16)),
c.hPt(overflowSet, 1, 2, y2kPlus(11), y2kPlus(16)),
c.hPt(fltrAlice, 1, 1, y2kPlus(4), y2kPlus(5)),
c.hPt(fltrBob, 1, 1, y2kPlus(4), y2kPlus(5)),
c.hPt(overflowSet, 1, 2, y2kPlus(4), y2kPlus(5)),
},
},
},
@@ -167,8 +167,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 3, y2kPlus(0), y2kPlus(7)),
c.hPt(fltrBob, 10, 2, y2kPlus(0), y2kPlus(7)),
c.hPt(fltrAlice, 2, 3, y2kPlus(0), y2kPlus(2)),
c.hPt(fltrBob, 10, 2, y2kPlus(0), y2kPlus(2)),
},
},
},
@@ -183,8 +183,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(10)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(10)),
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(3)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(3)),
},
},
},
@@ -196,8 +196,8 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(11)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(11)),
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(4)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(4)),
},
},
},
@@ -213,9 +213,9 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
agg: metricdata.Histogram[N]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.HistogramDataPoint[N]{
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(14)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(14)),
c.hPt(overflowSet, 1, 2, y2kPlus(0), y2kPlus(14)),
c.hPt(fltrAlice, 2, 4, y2kPlus(0), y2kPlus(5)),
c.hPt(fltrBob, 10, 3, y2kPlus(0), y2kPlus(5)),
c.hPt(overflowSet, 1, 2, y2kPlus(0), y2kPlus(5)),
},
},
},

View File

@@ -17,10 +17,10 @@ import (
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
res exemplar.Reservoir
res exemplar.FilteredReservoir[N]
}
func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
@@ -33,15 +33,13 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *la
type lastValue[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.Reservoir
newRes func() exemplar.FilteredReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
}
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
t := now()
s.Lock()
defer s.Unlock()
@@ -53,7 +51,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.
d.attrs = attr
d.value = value
d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
d.res.Offer(ctx, value, droppedAttr)
s.values[attr.Equivalent()] = d
}
@@ -117,7 +115,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *precomputedLastValue[N] {
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}

View File

@@ -61,13 +61,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -10,
},
},
@@ -88,14 +88,14 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(8),
Time: y2kPlus(11),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(8),
Time: y2kPlus(11),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Value: 3,
},
},
@@ -115,20 +115,20 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
},
@@ -165,13 +165,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -10,
},
},
@@ -187,13 +187,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(8),
Time: y2kPlus(3),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(8),
Time: y2kPlus(3),
Value: -10,
},
},
@@ -211,13 +211,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(4),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(4),
Value: 3,
},
},
@@ -238,19 +238,19 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(16),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(16),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(16),
Time: y2kPlus(5),
Value: 1,
},
},
@@ -287,13 +287,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -10,
},
},
@@ -314,14 +314,14 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(8),
Time: y2kPlus(11),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(8),
Time: y2kPlus(11),
StartTime: y2kPlus(3),
Time: y2kPlus(4),
Value: 3,
},
},
@@ -341,20 +341,20 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
},
@@ -391,13 +391,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 2,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -10,
},
},
@@ -419,13 +419,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(4),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(4),
Value: 3,
},
},
@@ -446,19 +446,19 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(16),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(16),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(16),
Time: y2kPlus(5),
Value: 1,
},
},

View File

@@ -15,19 +15,19 @@ import (
type sumValue[N int64 | float64] struct {
n N
res exemplar.Reservoir
res exemplar.FilteredReservoir[N]
attrs attribute.Set
}
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.Reservoir
newRes func() exemplar.FilteredReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N]
}
func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir) *valueMap[N] {
func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
@@ -36,8 +36,6 @@ func newValueMap[N int64 | float64](limit int, r func() exemplar.Reservoir) *val
}
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
t := now()
s.Lock()
defer s.Unlock()
@@ -49,7 +47,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
v.attrs = attr
v.n += value
v.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr)
v.res.Offer(ctx, value, droppedAttr)
s.values[attr.Equivalent()] = v
}
@@ -57,7 +55,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir) *sum[N] {
func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@@ -146,7 +144,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
// newPrecomputedSum returns an aggregator that summarizes a set of
// observatrions as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.Reservoir) *precomputedSum[N] {
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,

View File

@@ -75,13 +75,13 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 4,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -11,
},
},
@@ -101,14 +101,14 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(7),
Time: y2kPlus(10),
StartTime: y2kPlus(2),
Time: y2kPlus(3),
Value: 10,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(7),
Time: y2kPlus(10),
StartTime: y2kPlus(2),
Time: y2kPlus(3),
Value: 3,
},
},
@@ -143,20 +143,20 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(11),
Time: y2kPlus(16),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 2,
},
},
@@ -203,13 +203,13 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 4,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -11,
},
},
@@ -230,13 +230,13 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(10),
Time: y2kPlus(3),
Value: 14,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(10),
Time: y2kPlus(3),
Value: -8,
},
},
@@ -258,19 +258,19 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(13),
Time: y2kPlus(4),
Value: 14,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(13),
Time: y2kPlus(4),
Value: -8,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(13),
Time: y2kPlus(4),
Value: 2,
},
},
@@ -317,13 +317,13 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 4,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(1),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -11,
},
},
@@ -344,14 +344,14 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(7),
Time: y2kPlus(11),
StartTime: y2kPlus(2),
Time: y2kPlus(3),
Value: 7,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(7),
Time: y2kPlus(11),
StartTime: y2kPlus(2),
Time: y2kPlus(3),
Value: 14,
},
},
@@ -386,20 +386,20 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) {
DataPoints: []metricdata.DataPoint[N]{
{
Attributes: fltrAlice,
StartTime: y2kPlus(12),
Time: y2kPlus(17),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(12),
Time: y2kPlus(17),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(12),
Time: y2kPlus(17),
StartTime: y2kPlus(4),
Time: y2kPlus(5),
Value: 2,
},
},
@@ -446,13 +446,13 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: 4,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(7),
Time: y2kPlus(2),
Value: -11,
},
},
@@ -474,13 +474,13 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(3),
Value: 11,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(11),
Time: y2kPlus(3),
Value: 3,
},
},
@@ -516,19 +516,19 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) {
{
Attributes: fltrAlice,
StartTime: y2kPlus(0),
Time: y2kPlus(17),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: fltrBob,
StartTime: y2kPlus(0),
Time: y2kPlus(17),
Time: y2kPlus(5),
Value: 1,
},
{
Attributes: overflowSet,
StartTime: y2kPlus(0),
Time: y2kPlus(17),
Time: y2kPlus(5),
Value: 2,
},
},

View File

@@ -5,20 +5,19 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exempla
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
)
// Drop returns a [Reservoir] that drops all measurements it is offered.
func Drop() Reservoir { return &dropRes{} }
// Drop returns a [FilteredReservoir] that drops all measurements it is offered.
func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} }
type dropRes struct{}
type dropRes[N int64 | float64] struct{}
// Offer does nothing, all measurements offered will be dropped.
func (r *dropRes) Offer(context.Context, time.Time, Value, []attribute.KeyValue) {}
func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
// Collect resets dest. No exemplars will ever be returned.
func (r *dropRes) Collect(dest *[]Exemplar) {
func (r *dropRes[N]) Collect(dest *[]Exemplar) {
*dest = (*dest)[:0]
}

View File

@@ -5,14 +5,20 @@ package exemplar
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDrop(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
return Drop(), 0
}))
t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
return Drop(), 0
}))
t.Run("Int64", testDropFiltered[int64])
t.Run("Float64", testDropFiltered[float64])
}
func testDropFiltered[N int64 | float64](t *testing.T) {
r := Drop[N]()
var dest []Exemplar
r.Collect(&dest)
assert.Len(t, dest, 0, "non-sampled context should not be offered")
}

View File

@@ -5,25 +5,25 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exempla
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// SampledFilter returns a [Reservoir] wrapping r that will only offer measurements
// to r if the passed context associated with the measurement contains a sampled
// Filter determines if a measurement should be offered.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
type Filter func(context.Context) bool
// SampledFilter is a [Filter] that will only offer measurements
// if the passed context associated with the measurement contains a sampled
// [go.opentelemetry.io/otel/trace.SpanContext].
func SampledFilter(r Reservoir) Reservoir {
return filtered{Reservoir: r}
func SampledFilter(ctx context.Context) bool {
return trace.SpanContextFromContext(ctx).IsSampled()
}
type filtered struct {
Reservoir
}
func (f filtered) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
if trace.SpanContextFromContext(ctx).IsSampled() {
f.Reservoir.Offer(ctx, t, n, a)
}
// AlwaysOnFilter is a [Filter] that always offers measurements.
func AlwaysOnFilter(ctx context.Context) bool {
return true
}

View File

@@ -6,11 +6,9 @@ package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exempla
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
@@ -20,18 +18,10 @@ func TestSampledFilter(t *testing.T) {
}
func testSampledFiltered[N int64 | float64](t *testing.T) {
under := &res{}
r := SampledFilter(under)
ctx := context.Background()
r.Offer(ctx, staticTime, NewValue(N(0)), nil)
assert.False(t, under.OfferCalled, "underlying Reservoir Offer called")
r.Offer(sample(ctx), staticTime, NewValue(N(0)), nil)
assert.True(t, under.OfferCalled, "underlying Reservoir Offer not called")
r.Collect(nil)
assert.True(t, under.CollectCalled, "underlying Reservoir Collect not called")
assert.False(t, SampledFilter(ctx), "non-sampled context should not be offered")
assert.True(t, SampledFilter(sample(ctx)), "sampled context should be offered")
}
func sample(parent context.Context) context.Context {
@@ -43,15 +33,14 @@ func sample(parent context.Context) context.Context {
return trace.ContextWithSpanContext(parent, sc)
}
type res struct {
OfferCalled bool
CollectCalled bool
func TestAlwaysOnFilter(t *testing.T) {
t.Run("Int64", testAlwaysOnFiltered[int64])
t.Run("Float64", testAlwaysOnFiltered[float64])
}
func (r *res) Offer(context.Context, time.Time, Value, []attribute.KeyValue) {
r.OfferCalled = true
}
func testAlwaysOnFiltered[N int64 | float64](t *testing.T) {
ctx := context.Background()
func (r *res) Collect(*[]Exemplar) {
r.CollectCalled = true
assert.True(t, AlwaysOnFilter(ctx), "non-sampled context should not be offered")
assert.True(t, AlwaysOnFilter(sample(ctx)), "sampled context should be offered")
}

View File

@@ -0,0 +1,49 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
)
// FilteredReservoir wraps a [Reservoir] with a filter.
type FilteredReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]Exemplar)
}
// filteredReservoir handles the pre-sampled exemplar of measurements made.
type filteredReservoir[N int64 | float64] struct {
filter Filter
reservoir Reservoir
}
// NewFilteredReservoir creates a [FilteredReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredReservoir[N int64 | float64](f Filter, r Reservoir) FilteredReservoir[N] {
return &filteredReservoir[N]{
filter: f,
reservoir: r,
}
}
func (f *filteredReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurment.
f.reservoir.Offer(ctx, time.Now(), NewValue(val), attr)
}
}
func (f *filteredReservoir[N]) Collect(dest *[]Exemplar) { f.reservoir.Collect(dest) }

View File

@@ -349,7 +349,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
ReservoirFunc: reservoirFunc(stream.Aggregation),
ReservoirFunc: reservoirFunc[N](stream.Aggregation),
}
b.Filter = stream.AttributeFilter
// A value less than or equal to zero will disable the aggregation