1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-06 09:09:44 +02:00

Replace filter aggregator with direct filter on measure (#4342)

* Replace filter aggregator with direct filter on measure

Part of #4220

Instead of using an aggregator to filter measured attributes, directly
filter the attributes in the constructed Measure function the Builder
creates.

Include unit and integration testing of new filtering.

* Update sdk/metric/internal/aggregate/aggregate.go

Co-authored-by: David Ashpole <dashpole@google.com>

---------

Co-authored-by: David Ashpole <dashpole@google.com>
This commit is contained in:
Tyler Yahn
2023-07-20 13:53:50 -07:00
committed by GitHub
parent a5c82bf26e
commit a37cb0504a
5 changed files with 82 additions and 258 deletions

View File

@@ -44,7 +44,11 @@ type Builder[N int64 | float64] struct {
func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
if b.Filter != nil {
agg = newFilter[N](agg, b.Filter)
fltr := b.Filter // Copy to make it immutable after assignment.
return func(_ context.Context, n N, a attribute.Set) {
fAttr, _ := a.Filter(fltr)
agg.Aggregate(n, fAttr)
}
}
return func(_ context.Context, n N, a attribute.Set) {
agg.Aggregate(n, a)

View File

@@ -0,0 +1,75 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/attribute"
)
var (
keyUser = "user"
userAlice = attribute.String(keyUser, "Alice")
adminTrue = attribute.Bool("admin", true)
alice = attribute.NewSet(userAlice, adminTrue)
// Filtered.
attrFltr = func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key(keyUser)
}
fltrAlice = attribute.NewSet(userAlice)
)
type inputTester[N int64 | float64] struct {
aggregator[N]
value N
attr attribute.Set
}
func (it *inputTester[N]) Aggregate(v N, a attribute.Set) { it.value, it.attr = v, a }
func TestBuilderInput(t *testing.T) {
t.Run("Int64", testBuilderInput[int64]())
t.Run("Float64", testBuilderInput[float64]())
}
func testBuilderInput[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
value, attr := N(1), alice
run := func(b Builder[N], wantA attribute.Set) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
it := &inputTester[N]{}
meas := b.input(it)
meas(context.Background(), value, attr)
assert.Equal(t, value, it.value, "measured incorrect value")
assert.Equal(t, wantA, it.attr, "measured incorrect attributes")
}
}
t.Run("NoFilter", run(Builder[N]{}, attr))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice))
}
}

View File

@@ -34,9 +34,8 @@ const (
)
var (
alice = attribute.NewSet(attribute.String("user", "alice"), attribute.Bool("admin", true))
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))
bob = attribute.NewSet(attribute.String(keyUser, "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String(keyUser, "carol"), attribute.Bool("admin", false))
// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)

View File

@@ -1,58 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// newFilter returns an Aggregator that wraps an agg with an attribute
// filtering function. Both pre-computed non-pre-computed Aggregators can be
// passed for agg. An appropriate Aggregator will be returned for the detected
// type.
func newFilter[N int64 | float64](agg aggregator[N], fn attribute.Filter) aggregator[N] {
if fn == nil {
return agg
}
return &filter[N]{
filter: fn,
aggregator: agg,
}
}
// filter wraps an aggregator with an attribute filter. All recorded
// measurements will have their attributes filtered before they are passed to
// the underlying aggregator's Aggregate method.
//
// This should not be used to wrap a pre-computed Aggregator. Use a
// precomputedFilter instead.
type filter[N int64 | float64] struct {
filter attribute.Filter
aggregator aggregator[N]
}
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) {
fAttr, _ := attr.Filter(f.filter)
f.aggregator.Aggregate(measurement, fAttr)
}
// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (f *filter[N]) Aggregation() metricdata.Aggregation {
return f.aggregator.Aggregation()
}

View File

