You've already forked opentelemetry-go
mirror of
https://github.com/open-telemetry/opentelemetry-go.git
synced 2025-11-25 22:41:46 +02:00
Adds attribute filter logic (#3396)
* Adds attribute filter logic * Apply PR feedback * Use updated MP options * Update Changelog and TODO numbers Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
This commit is contained in:
@@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
|
||||
- Cumulative metrics from the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) are defined as monotonic sums, instead of non-monotonic. (#3389)
|
||||
- Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398)
|
||||
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
|
||||
- Reenabled Attribute Filters in the Metric SDK. (#3396)
|
||||
- Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432)
|
||||
|
||||
## [1.11.1/0.33.0] 2022-10-19
|
||||
|
||||
@@ -28,6 +28,7 @@ import (
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
"go.opentelemetry.io/otel/sdk/metric/view"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
@@ -474,3 +475,356 @@ func TestMetersProvideScope(t *testing.T) {
|
||||
assert.NoError(t, err)
|
||||
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
|
||||
}
|
||||
|
||||
func TestAttributeFilter(t *testing.T) {
|
||||
one := 1.0
|
||||
two := 2.0
|
||||
testcases := []struct {
|
||||
name string
|
||||
register func(t *testing.T, mtr metric.Meter) error
|
||||
wantMetric metricdata.Metrics
|
||||
}{
|
||||
{
|
||||
name: "AsyncFloat64Counter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.AsyncFloat64().Counter("afcounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
})
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "afcounter",
|
||||
Data: metricdata.Sum[float64]{
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 2.0, // TODO (#3439): This should be 3.0.
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AsyncFloat64UpDownCounter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.AsyncFloat64().UpDownCounter("afupdowncounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
})
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "afupdowncounter",
|
||||
Data: metricdata.Sum[float64]{
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 2.0, // TODO (#3439): This should be 3.0.
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AsyncFloat64Gauge",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.AsyncFloat64().Gauge("afgauge")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
})
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "afgauge",
|
||||
Data: metricdata.Gauge[float64]{
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 2.0,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AsyncInt64Counter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.AsyncInt64().Counter("aicounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
})
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "aicounter",
|
||||
Data: metricdata.Sum[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 20, // TODO (#3439): This should be 30.
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AsyncInt64UpDownCounter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.AsyncInt64().UpDownCounter("aiupdowncounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
})
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "aiupdowncounter",
|
||||
Data: metricdata.Sum[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 20, // TODO (#3439): This should be 30.
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "AsyncInt64Gauge",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.AsyncInt64().Gauge("aigauge")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
|
||||
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
})
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "aigauge",
|
||||
Data: metricdata.Gauge[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 20,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SyncFloat64Counter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.SyncFloat64().Counter("sfcounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr.Add(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Add(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
return nil
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "sfcounter",
|
||||
Data: metricdata.Sum[float64]{
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 3.0,
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SyncFloat64UpDownCounter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.SyncFloat64().UpDownCounter("sfupdowncounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr.Add(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Add(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
return nil
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "sfupdowncounter",
|
||||
Data: metricdata.Sum[float64]{
|
||||
DataPoints: []metricdata.DataPoint[float64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 3.0,
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SyncFloat64Histogram",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.SyncFloat64().Histogram("sfhistogram")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr.Record(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Record(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
return nil
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "sfhistogram",
|
||||
Data: metricdata.Histogram{
|
||||
DataPoints: []metricdata.HistogramDataPoint{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
|
||||
BucketCounts: []uint64{0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
Count: 2,
|
||||
Min: &one,
|
||||
Max: &two,
|
||||
Sum: 3.0,
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SyncInt64Counter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.SyncInt64().Counter("sicounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr.Add(context.Background(), 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Add(context.Background(), 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
return nil
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "sicounter",
|
||||
Data: metricdata.Sum[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 30,
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SyncInt64UpDownCounter",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.SyncInt64().UpDownCounter("siupdowncounter")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr.Add(context.Background(), 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Add(context.Background(), 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
return nil
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "siupdowncounter",
|
||||
Data: metricdata.Sum[int64]{
|
||||
DataPoints: []metricdata.DataPoint[int64]{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Value: 30,
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
IsMonotonic: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "SyncInt64Histogram",
|
||||
register: func(t *testing.T, mtr metric.Meter) error {
|
||||
ctr, err := mtr.SyncInt64().Histogram("sihistogram")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctr.Record(context.Background(), 1, attribute.String("foo", "bar"), attribute.Int("version", 1))
|
||||
ctr.Record(context.Background(), 2, attribute.String("foo", "bar"), attribute.Int("version", 2))
|
||||
return nil
|
||||
},
|
||||
wantMetric: metricdata.Metrics{
|
||||
Name: "sihistogram",
|
||||
Data: metricdata.Histogram{
|
||||
DataPoints: []metricdata.HistogramDataPoint{
|
||||
{
|
||||
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
|
||||
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
|
||||
BucketCounts: []uint64{0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
|
||||
Count: 2,
|
||||
Min: &one,
|
||||
Max: &two,
|
||||
Sum: 3.0,
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range testcases {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
v, err := view.New(
|
||||
view.MatchInstrumentName("*"),
|
||||
view.WithFilterAttributes(attribute.Key("foo")),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
rdr := NewManualReader()
|
||||
mtr := NewMeterProvider(
|
||||
WithReader(rdr),
|
||||
WithView(v),
|
||||
).Meter("TestAttributeFilter")
|
||||
|
||||
err = tt.register(t, mtr)
|
||||
require.NoError(t, err)
|
||||
|
||||
m, err := rdr.Collect(context.Background())
|
||||
assert.NoError(t, err)
|
||||
|
||||
require.Len(t, m.ScopeMetrics, 1)
|
||||
require.Len(t, m.ScopeMetrics[0].Metrics, 1)
|
||||
|
||||
metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
@@ -203,7 +204,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
|
||||
}
|
||||
matched = true
|
||||
|
||||
agg, err := i.cachedAggregator(inst, instUnit)
|
||||
agg, err := i.cachedAggregator(inst, instUnit, v.AttributeFilter())
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
@@ -223,7 +224,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
|
||||
}
|
||||
|
||||
// Apply implicit default view if no explicit matched.
|
||||
agg, err := i.cachedAggregator(inst, instUnit)
|
||||
agg, err := i.cachedAggregator(inst, instUnit, nil)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
@@ -247,7 +248,7 @@ func (i *inserter[N]) Instrument(inst view.Instrument, instUnit unit.Unit) ([]in
|
||||
//
|
||||
// If the instrument defines an unknown or incompatible aggregation, an error
|
||||
// is returned.
|
||||
func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (internal.Aggregator[N], error) {
|
||||
func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit, filter func(attribute.Set) attribute.Set) (internal.Aggregator[N], error) {
|
||||
switch inst.Aggregation.(type) {
|
||||
case nil, aggregation.Default:
|
||||
// Undefined, nil, means to use the default from the reader.
|
||||
@@ -273,6 +274,10 @@ func (i *inserter[N]) cachedAggregator(inst view.Instrument, u unit.Unit) (inter
|
||||
if agg == nil { // Drop aggregator.
|
||||
return nil, nil
|
||||
}
|
||||
if filter != nil {
|
||||
agg = internal.NewFilter(agg, filter)
|
||||
}
|
||||
|
||||
i.pipeline.addSync(inst.Scope, instrumentSync{
|
||||
name: inst.Name,
|
||||
description: inst.Description,
|
||||
|
||||
Reference in New Issue
Block a user