1
0
mirror of https://github.com/open-telemetry/opentelemetry-go.git synced 2025-11-25 22:41:46 +02:00

Replace Stream.AttributeFilter with AllowAttributeKeys (#4288)

* Replace Stream AttributeFilter with AttributeKeys

* Rename Stream field AttributeKeys to AllowAttributeKeys

Ensure forward compatibility if a deny-list of attribute keys is ever
added.

* Add change to changelog

* Update PR number in changelog

* Update CHANGELOG.md

Co-authored-by: Damien Mathieu <42@dmathieu.com>

---------

Co-authored-by: Damien Mathieu <42@dmathieu.com>
This commit is contained in:
Tyler Yahn
2023-07-08 08:01:47 -07:00
committed by GitHub
parent c404a30b96
commit 1633c74aea
7 changed files with 51 additions and 37 deletions

View File

@@ -25,6 +25,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Count the Collect time in the PeriodicReader timeout. (#4221) - Count the Collect time in the PeriodicReader timeout. (#4221)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272) - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272) - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- ⚠️ Metrics SDK Breaking ⚠️ : the `AttributeFilter` fields of the `Stream` from `go.opentelemetry.io/otel/sdk/metric` is replaced by the `AttributeKeys` field.
The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
### Fixed ### Fixed

View File

@@ -43,9 +43,7 @@ var viewBenchmarks = []struct {
"AttrFilterView", "AttrFilterView",
[]View{NewView( []View{NewView(
Instrument{Name: "*"}, Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool { Stream{AllowAttributeKeys: []attribute.Key{"K"}},
return kv.Key == attribute.Key("K")
}},
)}, )},
}, },
} }

View File

@@ -145,8 +145,31 @@ type Stream struct {
Unit string Unit string
// Aggregation the stream uses for an instrument. // Aggregation the stream uses for an instrument.
Aggregation aggregation.Aggregation Aggregation aggregation.Aggregation
// AttributeFilter applied to all attributes recorded for an instrument. // AllowAttributeKeys are an allow-list of attribute keys that will be
AttributeFilter attribute.Filter // preserved for the stream. Any attribute recorded for the stream with a
// key not in this slice will be dropped.
//
// If this slice is empty, all attributes will be kept.
AllowAttributeKeys []attribute.Key
}
// attributeFilter returns an attribute.Filter that only allows attributes
// with keys in s.AttributeKeys.
//
// If s.AttributeKeys is empty an accept-all filter is returned.
func (s Stream) attributeFilter() attribute.Filter {
if len(s.AllowAttributeKeys) <= 0 {
return func(kv attribute.KeyValue) bool { return true }
}
allowed := make(map[attribute.Key]struct{})
for _, k := range s.AllowAttributeKeys {
allowed[k] = struct{}{}
}
return func(kv attribute.KeyValue) bool {
_, ok := allowed[kv.Key]
return ok
}
} }
// streamID are the identifying properties of a stream. // streamID are the identifying properties of a stream.

View File

@@ -1516,9 +1516,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
WithReader(rdr), WithReader(rdr),
WithView(NewView( WithView(NewView(
Instrument{Name: "*"}, Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool { Stream{AllowAttributeKeys: []attribute.Key{"foo"}},
return kv.Key == attribute.Key("foo")
}},
)), )),
).Meter("TestAttributeFilter") ).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr)) require.NoError(t, tt.register(t, mtr))
@@ -1565,11 +1563,8 @@ func TestObservableExample(t *testing.T) {
selector := func(InstrumentKind) metricdata.Temporality { return temp } selector := func(InstrumentKind) metricdata.Temporality { return temp }
reader := NewManualReader(WithTemporalitySelector(selector)) reader := NewManualReader(WithTemporalitySelector(selector))
noopFilter := func(kv attribute.KeyValue) bool { return true } noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName})
noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: noopFilter}) filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AllowAttributeKeys: []attribute.Key{"pid"}})
filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" }
filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter})
mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered)) mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered))
meter := mp.Meter(scopeName) meter := mp.Meter(scopeName)

View File

@@ -332,8 +332,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
if agg == nil { // Drop aggregator. if agg == nil { // Drop aggregator.
return aggVal[N]{nil, nil} return aggVal[N]{nil, nil}
} }
if stream.AttributeFilter != nil { if len(stream.AllowAttributeKeys) > 0 {
agg = aggregate.NewFilter(agg, stream.AttributeFilter) agg = aggregate.NewFilter(agg, stream.attributeFilter())
} }
i.pipeline.addSync(scope, instrumentSync{ i.pipeline.addSync(scope, instrumentSync{

View File

@@ -103,11 +103,11 @@ func NewView(criteria Instrument, mask Stream) View {
return func(i Instrument) (Stream, bool) { return func(i Instrument) (Stream, bool) {
if matchFunc(i) { if matchFunc(i) {
return Stream{ return Stream{
Name: nonZero(mask.Name, i.Name), Name: nonZero(mask.Name, i.Name),
Description: nonZero(mask.Description, i.Description), Description: nonZero(mask.Description, i.Description),
Unit: nonZero(mask.Unit, i.Unit), Unit: nonZero(mask.Unit, i.Unit),
Aggregation: agg, Aggregation: agg,
AttributeFilter: mask.AttributeFilter, AllowAttributeKeys: mask.AllowAttributeKeys,
}, true }, true
} }
return Stream{}, false return Stream{}, false

View File

@@ -404,6 +404,18 @@ func TestNewViewReplace(t *testing.T) {
} }
}, },
}, },
{
name: "AttributeKeys",
mask: Stream{AllowAttributeKeys: []attribute.Key{"test"}},
want: func(i Instrument) Stream {
return Stream{
Name: i.Name,
Description: i.Description,
Unit: i.Unit,
AllowAttributeKeys: []attribute.Key{"test"},
}
},
},
{ {
name: "Complete", name: "Complete",
mask: Stream{ mask: Stream{
@@ -430,23 +442,6 @@ func TestNewViewReplace(t *testing.T) {
assert.Equal(t, test.want(completeIP), got) assert.Equal(t, test.want(completeIP), got)
}) })
} }
// Go does not allow for the comparison of function values, even their
// addresses. Therefore, the AttributeFilter field needs an alternative
// testing strategy.
t.Run("AttributeFilter", func(t *testing.T) {
allowed := attribute.String("key", "val")
filter := func(kv attribute.KeyValue) bool {
return kv == allowed
}
mask := Stream{AttributeFilter: filter}
got, match := NewView(completeIP, mask)(completeIP)
require.True(t, match, "view did not match exact criteria")
require.NotNil(t, got.AttributeFilter, "AttributeFilter not set")
assert.True(t, got.AttributeFilter(allowed), "wrong AttributeFilter")
other := attribute.String("key", "other val")
assert.False(t, got.AttributeFilter(other), "wrong AttributeFilter")
})
} }
type badAgg struct { type badAgg struct {