@@ -1,196 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// This is an aggregator that has a stable output, used for testing. It does not
// follow any spec prescribed aggregation.
type testStableAggregator[N int64 | float64] struct {
sync.Mutex
values []metricdata.DataPoint[N]
}
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
func (a *testStableAggregator[N]) Aggregate(measurement N, attr attribute.Set) {
a.Lock()
defer a.Unlock()
a.values = append(a.values, metricdata.DataPoint[N]{
Attributes: attr,
Value: measurement,
})
}
// Aggregation returns an Aggregation, for all the aggregated
// measurements made and ends an aggregation cycle.
func (a *testStableAggregator[N]) Aggregation() metricdata.Aggregation {
return metricdata.Gauge[N]{
DataPoints: a.values,
}
}
func testNewFilterNoFilter[N int64 | float64](t *testing.T, agg aggregator[N]) {
filter := newFilter(agg, nil)
assert.Equal(t, agg, filter)
}
func testNewFilter[N int64 | float64](t *testing.T, agg aggregator[N]) {
f := newFilter(agg, testAttributeFilter)
require.IsType(t, &filter[N]{}, f)
filt := f.(*filter[N])
assert.Equal(t, agg, filt.aggregator)
}
var testAttributeFilter = func(kv attribute.KeyValue) bool {
return kv.Key == "power-level"
}
func TestNewFilter(t *testing.T) {
t.Run("int64", func(t *testing.T) {
agg := &testStableAggregator[int64]{}
testNewFilterNoFilter[int64](t, agg)
testNewFilter[int64](t, agg)
})
t.Run("float64", func(t *testing.T) {
agg := &testStableAggregator[float64]{}
testNewFilterNoFilter[float64](t, agg)
testNewFilter[float64](t, agg)
})
}
func testDataPoint[N int64 | float64](attr attribute.Set) metricdata.DataPoint[N] {
return metricdata.DataPoint[N]{
Attributes: attr,
Value: 1,
}
}
func testFilterAggregate[N int64 | float64](t *testing.T) {
testCases := []struct {
name string
inputAttr []attribute.Set
output []metricdata.DataPoint[N]
}{
{
name: "Will filter all out",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Float64("lifeUniverseEverything", 42.0),
),
},
output: []metricdata.DataPoint[N]{
testDataPoint[N](*attribute.EmptySet()),
},
},
{
name: "Will keep appropriate attributes",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Int("power-level", 9001),
attribute.Float64("lifeUniverseEverything", 42.0),
),
attribute.NewSet(
attribute.String("foo", "bar"),
attribute.Int("power-level", 9001),
),
},
output: []metricdata.DataPoint[N]{
// A real Aggregator will combine these, the testAggregator doesn't for list stability.
testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))),
testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))),
},
},
{
name: "Will combine Aggregations",
inputAttr: []attribute.Set{
attribute.NewSet(
attribute.String("foo", "bar"),
),
attribute.NewSet(
attribute.Float64("lifeUniverseEverything", 42.0),
),
},
output: []metricdata.DataPoint[N]{
// A real Aggregator will combine these, the testAggregator doesn't for list stability.
testDataPoint[N](*attribute.EmptySet()),
testDataPoint[N](*attribute.EmptySet()),
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
f := newFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
for _, set := range tt.inputAttr {
f.Aggregate(1, set)
}
out := f.Aggregation().(metricdata.Gauge[N])
assert.Equal(t, tt.output, out.DataPoints)
})
}
}
func TestFilterAggregate(t *testing.T) {
t.Run("int64", func(t *testing.T) {
testFilterAggregate[int64](t)
})
t.Run("float64", func(t *testing.T) {
testFilterAggregate[float64](t)
})
}
func testFilterConcurrent[N int64 | float64](t *testing.T) {
f := newFilter[N](&testStableAggregator[N]{}, testAttributeFilter)
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
f.Aggregate(1, attribute.NewSet(
attribute.String("foo", "bar"),
))
wg.Done()
}()
go func() {
f.Aggregate(1, attribute.NewSet(
attribute.Int("power-level", 9001),
))
wg.Done()
}()
wg.Wait()
}
func TestFilterConcurrent(t *testing.T) {
t.Run("int64", func(t *testing.T) {
testFilterConcurrent[int64](t)
})
t.Run("float64", func(t *testing.T) {
testFilterConcurrent[float64](t)
})
}