Mirror of https://github.com/open-telemetry/opentelemetry-go.git (synced 2025-04-11 11:21:59 +02:00)

Replace use of old term label with attribute (#2790)

* Replace use of old term label with attribute

  The specification has unified on the term attribute to describe key-value
  pairs. Many hold-overs of the term label still exist. This updates those
  uses, or deprecates the exported types and functions in favor of renamed
  forms.

* Fix infinite recursion
* Remove backticks from attribute set docs
* Remove LabelFilterSelector entirely
* Remove Metadata.Labels instead of deprecating it
* Update changelog with public changes
* Revert OC err msg

Parent: 1884de2b4b
Commit: a8ea3dbb46

CHANGELOG.md (18 lines changed)
@@ -14,6 +14,24 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
 - Resolve supply-chain failure for the markdown-link-checker GitHub action by calling the CLI directly. (#2834)
 - Remove import of `testing` package in non-tests builds. (#2786)
 
+### Changed
+
+- The `WithLabelEncoder` option from the `go.opentelemetry.io/otel/exporters/stdout/stdoutmetric` package is renamed to `WithAttributeEncoder`. (#2790)
+- The `Batch.Labels` field from the `go.opentelemetry.io/otel/sdk/metric/metrictest` package is renamed to `Batch.Attributes`. (#2790)
+- The `LabelFilterSelector` interface from `go.opentelemetry.io/otel/sdk/metric/processor/reducer` is renamed to `AttributeFilterSelector`.
+  The method included in the renamed interface also changed from `LabelFilterFor` to `AttributeFilterFor`. (#2790)
+- The `Metadata.Labels` method from the `go.opentelemetry.io/otel/sdk/metric/export` package is renamed to `Metadata.Attributes`.
+  Consequently, the `Record` type from the same package also has had the embedded method renamed. (#2790)
+
+### Deprecated
+
+- The `Iterator.Label` method in the `go.opentelemetry.io/otel/attribute` package is deprecated.
+  Use the equivalent `Iterator.Attribute` method instead. (#2790)
+- The `Iterator.IndexedLabel` method in the `go.opentelemetry.io/otel/attribute` package is deprecated.
+  Use the equivalent `Iterator.IndexedAttribute` method instead. (#2790)
+- The `MergeIterator.Label` method in the `go.opentelemetry.io/otel/attribute` package is deprecated.
+  Use the equivalent `MergeIterator.Attribute` method instead. (#2790)
+
 ## [0.29.0] - 2022-04-11
 
 ### Added
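Migration for the deprecations above is mechanical, since each deprecated method forwards to its renamed equivalent. A minimal sketch (not part of this commit, using only the `go.opentelemetry.io/otel/attribute` API shown in this diff):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	set := attribute.NewSet(attribute.String("key", "value"))
	iter := set.Iter()
	for iter.Next() {
		// Before this release: kv := iter.Label()
		kv := iter.Attribute() // the renamed, non-deprecated accessor
		fmt.Println(kv.Key, kv.Value.Emit())
	}
}
```

The same substitution applies to `Iterator.IndexedLabel` -> `Iterator.IndexedAttribute` and `MergeIterator.Label` -> `MergeIterator.Attribute`.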
attribute/encoder.go

@@ -21,19 +21,17 @@ import (
 )
 
 type (
-	// Encoder is a mechanism for serializing a label set into a
-	// specific string representation that supports caching, to
-	// avoid repeated serialization. An example could be an
-	// exporter encoding the label set into a wire representation.
+	// Encoder is a mechanism for serializing an attribute set into a specific
+	// string representation that supports caching, to avoid repeated
+	// serialization. An example could be an exporter encoding the attribute
+	// set into a wire representation.
 	Encoder interface {
-		// Encode returns the serialized encoding of the label
-		// set using its Iterator. This result may be cached
-		// by a attribute.Set.
+		// Encode returns the serialized encoding of the attribute set using
+		// its Iterator. This result may be cached by an attribute.Set.
 		Encode(iterator Iterator) string
 
-		// ID returns a value that is unique for each class of
-		// label encoder. Label encoders allocate these using
-		// `NewEncoderID`.
+		// ID returns a value that is unique for each class of attribute
+		// encoder. Attribute encoders allocate these using `NewEncoderID`.
 		ID() EncoderID
 	}
 
@@ -43,54 +41,53 @@ type (
 		value uint64
 	}
 
-	// defaultLabelEncoder uses a sync.Pool of buffers to reduce
-	// the number of allocations used in encoding labels. This
-	// implementation encodes a comma-separated list of key=value,
-	// with '/'-escaping of '=', ',', and '\'.
-	defaultLabelEncoder struct {
-		// pool is a pool of labelset builders. The buffers in this
-		// pool grow to a size that most label encodings will not
-		// allocate new memory.
+	// defaultAttrEncoder uses a sync.Pool of buffers to reduce the number of
+	// allocations used in encoding attributes. This implementation encodes a
+	// comma-separated list of key=value, with '/'-escaping of '=', ',', and
+	// '\'.
+	defaultAttrEncoder struct {
+		// pool is a pool of attribute set builders. The buffers in this pool
+		// grow to a size that most attribute encodings will not allocate new
+		// memory.
 		pool sync.Pool // *bytes.Buffer
 	}
 )
 
-// escapeChar is used to ensure uniqueness of the label encoding where
-// keys or values contain either '=' or ','. Since there is no parser
-// needed for this encoding and its only requirement is to be unique,
-// this choice is arbitrary. Users will see these in some exporters
-// (e.g., stdout), so the backslash ('\') is used as a conventional choice.
+// escapeChar is used to ensure uniqueness of the attribute encoding where
+// keys or values contain either '=' or ','. Since there is no parser needed
+// for this encoding and its only requirement is to be unique, this choice is
+// arbitrary. Users will see these in some exporters (e.g., stdout), so the
+// backslash ('\') is used as a conventional choice.
 const escapeChar = '\\'
 
 var (
-	_ Encoder = &defaultLabelEncoder{}
+	_ Encoder = &defaultAttrEncoder{}
 
-	// encoderIDCounter is for generating IDs for other label
-	// encoders.
+	// encoderIDCounter is for generating IDs for other attribute encoders.
 	encoderIDCounter uint64
 
 	defaultEncoderOnce     sync.Once
 	defaultEncoderID       = NewEncoderID()
-	defaultEncoderInstance *defaultLabelEncoder
+	defaultEncoderInstance *defaultAttrEncoder
 )
 
-// NewEncoderID returns a unique label encoder ID. It should be
-// called once per each type of label encoder. Preferably in init() or
-// in var definition.
+// NewEncoderID returns a unique attribute encoder ID. It should be called
+// once per each type of attribute encoder. Preferably in init() or in var
+// definition.
 func NewEncoderID() EncoderID {
	return EncoderID{value: atomic.AddUint64(&encoderIDCounter, 1)}
 }
 
-// DefaultEncoder returns a label encoder that encodes labels
-// in such a way that each escaped label's key is followed by an equal
-// sign and then by an escaped label's value. All key-value pairs are
-// separated by a comma.
+// DefaultEncoder returns an attribute encoder that encodes attributes in such
+// a way that each escaped attribute's key is followed by an equal sign and
+// then by an escaped attribute's value. All key-value pairs are separated by
+// a comma.
 //
-// Escaping is done by prepending a backslash before either a
-// backslash, equal sign or a comma.
+// Escaping is done by prepending a backslash before either a backslash, equal
+// sign or a comma.
 func DefaultEncoder() Encoder {
 	defaultEncoderOnce.Do(func() {
-		defaultEncoderInstance = &defaultLabelEncoder{
+		defaultEncoderInstance = &defaultAttrEncoder{
 			pool: sync.Pool{
 				New: func() interface{} {
 					return &bytes.Buffer{}
@@ -101,15 +98,14 @@ func DefaultEncoder() Encoder {
 	return defaultEncoderInstance
 }
 
-// Encode is a part of an implementation of the LabelEncoder
-// interface.
-func (d *defaultLabelEncoder) Encode(iter Iterator) string {
+// Encode is a part of an implementation of the AttributeEncoder interface.
+func (d *defaultAttrEncoder) Encode(iter Iterator) string {
 	buf := d.pool.Get().(*bytes.Buffer)
 	defer d.pool.Put(buf)
 	buf.Reset()
 
 	for iter.Next() {
-		i, keyValue := iter.IndexedLabel()
+		i, keyValue := iter.IndexedAttribute()
 		if i > 0 {
 			_, _ = buf.WriteRune(',')
 		}
@@ -126,8 +122,8 @@ func (d *defaultLabelEncoder) Encode(iter Iterator) string {
 	return buf.String()
 }
 
-// ID is a part of an implementation of the LabelEncoder interface.
-func (*defaultLabelEncoder) ID() EncoderID {
+// ID is a part of an implementation of the AttributeEncoder interface.
+func (*defaultAttrEncoder) ID() EncoderID {
 	return defaultEncoderID
 }
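The encoder above joins escaped key=value pairs with commas, in key-sorted order. A small usage sketch (not from the commit; the printed form in the comment is illustrative):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	set := attribute.NewSet(
		attribute.String("user", "alice"),
		attribute.String("note", "a=b,c"), // '=' and ',' get backslash-escaped
	)
	// Expected shape: note=a\=b\,c,user=alice (keys sorted, pairs comma-joined).
	fmt.Println(set.Encoded(attribute.DefaultEncoder()))
}
```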
attribute/iterator.go

@@ -14,16 +14,16 @@
 
 package attribute // import "go.opentelemetry.io/otel/attribute"
 
-// Iterator allows iterating over the set of labels in order,
-// sorted by key.
+// Iterator allows iterating over the set of attributes in order, sorted by
+// key.
 type Iterator struct {
 	storage *Set
 	idx     int
 }
 
-// MergeIterator supports iterating over two sets of labels while
-// eliminating duplicate values from the combined set. The first
-// iterator value takes precedence.
+// MergeIterator supports iterating over two sets of attributes while
+// eliminating duplicate values from the combined set. The first iterator
+// value takes precedence.
 type MergeIterator struct {
 	one oneIterator
 	two oneIterator
@@ -31,13 +31,13 @@ type MergeIterator struct {
 }
 
 type oneIterator struct {
-	iter  Iterator
-	done  bool
-	label KeyValue
+	iter Iterator
+	done bool
+	attr KeyValue
 }
 
-// Next moves the iterator to the next position. Returns false if there
-// are no more labels.
+// Next moves the iterator to the next position. Returns false if there are no
+// more attributes.
 func (i *Iterator) Next() bool {
 	i.idx++
 	return i.idx < i.Len()
@@ -45,30 +45,41 @@ func (i *Iterator) Next() bool {
 
 // Label returns current KeyValue. Must be called only after Next returns
 // true.
+//
+// Deprecated: Use Attribute instead.
 func (i *Iterator) Label() KeyValue {
+	return i.Attribute()
+}
+
+// Attribute returns the current KeyValue of the Iterator. It must be called
+// only after Next returns true.
+func (i *Iterator) Attribute() KeyValue {
 	kv, _ := i.storage.Get(i.idx)
 	return kv
 }
 
-// Attribute is a synonym for Label().
-func (i *Iterator) Attribute() KeyValue {
-	return i.Label()
-}
-
 // IndexedLabel returns current index and attribute. Must be called only
 // after Next returns true.
+//
+// Deprecated: Use IndexedAttribute instead.
 func (i *Iterator) IndexedLabel() (int, KeyValue) {
-	return i.idx, i.Label()
+	return i.idx, i.Attribute()
+}
+
+// IndexedAttribute returns current index and attribute. Must be called only
+// after Next returns true.
+func (i *Iterator) IndexedAttribute() (int, KeyValue) {
+	return i.idx, i.Attribute()
 }
 
-// Len returns a number of labels in the iterator's `*Set`.
+// Len returns a number of attributes in the iterated set.
 func (i *Iterator) Len() int {
 	return i.storage.Len()
 }
 
-// ToSlice is a convenience function that creates a slice of labels
-// from the passed iterator. The iterator is set up to start from the
-// beginning before creating the slice.
+// ToSlice is a convenience function that creates a slice of attributes from
+// the passed iterator. The iterator is set up to start from the beginning
+// before creating the slice.
 func (i *Iterator) ToSlice() []KeyValue {
 	l := i.Len()
 	if l == 0 {
@@ -77,12 +88,12 @@ func (i *Iterator) ToSlice() []KeyValue {
 	i.idx = -1
 	slice := make([]KeyValue, 0, l)
 	for i.Next() {
-		slice = append(slice, i.Label())
+		slice = append(slice, i.Attribute())
 	}
 	return slice
 }
 
-// NewMergeIterator returns a MergeIterator for merging two label sets
+// NewMergeIterator returns a MergeIterator for merging two attribute sets.
 // Duplicates are resolved by taking the value from the first set.
 func NewMergeIterator(s1, s2 *Set) MergeIterator {
 	mi := MergeIterator{
@@ -102,42 +113,49 @@ func makeOne(iter Iterator) oneIterator {
 
 func (oi *oneIterator) advance() {
 	if oi.done = !oi.iter.Next(); !oi.done {
-		oi.label = oi.iter.Label()
+		oi.attr = oi.iter.Attribute()
 	}
 }
 
-// Next returns true if there is another label available.
+// Next returns true if there is another attribute available.
 func (m *MergeIterator) Next() bool {
 	if m.one.done && m.two.done {
 		return false
 	}
 	if m.one.done {
-		m.current = m.two.label
+		m.current = m.two.attr
 		m.two.advance()
 		return true
 	}
 	if m.two.done {
-		m.current = m.one.label
+		m.current = m.one.attr
 		m.one.advance()
 		return true
 	}
-	if m.one.label.Key == m.two.label.Key {
-		m.current = m.one.label // first iterator label value wins
+	if m.one.attr.Key == m.two.attr.Key {
+		m.current = m.one.attr // first iterator attribute value wins
 		m.one.advance()
 		m.two.advance()
 		return true
 	}
-	if m.one.label.Key < m.two.label.Key {
-		m.current = m.one.label
+	if m.one.attr.Key < m.two.attr.Key {
+		m.current = m.one.attr
 		m.one.advance()
 		return true
 	}
-	m.current = m.two.label
+	m.current = m.two.attr
 	m.two.advance()
 	return true
 }
 
 // Label returns the current value after Next() returns true.
+//
+// Deprecated: Use Attribute instead.
 func (m *MergeIterator) Label() KeyValue {
 	return m.current
 }
+
+// Attribute returns the current value after Next() returns true.
+func (m *MergeIterator) Attribute() KeyValue {
+	return m.current
+}
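The merge semantics above resolve duplicate keys in favor of the first set. A brief sketch (not from the commit):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	s1 := attribute.NewSet(attribute.Int("lemons", 1))
	s2 := attribute.NewSet(attribute.Int("lemons", 2), attribute.String("kind", "citrus"))
	merge := attribute.NewMergeIterator(&s1, &s2)
	for merge.Next() {
		kv := merge.Attribute() // replaces the now-deprecated merge.Label()
		fmt.Println(kv.Key, "=", kv.Value.Emit())
	}
	// Output (sorted by key; the duplicate "lemons" resolves to the first set):
	// kind = citrus
	// lemons = 1
}
```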
attribute/iterator_test.go

@@ -31,15 +31,15 @@ func TestIterator(t *testing.T) {
 	require.Equal(t, 2, iter.Len())
 
 	require.True(t, iter.Next())
-	require.Equal(t, one, iter.Label())
-	idx, attr := iter.IndexedLabel()
+	require.Equal(t, one, iter.Attribute())
+	idx, attr := iter.IndexedAttribute()
 	require.Equal(t, 0, idx)
 	require.Equal(t, one, attr)
 	require.Equal(t, 2, iter.Len())
 
 	require.True(t, iter.Next())
-	require.Equal(t, two, iter.Label())
-	idx, attr = iter.IndexedLabel()
+	require.Equal(t, two, iter.Attribute())
+	idx, attr = iter.IndexedAttribute()
 	require.Equal(t, 1, idx)
 	require.Equal(t, two, attr)
 	require.Equal(t, 2, iter.Len())
@@ -64,7 +64,7 @@ func TestMergedIterator(t *testing.T) {
 		expect []string
 	}
 
-	makeLabels := func(keys []string, num int) (result []attribute.KeyValue) {
+	makeAttributes := func(keys []string, num int) (result []attribute.KeyValue) {
 		for _, k := range keys {
 			result = append(result, attribute.Int(k, num))
 		}
@@ -128,19 +128,19 @@ func TestMergedIterator(t *testing.T) {
 		},
 	} {
 		t.Run(input.name, func(t *testing.T) {
-			labels1 := makeLabels(input.keys1, 1)
-			labels2 := makeLabels(input.keys2, 2)
+			attr1 := makeAttributes(input.keys1, 1)
+			attr2 := makeAttributes(input.keys2, 2)
 
-			set1 := attribute.NewSet(labels1...)
-			set2 := attribute.NewSet(labels2...)
+			set1 := attribute.NewSet(attr1...)
+			set2 := attribute.NewSet(attr2...)
 
 			merge := attribute.NewMergeIterator(&set1, &set2)
 
 			var result []string
 
 			for merge.Next() {
-				label := merge.Label()
-				result = append(result, fmt.Sprint(label.Key, "/", label.Value.Emit()))
+				attr := merge.Attribute()
+				result = append(result, fmt.Sprint(attr.Key, "/", attr.Value.Emit()))
 			}
 
 			require.Equal(t, input.expect, result)
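A hypothetical companion test (not in this commit; the test name is illustrative) could pin down the forwarding behavior of the deprecated accessors directly:

```go
package attribute_test

import (
	"testing"

	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/otel/attribute"
)

// TestDeprecatedLabelForwards checks that the deprecated accessors return
// the same values as their renamed replacements.
func TestDeprecatedLabelForwards(t *testing.T) {
	set := attribute.NewSet(attribute.String("k", "v"))
	iter := set.Iter()
	require.True(t, iter.Next())
	require.Equal(t, iter.Attribute(), iter.Label()) //nolint:staticcheck // Label is deprecated but must keep working.
	idx, kv := iter.IndexedAttribute()
	require.Equal(t, 0, idx)
	require.Equal(t, iter.Attribute(), kv)
}
```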
attribute/set.go (143 lines changed)

@@ -21,49 +21,42 @@ import (
 )
 
 type (
-	// Set is the representation for a distinct label set. It
-	// manages an immutable set of labels, with an internal cache
-	// for storing label encodings.
+	// Set is the representation for a distinct attribute set. It manages an
+	// immutable set of attributes, with an internal cache for storing
+	// attribute encodings.
 	//
-	// This type supports the `Equivalent` method of comparison
-	// using values of type `Distinct`.
-	//
-	// This type is used to implement:
-	// 1. Metric labels
-	// 2. Resource sets
-	// 3. Correlation map (TODO)
+	// This type supports the Equivalent method of comparison using values of
+	// type Distinct.
 	Set struct {
 		equivalent Distinct
 	}
 
-	// Distinct wraps a variable-size array of `KeyValue`,
-	// constructed with keys in sorted order. This can be used as
-	// a map key or for equality checking between Sets.
+	// Distinct wraps a variable-size array of KeyValue, constructed with keys
+	// in sorted order. This can be used as a map key or for equality checking
+	// between Sets.
 	Distinct struct {
 		iface interface{}
 	}
 
-	// Filter supports removing certain labels from label sets.
-	// When the filter returns true, the label will be kept in
-	// the filtered label set. When the filter returns false, the
-	// label is excluded from the filtered label set, and the
-	// label instead appears in the `removed` list of excluded labels.
+	// Filter supports removing certain attributes from attribute sets. When
+	// the filter returns true, the attribute will be kept in the filtered
+	// attribute set. When the filter returns false, the attribute is excluded
+	// from the filtered attribute set, and the attribute instead appears in
+	// the removed list of excluded attributes.
 	Filter func(KeyValue) bool
 
-	// Sortable implements `sort.Interface`, used for sorting
-	// `KeyValue`. This is an exported type to support a
-	// memory optimization. A pointer to one of these is needed
-	// for the call to `sort.Stable()`, which the caller may
-	// provide in order to avoid an allocation. See
-	// `NewSetWithSortable()`.
+	// Sortable implements sort.Interface, used for sorting KeyValue. This is
+	// an exported type to support a memory optimization. A pointer to one of
+	// these is needed for the call to sort.Stable(), which the caller may
+	// provide in order to avoid an allocation. See NewSetWithSortable().
 	Sortable []KeyValue
 )
 
 var (
-	// keyValueType is used in `computeDistinctReflect`.
+	// keyValueType is used in computeDistinctReflect.
 	keyValueType = reflect.TypeOf(KeyValue{})
 
-	// emptySet is returned for empty label sets.
+	// emptySet is returned for empty attribute sets.
 	emptySet = &Set{
 		equivalent: Distinct{
 			iface: [0]KeyValue{},
@@ -78,17 +71,17 @@ func EmptySet() *Set {
 	return emptySet
 }
 
-// reflect abbreviates `reflect.ValueOf`.
+// reflect abbreviates reflect.ValueOf.
 func (d Distinct) reflect() reflect.Value {
 	return reflect.ValueOf(d.iface)
 }
 
-// Valid returns true if this value refers to a valid `*Set`.
+// Valid returns true if this value refers to a valid Set.
 func (d Distinct) Valid() bool {
 	return d.iface != nil
 }
 
-// Len returns the number of labels in this set.
+// Len returns the number of attributes in this set.
 func (l *Set) Len() int {
 	if l == nil || !l.equivalent.Valid() {
 		return 0
@@ -96,7 +89,7 @@ func (l *Set) Len() int {
 	return l.equivalent.reflect().Len()
 }
 
-// Get returns the KeyValue at ordered position `idx` in this set.
+// Get returns the KeyValue at ordered position idx in this set.
 func (l *Set) Get(idx int) (KeyValue, bool) {
 	if l == nil {
 		return KeyValue{}, false
@@ -142,7 +135,7 @@ func (l *Set) HasValue(k Key) bool {
 	return ok
 }
 
-// Iter returns an iterator for visiting the labels in this set.
+// Iter returns an iterator for visiting the attributes in this set.
 func (l *Set) Iter() Iterator {
 	return Iterator{
 		storage: l,
@@ -150,18 +143,17 @@ func (l *Set) Iter() Iterator {
 	}
 }
 
-// ToSlice returns the set of labels belonging to this set, sorted,
-// where keys appear no more than once.
+// ToSlice returns the set of attributes belonging to this set, sorted, where
+// keys appear no more than once.
 func (l *Set) ToSlice() []KeyValue {
 	iter := l.Iter()
 	return iter.ToSlice()
 }
 
-// Equivalent returns a value that may be used as a map key. The
-// Distinct type guarantees that the result will equal the equivalent
-// Distinct value of any label set with the same elements as this,
-// where sets are made unique by choosing the last value in the input
-// for any given key.
+// Equivalent returns a value that may be used as a map key. The Distinct type
+// guarantees that the result will equal the equivalent Distinct value of any
+// attribute set with the same elements as this, where sets are made unique by
+// choosing the last value in the input for any given key.
 func (l *Set) Equivalent() Distinct {
 	if l == nil || !l.equivalent.Valid() {
 		return emptySet.equivalent
@@ -174,8 +166,7 @@ func (l *Set) Equals(o *Set) bool {
 	return l.Equivalent() == o.Equivalent()
 }
 
-// Encoded returns the encoded form of this set, according to
-// `encoder`.
+// Encoded returns the encoded form of this set, according to encoder.
 func (l *Set) Encoded(encoder Encoder) string {
 	if l == nil || encoder == nil {
 		return ""
@@ -190,11 +181,11 @@ func empty() Set {
 	}
 }
 
-// NewSet returns a new `Set`. See the documentation for
-// `NewSetWithSortableFiltered` for more details.
+// NewSet returns a new Set. See the documentation for
+// NewSetWithSortableFiltered for more details.
 //
-// Except for empty sets, this method adds an additional allocation
-// compared with calls that include a `*Sortable`.
+// Except for empty sets, this method adds an additional allocation compared
+// with calls that include a Sortable.
 func NewSet(kvs ...KeyValue) Set {
 	// Check for empty set.
 	if len(kvs) == 0 {
@@ -204,10 +195,10 @@ func NewSet(kvs ...KeyValue) Set {
 	return s
 }
 
-// NewSetWithSortable returns a new `Set`. See the documentation for
-// `NewSetWithSortableFiltered` for more details.
+// NewSetWithSortable returns a new Set. See the documentation for
+// NewSetWithSortableFiltered for more details.
 //
-// This call includes a `*Sortable` option as a memory optimization.
+// This call includes a Sortable option as a memory optimization.
 func NewSetWithSortable(kvs []KeyValue, tmp *Sortable) Set {
 	// Check for empty set.
 	if len(kvs) == 0 {
@@ -217,12 +208,11 @@ func NewSetWithSortable(kvs []KeyValue, tmp *Sortable) Set {
 	return s
 }
 
-// NewSetWithFiltered returns a new `Set`. See the documentation for
-// `NewSetWithSortableFiltered` for more details.
+// NewSetWithFiltered returns a new Set. See the documentation for
+// NewSetWithSortableFiltered for more details.
 //
-// This call includes a `Filter` to include/exclude label keys from
-// the return value. Excluded keys are returned as a slice of label
-// values.
+// This call includes a Filter to include/exclude attribute keys from the
+// return value. Excluded keys are returned as a slice of attribute values.
 func NewSetWithFiltered(kvs []KeyValue, filter Filter) (Set, []KeyValue) {
 	// Check for empty set.
 	if len(kvs) == 0 {
@@ -231,7 +221,7 @@ func NewSetWithFiltered(kvs []KeyValue, filter Filter) (Set, []KeyValue) {
 	return NewSetWithSortableFiltered(kvs, new(Sortable), filter)
 }
 
-// NewSetWithSortableFiltered returns a new `Set`.
+// NewSetWithSortableFiltered returns a new Set.
 //
 // Duplicate keys are eliminated by taking the last value. This
 // re-orders the input slice so that unique last-values are contiguous
@@ -243,17 +233,16 @@ func NewSetWithFiltered(kvs []KeyValue, filter Filter) (Set, []KeyValue) {
 // - Caller sees the reordering, but doesn't lose values
 // - Repeated call preserve last-value wins.
 //
-// Note that methods are defined on `*Set`, although this returns `Set`.
-// Callers can avoid memory allocations by:
+// Note that methods are defined on Set, although this returns Set. Callers
+// can avoid memory allocations by:
 //
-// - allocating a `Sortable` for use as a temporary in this method
-// - allocating a `Set` for storing the return value of this
-//   constructor.
+// - allocating a Sortable for use as a temporary in this method
+// - allocating a Set for storing the return value of this constructor.
 //
-// The result maintains a cache of encoded labels, by attribute.EncoderID.
+// The result maintains a cache of encoded attributes, by attribute.EncoderID.
 // This value should not be copied after its first use.
 //
-// The second `[]KeyValue` return value is a list of labels that were
+// The second []KeyValue return value is a list of attributes that were
 // excluded by the Filter (if non-nil).
 func NewSetWithSortableFiltered(kvs []KeyValue, tmp *Sortable, filter Filter) (Set, []KeyValue) {
 	// Check for empty set.
@@ -293,13 +282,13 @@ func NewSetWithSortableFiltered(kvs []KeyValue, tmp *Sortable, filter Filter) (S
 	}, nil
 }
 
-// filterSet reorders `kvs` so that included keys are contiguous at
-// the end of the slice, while excluded keys precede the included keys.
+// filterSet reorders kvs so that included keys are contiguous at the end of
+// the slice, while excluded keys precede the included keys.
 func filterSet(kvs []KeyValue, filter Filter) (Set, []KeyValue) {
 	var excluded []KeyValue
 
-	// Move labels that do not match the filter so
-	// they're adjacent before calling computeDistinct().
+	// Move attributes that do not match the filter so they're adjacent before
+	// calling computeDistinct().
 	distinctPosition := len(kvs)
 
 	// Swap indistinct keys forward and distinct keys toward the
@@ -319,8 +308,8 @@ func filterSet(kvs []KeyValue, filter Filter) (Set, []KeyValue) {
 	}, excluded
 }
 
-// Filter returns a filtered copy of this `Set`. See the
-// documentation for `NewSetWithSortableFiltered` for more details.
+// Filter returns a filtered copy of this Set. See the documentation for
+// NewSetWithSortableFiltered for more details.
 func (l *Set) Filter(re Filter) (Set, []KeyValue) {
 	if re == nil {
 		return Set{
@@ -333,9 +322,9 @@ func (l *Set) Filter(re Filter) (Set, []KeyValue) {
 	return filterSet(l.ToSlice(), re)
 }
 
-// computeDistinct returns a `Distinct` using either the fixed- or
-// reflect-oriented code path, depending on the size of the input.
-// The input slice is assumed to already be sorted and de-duplicated.
+// computeDistinct returns a Distinct using either the fixed- or
+// reflect-oriented code path, depending on the size of the input. The input
+// slice is assumed to already be sorted and de-duplicated.
 func computeDistinct(kvs []KeyValue) Distinct {
 	iface := computeDistinctFixed(kvs)
 	if iface == nil {
@@ -346,8 +335,8 @@ func computeDistinct(kvs []KeyValue) Distinct {
 	}
 }
 
-// computeDistinctFixed computes a `Distinct` for small slices. It
-// returns nil if the input is too large for this code path.
+// computeDistinctFixed computes a Distinct for small slices. It returns nil
+// if the input is too large for this code path.
 func computeDistinctFixed(kvs []KeyValue) interface{} {
 	switch len(kvs) {
 	case 1:
@@ -395,8 +384,8 @@ func computeDistinctFixed(kvs []KeyValue) interface{} {
 	}
 }
 
-// computeDistinctReflect computes a `Distinct` using reflection,
-// works for any size input.
+// computeDistinctReflect computes a Distinct using reflection, works for any
+// size input.
 func computeDistinctReflect(kvs []KeyValue) interface{} {
 	at := reflect.New(reflect.ArrayOf(len(kvs), keyValueType)).Elem()
 	for i, keyValue := range kvs {
@@ -405,7 +394,7 @@ func computeDistinctReflect(kvs []KeyValue) interface{} {
 	return at.Interface()
 }
 
-// MarshalJSON returns the JSON encoding of the `*Set`.
+// MarshalJSON returns the JSON encoding of the Set.
 func (l *Set) MarshalJSON() ([]byte, error) {
 	return json.Marshal(l.equivalent.iface)
 }
@@ -419,17 +408,17 @@ func (l Set) MarshalLog() interface{} {
 	return kvs
 }
 
-// Len implements `sort.Interface`.
+// Len implements sort.Interface.
 func (l *Sortable) Len() int {
 	return len(*l)
 }
 
-// Swap implements `sort.Interface`.
+// Swap implements sort.Interface.
 func (l *Sortable) Swap(i, j int) {
 	(*l)[i], (*l)[j] = (*l)[j], (*l)[i]
 }
 
-// Less implements `sort.Interface`.
+// Less implements sort.Interface.
 func (l *Sortable) Less(i, j int) bool {
 	return (*l)[i].Key < (*l)[j].Key
 }
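The Sortable optimization documented above pays off when many sets are built in a loop. A sketch (not from the commit; buildSets is illustrative):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// buildSets reuses one Sortable across many Set constructions so the
// sort.Stable temporary is allocated once instead of once per call.
func buildSets(batches [][]attribute.KeyValue) []attribute.Set {
	var tmp attribute.Sortable
	sets := make([]attribute.Set, 0, len(batches))
	for _, kvs := range batches {
		sets = append(sets, attribute.NewSetWithSortable(kvs, &tmp))
	}
	return sets
}

func main() {
	sets := buildSets([][]attribute.KeyValue{
		{attribute.String("b", "2"), attribute.String("a", "1")},
	})
	fmt.Println(sets[0].Len()) // 2
}
```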
attribute/set_test.go

@@ -159,8 +159,8 @@ func TestUniqueness(t *testing.T) {
 	for _, tc := range cases {
 		cpy := make([]attribute.KeyValue, len(tc.kvs))
 		copy(cpy, tc.kvs)
-		distinct, uniq := attribute.NewSetWithFiltered(cpy, func(label attribute.KeyValue) bool {
-			return tc.keyRe.MatchString(string(label.Key))
+		distinct, uniq := attribute.NewSetWithFiltered(cpy, func(attr attribute.KeyValue) bool {
+			return tc.keyRe.MatchString(string(attr.Key))
 		})
 
 		full := attribute.NewSet(uniq...)
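For reference, Filter as documented in set.go splits a set into kept and removed attributes. A sketch (not from the commit):

```go
package main

import (
	"fmt"
	"strings"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	set := attribute.NewSet(
		attribute.String("public.name", "svc"),
		attribute.String("internal.token", "secret"),
	)
	// Keep only attributes whose key does not start with "internal.";
	// everything else lands in the removed slice.
	kept, removed := set.Filter(func(kv attribute.KeyValue) bool {
		return !strings.HasPrefix(string(kv.Key), "internal.")
	})
	fmt.Println(kept.Len(), len(removed)) // 1 1
}
```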
@@ -91,7 +91,7 @@ func (d *metricReader) ForEach(_ aggregation.TemporalitySelector, f func(export.
 		if len(ts.Points) == 0 {
 			continue
 		}
-		ls, err := convertLabels(m.Descriptor.LabelKeys, ts.LabelValues)
+		attrs, err := convertAttrs(m.Descriptor.LabelKeys, ts.LabelValues)
 		if err != nil {
 			otel.Handle(err)
 			continue
@@ -101,7 +101,7 @@ func (d *metricReader) ForEach(_ aggregation.TemporalitySelector, f func(export.
 			func(agg aggregation.Aggregation, end time.Time) error {
 				return f(export.NewRecord(
 					&descriptor,
-					&ls,
+					&attrs,
 					agg,
 					ts.StartTime,
 					end,
@@ -115,36 +115,36 @@ func (d *metricReader) ForEach(_ aggregation.TemporalitySelector, f func(export.
 	return nil
 }
 
-// convertLabels converts from OpenCensus label keys and values to an
-// OpenTelemetry label Set.
-func convertLabels(keys []metricdata.LabelKey, values []metricdata.LabelValue) (attribute.Set, error) {
+// convertAttrs converts from OpenCensus attribute keys and values to an
+// OpenTelemetry attribute Set.
+func convertAttrs(keys []metricdata.LabelKey, values []metricdata.LabelValue) (attribute.Set, error) {
 	if len(keys) != len(values) {
 		return attribute.NewSet(), fmt.Errorf("%w different number of label keys (%d) and values (%d)", errConversion, len(keys), len(values))
 	}
-	labels := []attribute.KeyValue{}
+	attrs := []attribute.KeyValue{}
 	for i, lv := range values {
 		if !lv.Present {
 			continue
 		}
-		labels = append(labels, attribute.KeyValue{
+		attrs = append(attrs, attribute.KeyValue{
 			Key:   attribute.Key(keys[i].Key),
 			Value: attribute.StringValue(lv.Value),
 		})
 	}
-	return attribute.NewSet(labels...), nil
+	return attribute.NewSet(attrs...), nil
 }
 
 // convertResource converts an OpenCensus Resource to an OpenTelemetry Resource
 // Note: the ocresource.Resource Type field is not used.
 func convertResource(res *ocresource.Resource) *resource.Resource {
-	labels := []attribute.KeyValue{}
+	attrs := []attribute.KeyValue{}
 	if res == nil {
 		return nil
 	}
 	for k, v := range res.Labels {
-		labels = append(labels, attribute.KeyValue{Key: attribute.Key(k), Value: attribute.StringValue(v)})
+		attrs = append(attrs, attribute.KeyValue{Key: attribute.Key(k), Value: attribute.StringValue(v)})
 	}
-	return resource.NewSchemaless(labels...)
+	return resource.NewSchemaless(attrs...)
 }
 
 // convertDescriptor converts an OpenCensus Descriptor to an OpenTelemetry Descriptor
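The conversion above is unexported in the OpenCensus bridge; an equivalent standalone sketch (not from the commit, and omitting the length-mismatch error handling) shows the shape of the mapping:

```go
package main

import (
	"fmt"

	"go.opencensus.io/metric/metricdata"
	"go.opentelemetry.io/otel/attribute"
)

// toAttrSet mirrors convertAttrs: paired OpenCensus label keys/values become
// an OpenTelemetry attribute.Set, skipping values that are not Present.
func toAttrSet(keys []metricdata.LabelKey, values []metricdata.LabelValue) attribute.Set {
	attrs := make([]attribute.KeyValue, 0, len(values))
	for i, lv := range values {
		if !lv.Present {
			continue
		}
		attrs = append(attrs, attribute.String(keys[i].Key, lv.Value))
	}
	return attribute.NewSet(attrs...)
}

func main() {
	keys := []metricdata.LabelKey{{Key: "region"}}
	values := []metricdata.LabelValue{{Value: "us-east", Present: true}}
	fmt.Println(toAttrSet(keys, values).Encoded(attribute.DefaultEncoder())) // region=us-east
}
```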
@@ -110,12 +110,12 @@ func TestExportMetrics(t *testing.T) {
 			expectedHandledError: errConversion,
 		},
 		{
-			desc: "labels conversion error",
+			desc: "attrs conversion error",
 			input: []*metricdata.Metric{
 				{
-					// No descriptor with label keys.
+					// No descriptor with attribute keys.
 					TimeSeries: []*metricdata.TimeSeries{
-						// 1 label value, which doens't exist in keys.
+						// 1 attribute value, which doesn't exist in keys.
 						{
 							LabelValues: []metricdata.LabelValue{{Value: "foo", Present: true}},
 							Points: []metricdata.Point{
@@ -269,8 +269,8 @@ func TestExportMetrics(t *testing.T) {
 		}
 		// Don't bother with a complete check of the descriptor.
 		// That is checked as part of descriptor conversion tests below.
-		if !output[i].Labels().Equals(expected.Labels()) {
-			t.Errorf("ExportMetrics(%+v)[i].Labels() = %+v, want %+v", tc.input, output[i].Labels(), expected.Labels())
+		if !output[i].Attributes().Equals(expected.Attributes()) {
+			t.Errorf("ExportMetrics(%+v)[i].Attributes() = %+v, want %+v", tc.input, output[i].Attributes(), expected.Attributes())
 		}
 		if output[i].Aggregation().Kind() != expected.Aggregation().Kind() {
 			t.Errorf("ExportMetrics(%+v)[i].Aggregation() = %+v, want %+v", tc.input, output[i].Aggregation().Kind(), expected.Aggregation().Kind())
@@ -282,7 +282,7 @@ func TestExportMetrics(t *testing.T) {
 	}
 }
 
-func TestConvertLabels(t *testing.T) {
+func TestConvertAttributes(t *testing.T) {
 	setWithMultipleKeys := attribute.NewSet(
 		attribute.KeyValue{Key: attribute.Key("first"), Value: attribute.StringValue("1")},
 		attribute.KeyValue{Key: attribute.Key("second"), Value: attribute.StringValue("2")},
@@ -295,7 +295,7 @@ func TestConvertLabels(t *testing.T) {
 		expectedErr error
 	}{
 		{
-			desc:     "no labels",
+			desc:     "no attributes",
 			expected: attribute.EmptySet(),
 		},
 		{
@@ -325,12 +325,12 @@ func TestConvertLabels(t *testing.T) {
 		},
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
-			output, err := convertLabels(tc.inputKeys, tc.inputValues)
+			output, err := convertAttrs(tc.inputKeys, tc.inputValues)
 			if !errors.Is(err, tc.expectedErr) {
-				t.Errorf("convertLabels(keys: %v, values: %v) = err(%v), want err(%v)", tc.inputKeys, tc.inputValues, err, tc.expectedErr)
+				t.Errorf("convertAttrs(keys: %v, values: %v) = err(%v), want err(%v)", tc.inputKeys, tc.inputValues, err, tc.expectedErr)
 			}
 			if !output.Equals(tc.expected) {
-				t.Errorf("convertLabels(keys: %v, values: %v) = %+v, want %+v", tc.inputKeys, tc.inputValues, output.ToSlice(), tc.expected.ToSlice())
+				t.Errorf("convertAttrs(keys: %v, values: %v) = %+v, want %+v", tc.inputKeys, tc.inputValues, output.ToSlice(), tc.expected.ToSlice())
 			}
 		})
 	}
@@ -352,7 +352,7 @@ func TestConvertResource(t *testing.T) {
 			expected: resource.NewSchemaless(),
 		},
 		{
-			desc: "resource with labels",
+			desc: "resource with attributes",
 			input: &ocresource.Resource{
 				Labels: map[string]string{
 					"foo": "bar",
@@ -122,7 +122,7 @@ func (s *bridgeSpan) logRecord(record ot.LogRecord) {
 	s.otelSpan.AddEvent(
 		"",
 		trace.WithTimestamp(record.Timestamp),
-		trace.WithAttributes(otLogFieldsToOTelLabels(record.Fields)...),
+		trace.WithAttributes(otLogFieldsToOTelAttrs(record.Fields)...),
 	)
 }
 
@@ -153,7 +153,7 @@ func (s *bridgeSpan) SetTag(key string, value interface{}) ot.Span {
 			s.otelSpan.SetStatus(codes.Error, "")
 		}
 	default:
-		s.otelSpan.SetAttributes(otTagToOTelLabel(key, value))
+		s.otelSpan.SetAttributes(otTagToOTelAttr(key, value))
 	}
 	return s
 }
@@ -161,7 +161,7 @@ func (s *bridgeSpan) SetTag(key string, value interface{}) ot.Span {
 func (s *bridgeSpan) LogFields(fields ...otlog.Field) {
 	s.otelSpan.AddEvent(
 		"",
-		trace.WithAttributes(otLogFieldsToOTelLabels(fields)...),
+		trace.WithAttributes(otLogFieldsToOTelAttrs(fields)...),
 	)
 }
 
@@ -216,10 +216,10 @@ func (e *bridgeFieldEncoder) EmitLazyLogger(value otlog.LazyLogger) {
 }
 
 func (e *bridgeFieldEncoder) emitCommon(key string, value interface{}) {
-	e.pairs = append(e.pairs, otTagToOTelLabel(key, value))
+	e.pairs = append(e.pairs, otTagToOTelAttr(key, value))
 }
 
-func otLogFieldsToOTelLabels(fields []otlog.Field) []attribute.KeyValue {
+func otLogFieldsToOTelAttrs(fields []otlog.Field) []attribute.KeyValue {
 	encoder := &bridgeFieldEncoder{}
 	for _, field := range fields {
 		field.Marshal(encoder)
@@ -507,13 +507,13 @@ func otTagsToOTelAttributesKindAndError(tags map[string]interface{}) ([]attribut
 			err = true
 		}
 	default:
-		pairs = append(pairs, otTagToOTelLabel(k, v))
+		pairs = append(pairs, otTagToOTelAttr(k, v))
 		}
 	}
 	return pairs, kind, err
 }
 
-// otTagToOTelLabel converts given key-value into attribute.KeyValue.
+// otTagToOTelAttr converts given key-value into attribute.KeyValue.
 // Note that some conversions are not obvious:
 // - int -> int64
 // - uint -> string
@@ -521,8 +521,8 @@ func otTagsToOTelAttributesKindAndError(tags map[string]interface{}) ([]attribut
 // - uint32 -> int64
 // - uint64 -> string
 // - float32 -> float64
-func otTagToOTelLabel(k string, v interface{}) attribute.KeyValue {
-	key := otTagToOTelLabelKey(k)
+func otTagToOTelAttr(k string, v interface{}) attribute.KeyValue {
+	key := otTagToOTelAttrKey(k)
 	switch val := v.(type) {
 	case bool:
 		return key.Bool(val)
@@ -549,7 +549,7 @@ func otTagToOTelLabel(k string, v interface{}) attribute.KeyValue {
 	}
 }
 
-func otTagToOTelLabelKey(k string) attribute.Key {
+func otTagToOTelAttrKey(k string) attribute.Key {
 	return attribute.Key(k)
 }
 
@@ -652,7 +652,7 @@ func runOTOtelOT(t *testing.T, ctx context.Context, name string, callback func(*
 	}(ctx)
 }
 
-func TestOtTagToOTelLabelCheckTypeConversions(t *testing.T) {
+func TestOtTagToOTelAttrCheckTypeConversions(t *testing.T) {
 	tableTest := []struct {
 		key               string
 		value             interface{}
@@ -716,7 +716,7 @@ func TestOtTagToOTelLabelCheckTypeConversions(t *testing.T) {
 	}
 
 	for _, test := range tableTest {
-		got := otTagToOTelLabel(test.key, test.value)
+		got := otTagToOTelAttr(test.key, test.value)
 		if test.expectedValueType != got.Value.Type() {
 			t.Errorf("Expected type %s, but got %s after conversion '%v' value",
 				test.expectedValueType,
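The bridge's otTagToOTelAttr is unexported; a standalone sketch of the conversions documented above (int -> int64, uint and uint64 -> string, float32 -> float64; tagToAttr is an illustrative name, not the bridge's API):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

// tagToAttr maps an OpenTracing-style tag value onto the closest
// attribute.KeyValue type, following the rules listed in the bridge docs.
func tagToAttr(k string, v interface{}) attribute.KeyValue {
	key := attribute.Key(k)
	switch val := v.(type) {
	case bool:
		return key.Bool(val)
	case int: // int -> int64
		return key.Int64(int64(val))
	case uint: // uint -> string
		return key.String(fmt.Sprint(val))
	case uint64: // uint64 -> string
		return key.String(fmt.Sprint(val))
	case float32: // float32 -> float64
		return key.Float64(float64(val))
	default:
		return key.String(fmt.Sprint(val))
	}
}

func main() {
	fmt.Println(tagToAttr("retries", 3))    // becomes an int64 attribute
	fmt.Println(tagToAttr("id", uint64(7))) // becomes a string attribute
}
```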
@@ -88,19 +88,19 @@ func main() {
 
 	tracer := otel.Tracer("test-tracer")
 
-	// labels represent additional key-value descriptors that can be bound to a
-	// metric observer or recorder.
-	commonLabels := []attribute.KeyValue{
-		attribute.String("labelA", "chocolate"),
-		attribute.String("labelB", "raspberry"),
-		attribute.String("labelC", "vanilla"),
+	// Attributes represent additional key-value descriptors that can be bound
+	// to a metric observer or recorder.
+	commonAttrs := []attribute.KeyValue{
+		attribute.String("attrA", "chocolate"),
+		attribute.String("attrB", "raspberry"),
+		attribute.String("attrC", "vanilla"),
 	}
 
 	// work begins
 	ctx, span := tracer.Start(
 		context.Background(),
 		"CollectorExporter-Example",
-		trace.WithAttributes(commonLabels...))
+		trace.WithAttributes(commonAttrs...))
 	defer span.End()
 	for i := 0; i < 10; i++ {
 		_, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i))
@@ -72,7 +72,7 @@ func main() {
 
 	observerLock := new(sync.RWMutex)
 	observerValueToReport := new(float64)
-	observerLabelsToReport := new([]attribute.KeyValue)
+	observerAttrsToReport := new([]attribute.KeyValue)
 
 	gaugeObserver, err := meter.AsyncFloat64().Gauge("ex.com.one")
 	if err != nil {
@@ -81,9 +81,9 @@ func main() {
 	_ = meter.RegisterCallback([]instrument.Asynchronous{gaugeObserver}, func(ctx context.Context) {
 		(*observerLock).RLock()
 		value := *observerValueToReport
-		labels := *observerLabelsToReport
+		attrs := *observerAttrsToReport
 		(*observerLock).RUnlock()
-		gaugeObserver.Observe(ctx, value, labels...)
+		gaugeObserver.Observe(ctx, value, attrs...)
 	})
 
 	histogram, err := meter.SyncFloat64().Histogram("ex.com.two")
@@ -95,36 +95,36 @@ func main() {
 		log.Panicf("failed to initialize instrument: %v", err)
 	}
 
-	commonLabels := []attribute.KeyValue{lemonsKey.Int(10), attribute.String("A", "1"), attribute.String("B", "2"), attribute.String("C", "3")}
-	notSoCommonLabels := []attribute.KeyValue{lemonsKey.Int(13)}
+	commonAttrs := []attribute.KeyValue{lemonsKey.Int(10), attribute.String("A", "1"), attribute.String("B", "2"), attribute.String("C", "3")}
+	notSoCommonAttrs := []attribute.KeyValue{lemonsKey.Int(13)}
 
 	ctx := context.Background()
 
 	(*observerLock).Lock()
 	*observerValueToReport = 1.0
-	*observerLabelsToReport = commonLabels
+	*observerAttrsToReport = commonAttrs
 	(*observerLock).Unlock()
 
-	histogram.Record(ctx, 2.0, commonLabels...)
-	counter.Add(ctx, 12.0, commonLabels...)
+	histogram.Record(ctx, 2.0, commonAttrs...)
+	counter.Add(ctx, 12.0, commonAttrs...)
 
 	time.Sleep(5 * time.Second)
 
 	(*observerLock).Lock()
 	*observerValueToReport = 1.0
-	*observerLabelsToReport = notSoCommonLabels
+	*observerAttrsToReport = notSoCommonAttrs
 	(*observerLock).Unlock()
-	histogram.Record(ctx, 2.0, notSoCommonLabels...)
-	counter.Add(ctx, 22.0, notSoCommonLabels...)
+	histogram.Record(ctx, 2.0, notSoCommonAttrs...)
+	counter.Add(ctx, 22.0, notSoCommonAttrs...)
 
 	time.Sleep(5 * time.Second)
 
 	(*observerLock).Lock()
 	*observerValueToReport = 13.0
-	*observerLabelsToReport = commonLabels
+	*observerAttrsToReport = commonAttrs
 	(*observerLock).Unlock()
-	histogram.Record(ctx, 12.0, commonLabels...)
-	counter.Add(ctx, 13.0, commonLabels...)
+	histogram.Record(ctx, 12.0, commonAttrs...)
+	counter.Add(ctx, 13.0, commonAttrs...)
 
 	fmt.Println("Example finished updating, please visit :2222")
@@ -89,10 +89,10 @@ func pointTime() uint64 {
 }
 
 type testRecord struct {
-	name   string
-	iKind  sdkapi.InstrumentKind
-	nKind  number.Kind
-	labels []attribute.KeyValue
+	name  string
+	iKind sdkapi.InstrumentKind
+	nKind number.Kind
+	attrs []attribute.KeyValue
 
 	meterName string
 	meterOpts []metric.MeterOption
@@ -102,14 +102,14 @@ func record(
 	name string,
 	iKind sdkapi.InstrumentKind,
 	nKind number.Kind,
-	labels []attribute.KeyValue,
+	attrs []attribute.KeyValue,
 	meterName string,
 	meterOpts ...metric.MeterOption) testRecord {
 	return testRecord{
 		name:      name,
 		iKind:     iKind,
 		nKind:     nKind,
-		labels:    labels,
+		attrs:     attrs,
 		meterName: meterName,
 		meterOpts: meterOpts,
 	}
@@ -121,7 +121,7 @@ var (
 
 	testHistogramBoundaries = []float64{2.0, 4.0, 8.0}
 
-	cpu1Labels = []*commonpb.KeyValue{
+	cpu1Attrs = []*commonpb.KeyValue{
 		{
 			Key: "CPU",
 			Value: &commonpb.AnyValue{
@@ -139,7 +139,7 @@ var (
 			},
 		},
 	}
-	cpu2Labels = []*commonpb.KeyValue{
+	cpu2Attrs = []*commonpb.KeyValue{
 		{
 			Key: "CPU",
 			Value: &commonpb.AnyValue{
@@ -203,13 +203,13 @@ func TestNoGroupingExport(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu2Labels,
+					Attributes:        cpu2Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -247,7 +247,7 @@ func TestHistogramInt64MetricGroupingExport(t *testing.T) {
 			AggregationTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
 			DataPoints: []*metricpb.HistogramDataPoint{
 				{
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 					Count:             2,
@@ -256,7 +256,7 @@ func TestHistogramInt64MetricGroupingExport(t *testing.T) {
 					BucketCounts:   []uint64{1, 0, 0, 1},
 				},
 				{
-					Attributes:     cpu1Labels,
+					Attributes:     cpu1Attrs,
 					Count:          2,
 					Sum:            &sum,
 					ExplicitBounds: testHistogramBoundaries,
@@ -298,7 +298,7 @@ func TestHistogramFloat64MetricGroupingExport(t *testing.T) {
 			AggregationTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
 			DataPoints: []*metricpb.HistogramDataPoint{
 				{
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 					Count:             2,
@@ -307,7 +307,7 @@ func TestHistogramFloat64MetricGroupingExport(t *testing.T) {
 					BucketCounts:   []uint64{1, 0, 0, 1},
 				},
 				{
-					Attributes:     cpu1Labels,
+					Attributes:     cpu1Attrs,
 					Count:          2,
 					Sum:            &sum,
 					ExplicitBounds: testHistogramBoundaries,
@@ -355,13 +355,13 @@ func TestCountInt64MetricGroupingExport(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -405,13 +405,13 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsDouble{AsDouble: 11.0},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsDouble{AsDouble: 11.0},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -477,25 +477,25 @@ func TestResourceMetricGroupingExport(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu2Labels,
+					Attributes:        cpu2Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -582,19 +582,19 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu2Labels,
+					Attributes:        cpu2Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -619,7 +619,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -644,7 +644,7 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -707,7 +707,7 @@ func TestStatelessAggregationTemporality(t *testing.T) {
 			DataPoints: []*metricpb.NumberDataPoint{
 				{
 					Value:             &metricpb.NumberDataPoint_AsInt{AsInt: 11},
-					Attributes:        cpu1Labels,
+					Attributes:        cpu1Attrs,
 					StartTimeUnixNano: startTime(),
 					TimeUnixNano:      pointTime(),
 				},
@@ -730,8 +730,8 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource.
 
 	libraryRecs := map[instrumentation.Library][]export.Record{}
 	for _, r := range records {
-		lcopy := make([]attribute.KeyValue, len(r.labels))
-		copy(lcopy, r.labels)
+		lcopy := make([]attribute.KeyValue, len(r.attrs))
+		copy(lcopy, r.attrs)
 		desc := metrictest.NewDescriptor(r.name, r.iKind, r.nKind)
 		labs := attribute.NewSet(lcopy...)
@@ -196,13 +196,11 @@ func sink(ctx context.Context, in <-chan result) ([]*metricpb.Metric, error) {
 			continue
 		}
-		// Note: There is extra work happening in this code
-		// that can be improved when the work described in
-		// #2119 is completed. The SDK has a guarantee that
-		// no more than one point per period per label set is
-		// produced, so this fallthrough should never happen.
-		// The final step of #2119 is to remove all the
-		// grouping logic here.
+		// Note: There is extra work happening in this code that can be
+		// improved when the work described in #2119 is completed. The SDK has
+		// a guarantee that no more than one point per period per attribute
+		// set is produced, so this fallthrough should never happen. The final
+		// step of #2119 is to remove all the grouping logic here.
 		switch res.Metric.Data.(type) {
 		case *metricpb.Metric_Gauge:
 			m.GetGauge().DataPoints = append(m.GetGauge().DataPoints, res.Metric.GetGauge().DataPoints...)
@@ -275,7 +273,7 @@ func Record(temporalitySelector aggregation.TemporalitySelector, r export.Record
 
 func gaugePoint(record export.Record, num number.Number, start, end time.Time) (*metricpb.Metric, error) {
 	desc := record.Descriptor()
-	labels := record.Labels()
+	attrs := record.Attributes()
 
 	m := &metricpb.Metric{
 		Name: desc.Name(),
@@ -292,7 +290,7 @@ func gaugePoint(record export.Record, num number.Number, start, end time.Time) (
 				Value: &metricpb.NumberDataPoint_AsInt{
 					AsInt: num.CoerceToInt64(n),
 				},
-				Attributes:        Iterator(labels.Iter()),
+				Attributes:        Iterator(attrs.Iter()),
 				StartTimeUnixNano: toNanos(start),
 				TimeUnixNano:      toNanos(end),
 			},
@@ -307,7 +305,7 @@ func gaugePoint(record export.Record, num number.Number, start, end time.Time) (
 				Value: &metricpb.NumberDataPoint_AsDouble{
 					AsDouble: num.CoerceToFloat64(n),
 				},
-				Attributes:        Iterator(labels.Iter()),
+				Attributes:        Iterator(attrs.Iter()),
 				StartTimeUnixNano: toNanos(start),
 				TimeUnixNano:      toNanos(end),
 			},
@@ -333,7 +331,7 @@ func sdkTemporalityToTemporality(temporality aggregation.Temporality) metricpb.A
 
 func sumPoint(record export.Record, num number.Number, start, end time.Time, temporality aggregation.Temporality, monotonic bool) (*metricpb.Metric, error) {
 	desc := record.Descriptor()
-	labels := record.Labels()
+	attrs := record.Attributes()
 
 	m := &metricpb.Metric{
 		Name: desc.Name(),
@@ -352,7 +350,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, tem
 				Value: &metricpb.NumberDataPoint_AsInt{
 					AsInt: num.CoerceToInt64(n),
 				},
-				Attributes:        Iterator(labels.Iter()),
+				Attributes:        Iterator(attrs.Iter()),
 				StartTimeUnixNano: toNanos(start),
 				TimeUnixNano:      toNanos(end),
 			},
@@ -369,7 +367,7 @@ func sumPoint(record export.Record, num number.Number, start, end time.Time, tem
 				Value: &metricpb.NumberDataPoint_AsDouble{
 					AsDouble: num.CoerceToFloat64(n),
 				},
-				Attributes:        Iterator(labels.Iter()),
+				Attributes:        Iterator(attrs.Iter()),
 				StartTimeUnixNano: toNanos(start),
 				TimeUnixNano:      toNanos(end),
 			},
@@ -399,7 +397,7 @@ func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []ui
 // histogram transforms a Histogram Aggregator into an OTLP Metric.
 func histogramPoint(record export.Record, temporality aggregation.Temporality, a aggregation.Histogram) (*metricpb.Metric, error) {
 	desc := record.Descriptor()
-	labels := record.Labels()
+	attrs := record.Attributes()
 	boundaries, counts, err := histogramValues(a)
 	if err != nil {
 		return nil, err
@@ -426,7 +424,7 @@ func histogramPoint(record export.Record, temporality aggregation.Temporality, a
 			DataPoints: []*metricpb.HistogramDataPoint{
 				{
 					Sum:               &sumFloat64,
-					Attributes:        Iterator(labels.Iter()),
+					Attributes:        Iterator(attrs.Iter()),
 					StartTimeUnixNano: toNanos(record.StartTime()),
 					TimeUnixNano:      toNanos(record.EndTime()),
 					Count:             uint64(count),
@@ -91,20 +91,20 @@ func TestStringKeyValues(t *testing.T) {
}

for _, test := range tests {
-labels := attribute.NewSet(test.kvs...)
-assert.Equal(t, test.expected, Iterator(labels.Iter()))
+attrs := attribute.NewSet(test.kvs...)
+assert.Equal(t, test.expected, Iterator(attrs.Iter()))
}
}

func TestSumIntDataPoints(t *testing.T) {
desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Int64Kind)
-labels := attribute.NewSet(attribute.String("one", "1"))
+attrs := attribute.NewSet(attribute.String("one", "1"))
sums := sum.New(2)
s, ckpt := &sums[0], &sums[1]

assert.NoError(t, s.Update(context.Background(), number.Number(1), &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
-record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd)
+record := export.NewRecord(&desc, &attrs, ckpt.Aggregation(), intervalStart, intervalEnd)

value, err := ckpt.Sum()
require.NoError(t, err)
@@ -135,13 +135,13 @@ func TestSumIntDataPoints(t *testing.T) {

func TestSumFloatDataPoints(t *testing.T) {
desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Float64Kind)
-labels := attribute.NewSet(attribute.String("one", "1"))
+attrs := attribute.NewSet(attribute.String("one", "1"))
sums := sum.New(2)
s, ckpt := &sums[0], &sums[1]

assert.NoError(t, s.Update(context.Background(), number.NewFloat64Number(1), &desc))
require.NoError(t, s.SynchronizedMove(ckpt, &desc))
-record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd)
+record := export.NewRecord(&desc, &attrs, ckpt.Aggregation(), intervalStart, intervalEnd)
value, err := ckpt.Sum()
require.NoError(t, err)

@@ -171,13 +171,13 @@ func TestSumFloatDataPoints(t *testing.T) {

func TestLastValueIntDataPoints(t *testing.T) {
desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Int64Kind)
-labels := attribute.NewSet(attribute.String("one", "1"))
+attrs := attribute.NewSet(attribute.String("one", "1"))
lvs := lastvalue.New(2)
lv, ckpt := &lvs[0], &lvs[1]

assert.NoError(t, lv.Update(context.Background(), number.Number(100), &desc))
require.NoError(t, lv.SynchronizedMove(ckpt, &desc))
-record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd)
+record := export.NewRecord(&desc, &attrs, ckpt.Aggregation(), intervalStart, intervalEnd)
value, timestamp, err := ckpt.LastValue()
require.NoError(t, err)

@@ -203,9 +203,9 @@ func TestLastValueIntDataPoints(t *testing.T) {

func TestSumErrUnknownValueType(t *testing.T) {
desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Kind(-1))
-labels := attribute.NewSet()
+attrs := attribute.NewSet()
s := &sum.New(1)[0]
-record := export.NewRecord(&desc, &labels, s, intervalStart, intervalEnd)
+record := export.NewRecord(&desc, &attrs, s, intervalStart, intervalEnd)
value, err := s.Sum()
require.NoError(t, err)

@@ -271,12 +271,12 @@ var _ aggregation.LastValue = &testErrLastValue{}
func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) {
desc := metrictest.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind)
-labels := attribute.NewSet()
+attrs := attribute.NewSet()
test := &testAgg{
kind: kind,
agg: agg,
}
-return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd))
+return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &attrs, test, intervalStart, intervalEnd))
}

mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0])
@@ -295,8 +295,8 @@ func TestRecordAggregatorIncompatibleErrors(t *testing.T) {
func TestRecordAggregatorUnexpectedErrors(t *testing.T) {
makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) {
desc := metrictest.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind)
-labels := attribute.NewSet()
-return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd))
+attrs := attribute.NewSet()
+return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &attrs, agg, intervalStart, intervalEnd))
}

errEx := fmt.Errorf("timeout")
@@ -44,8 +44,8 @@ func OneRecordReader() export.InstrumentationLibraryReader {
}
start := time.Date(2020, time.December, 8, 19, 15, 0, 0, time.UTC)
end := time.Date(2020, time.December, 8, 19, 16, 0, 0, time.UTC)
-labels := attribute.NewSet(attribute.String("abc", "def"), attribute.Int64("one", 1))
-rec := export.NewRecord(&desc, &labels, agg[0].Aggregation(), start, end)
+attrs := attribute.NewSet(attribute.String("abc", "def"), attribute.Int64("one", 1))
+rec := export.NewRecord(&desc, &attrs, agg[0].Aggregation(), start, end)

return processortest.MultiInstrumentationLibraryReader(
map[instrumentation.Library][]export.Record{
@@ -44,7 +44,7 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter
require.NoError(t, cont.Start(ctx))

meter := cont.Meter("test-meter")
-labels := []attribute.KeyValue{attribute.Bool("test", true)}
+attrs := []attribute.KeyValue{attribute.Bool("test", true)}

type data struct {
iKind sdkapi.InstrumentKind
@@ -66,10 +66,10 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter
switch data.nKind {
case number.Int64Kind:
c, _ := meter.SyncInt64().Counter(name)
-c.Add(ctx, data.val, labels...)
+c.Add(ctx, data.val, attrs...)
case number.Float64Kind:
c, _ := meter.SyncFloat64().Counter(name)
-c.Add(ctx, float64(data.val), labels...)
+c.Add(ctx, float64(data.val), attrs...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
@@ -77,10 +77,10 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter
switch data.nKind {
case number.Int64Kind:
c, _ := meter.SyncInt64().Histogram(name)
-c.Record(ctx, data.val, labels...)
+c.Record(ctx, data.val, attrs...)
case number.Float64Kind:
c, _ := meter.SyncFloat64().Histogram(name)
-c.Record(ctx, float64(data.val), labels...)
+c.Record(ctx, float64(data.val), attrs...)
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
}
@@ -89,12 +89,12 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter
case number.Int64Kind:
g, _ := meter.AsyncInt64().Gauge(name)
_ = meter.RegisterCallback([]instrument.Asynchronous{g}, func(ctx context.Context) {
-g.Observe(ctx, data.val, labels...)
+g.Observe(ctx, data.val, attrs...)
})
case number.Float64Kind:
g, _ := meter.AsyncFloat64().Gauge(name)
_ = meter.RegisterCallback([]instrument.Asynchronous{g}, func(ctx context.Context) {
-g.Observe(ctx, float64(data.val), labels...)
+g.Observe(ctx, float64(data.val), attrs...)
})
default:
assert.Failf(t, "unsupported number testing kind", data.nKind.String())
@@ -153,9 +153,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) {

_ = c.exp.Controller().ForEach(func(_ instrumentation.Library, reader export.Reader) error {
return reader.ForEach(c.exp, func(record export.Record) error {
-var labelKeys []string
-mergeLabels(record, c.exp.controller.Resource(), &labelKeys, nil)
-ch <- c.toDesc(record, labelKeys)
+var attrKeys []string
+mergeAttrs(record, c.exp.controller.Resource(), &attrKeys, nil)
+ch <- c.toDesc(record, attrKeys)
return nil
})
})
@@ -181,25 +181,25 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
numberKind := record.Descriptor().NumberKind()
instrumentKind := record.Descriptor().InstrumentKind()

-var labelKeys, labels []string
-mergeLabels(record, c.exp.controller.Resource(), &labelKeys, &labels)
+var attrKeys, attrs []string
+mergeAttrs(record, c.exp.controller.Resource(), &attrKeys, &attrs)

-desc := c.toDesc(record, labelKeys)
+desc := c.toDesc(record, attrKeys)

if hist, ok := agg.(aggregation.Histogram); ok {
-if err := c.exportHistogram(ch, hist, numberKind, desc, labels); err != nil {
+if err := c.exportHistogram(ch, hist, numberKind, desc, attrs); err != nil {
return fmt.Errorf("exporting histogram: %w", err)
}
} else if sum, ok := agg.(aggregation.Sum); ok && instrumentKind.Monotonic() {
-if err := c.exportMonotonicCounter(ch, sum, numberKind, desc, labels); err != nil {
+if err := c.exportMonotonicCounter(ch, sum, numberKind, desc, attrs); err != nil {
return fmt.Errorf("exporting monotonic counter: %w", err)
}
} else if sum, ok := agg.(aggregation.Sum); ok && !instrumentKind.Monotonic() {
-if err := c.exportNonMonotonicCounter(ch, sum, numberKind, desc, labels); err != nil {
+if err := c.exportNonMonotonicCounter(ch, sum, numberKind, desc, attrs); err != nil {
return fmt.Errorf("exporting non monotonic counter: %w", err)
}
} else if lastValue, ok := agg.(aggregation.LastValue); ok {
-if err := c.exportLastValue(ch, lastValue, numberKind, desc, labels); err != nil {
+if err := c.exportLastValue(ch, lastValue, numberKind, desc, attrs); err != nil {
return fmt.Errorf("exporting last value: %w", err)
}
} else {
@@ -213,13 +213,13 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
}
}

-func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregation.LastValue, kind number.Kind, desc *prometheus.Desc, labels []string) error {
+func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregation.LastValue, kind number.Kind, desc *prometheus.Desc, attrs []string) error {
lv, _, err := lvagg.LastValue()
if err != nil {
return fmt.Errorf("error retrieving last value: %w", err)
}

-m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, lv.CoerceToFloat64(kind), labels...)
+m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, lv.CoerceToFloat64(kind), attrs...)
if err != nil {
return fmt.Errorf("error creating constant metric: %w", err)
}
@@ -228,13 +228,13 @@ func (c *collector) exportLastValue(ch chan<- prometheus.Metric, lvagg aggregati
return nil
}

-func (c *collector) exportNonMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, labels []string) error {
+func (c *collector) exportNonMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, attrs []string) error {
v, err := sum.Sum()
if err != nil {
return fmt.Errorf("error retrieving counter: %w", err)
}

-m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, v.CoerceToFloat64(kind), labels...)
+m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, v.CoerceToFloat64(kind), attrs...)
if err != nil {
return fmt.Errorf("error creating constant metric: %w", err)
}
@@ -243,13 +243,13 @@ func (c *collector) exportNonMonotonicCounter(ch chan<- prometheus.Metric, sum a
return nil
}

-func (c *collector) exportMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, labels []string) error {
+func (c *collector) exportMonotonicCounter(ch chan<- prometheus.Metric, sum aggregation.Sum, kind number.Kind, desc *prometheus.Desc, attrs []string) error {
v, err := sum.Sum()
if err != nil {
return fmt.Errorf("error retrieving counter: %w", err)
}

-m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, v.CoerceToFloat64(kind), labels...)
+m, err := prometheus.NewConstMetric(desc, prometheus.CounterValue, v.CoerceToFloat64(kind), attrs...)
if err != nil {
return fmt.Errorf("error creating constant metric: %w", err)
}
@@ -258,7 +258,7 @@ func (c *collector) exportMonotonicCounter(ch chan<- prometheus.Metric, sum aggr
return nil
}

-func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregation.Histogram, kind number.Kind, desc *prometheus.Desc, labels []string) error {
+func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregation.Histogram, kind number.Kind, desc *prometheus.Desc, attrs []string) error {
buckets, err := hist.Histogram()
if err != nil {
return fmt.Errorf("error retrieving histogram: %w", err)
@@ -280,7 +280,7 @@ func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregatio
// Include the +inf bucket in the total count.
totalCount += uint64(buckets.Counts[len(buckets.Counts)-1])

-m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, labels...)
+m, err := prometheus.NewConstHistogram(desc, totalCount, sum.CoerceToFloat64(kind), counts, attrs...)
if err != nil {
return fmt.Errorf("error creating constant histogram: %w", err)
}
@@ -289,34 +289,34 @@ func (c *collector) exportHistogram(ch chan<- prometheus.Metric, hist aggregatio
return nil
}

-func (c *collector) toDesc(record export.Record, labelKeys []string) *prometheus.Desc {
+func (c *collector) toDesc(record export.Record, attrKeys []string) *prometheus.Desc {
desc := record.Descriptor()
-return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), labelKeys, nil)
+return prometheus.NewDesc(sanitize(desc.Name()), desc.Description(), attrKeys, nil)
}

-// mergeLabels merges the export.Record's labels and resources into a
-// single set, giving precedence to the record's labels in case of
-// duplicate keys. This outputs one or both of the keys and the
-// values as a slice, and either argument may be nil to avoid
-// allocating an unnecessary slice.
-func mergeLabels(record export.Record, res *resource.Resource, keys, values *[]string) {
+// mergeAttrs merges the export.Record's attributes and resources into a
+// single set, giving precedence to the record's attributes in case of
+// duplicate keys. This outputs one or both of the keys and the values as a
+// slice, and either argument may be nil to avoid allocating an unnecessary
+// slice.
+func mergeAttrs(record export.Record, res *resource.Resource, keys, values *[]string) {
if keys != nil {
-*keys = make([]string, 0, record.Labels().Len()+res.Len())
+*keys = make([]string, 0, record.Attributes().Len()+res.Len())
}
if values != nil {
-*values = make([]string, 0, record.Labels().Len()+res.Len())
+*values = make([]string, 0, record.Attributes().Len()+res.Len())
}

-// Duplicate keys are resolved by taking the record label value over
+// Duplicate keys are resolved by taking the record attribute value over
// the resource value.
-mi := attribute.NewMergeIterator(record.Labels(), res.Set())
+mi := attribute.NewMergeIterator(record.Attributes(), res.Set())
for mi.Next() {
-label := mi.Label()
+attr := mi.Attribute()
if keys != nil {
-*keys = append(*keys, sanitize(string(label.Key)))
+*keys = append(*keys, sanitize(string(attr.Key)))
}
if values != nil {
-*values = append(*values, label.Value.Emit())
+*values = append(*values, attr.Value.Emit())
}
}
}
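To see the merge precedence that mergeAttrs relies on in isolation, here is a minimal runnable sketch using the renamed attribute API (the key/value pairs are hypothetical; NewMergeIterator favors its first argument on duplicate keys, matching the record-over-resource behavior above):

    package main

    import (
        "fmt"

        "go.opentelemetry.io/otel/attribute"
    )

    func main() {
        recordAttrs := attribute.NewSet(attribute.String("env", "prod"))
        resourceAttrs := attribute.NewSet(attribute.String("env", "test"), attribute.String("host", "h1"))

        // The first set wins on duplicate keys, so env=prod shadows env=test.
        mi := attribute.NewMergeIterator(&recordAttrs, &resourceAttrs)
        for mi.Next() {
            attr := mi.Attribute() // replaces the deprecated mi.Label()
            fmt.Println(string(attr.Key), "=", attr.Value.Emit())
        }
        // Prints, in sorted key order:
        //   env = prod
        //   host = h1
    }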
@@ -114,7 +114,7 @@ func TestPrometheusExporter(t *testing.T) {
histogram, err := meter.SyncFloat64().Histogram("histogram")
require.NoError(t, err)

-labels := []attribute.KeyValue{
+attrs := []attribute.KeyValue{
attribute.Key("A").String("B"),
attribute.Key("C").String("D"),
}
@@ -122,8 +122,8 @@ func TestPrometheusExporter(t *testing.T) {

var expected []expectedMetric

-counter.Add(ctx, 10, labels...)
-counter.Add(ctx, 5.3, labels...)
+counter.Add(ctx, 10, attrs...)
+counter.Add(ctx, 5.3, attrs...)

expected = append(expected, expectCounter("counter", `counter{A="B",C="D",R="V"} 15.3`))

@@ -131,16 +131,16 @@ func TestPrometheusExporter(t *testing.T) {
require.NoError(t, err)

err = meter.RegisterCallback([]instrument.Asynchronous{gaugeObserver}, func(ctx context.Context) {
-gaugeObserver.Observe(ctx, 1, labels...)
+gaugeObserver.Observe(ctx, 1, attrs...)
})
require.NoError(t, err)

expected = append(expected, expectGauge("intgaugeobserver", `intgaugeobserver{A="B",C="D",R="V"} 1`))

-histogram.Record(ctx, -0.6, labels...)
-histogram.Record(ctx, -0.4, labels...)
-histogram.Record(ctx, 0.6, labels...)
-histogram.Record(ctx, 20, labels...)
+histogram.Record(ctx, -0.6, attrs...)
+histogram.Record(ctx, -0.4, attrs...)
+histogram.Record(ctx, 0.6, attrs...)
+histogram.Record(ctx, 20, attrs...)

expected = append(expected, expectHistogram("histogram",
`histogram_bucket{A="B",C="D",R="V",le="-0.5"} 1`,
@@ -150,8 +150,8 @@ func TestPrometheusExporter(t *testing.T) {
`histogram_count{A="B",C="D",R="V"} 4`,
))

-upDownCounter.Add(ctx, 10, labels...)
-upDownCounter.Add(ctx, -3.2, labels...)
+upDownCounter.Add(ctx, 10, attrs...)
+upDownCounter.Add(ctx, -3.2, attrs...)

expected = append(expected, expectGauge("updowncounter", `updowncounter{A="B",C="D",R="V"} 6.8`))

@@ -159,7 +159,7 @@ func TestPrometheusExporter(t *testing.T) {
require.NoError(t, err)

err = meter.RegisterCallback([]instrument.Asynchronous{counterObserver}, func(ctx context.Context) {
-counterObserver.Observe(ctx, 7.7, labels...)
+counterObserver.Observe(ctx, 7.7, attrs...)
})
require.NoError(t, err)

@@ -169,7 +169,7 @@ func TestPrometheusExporter(t *testing.T) {
require.NoError(t, err)

err = meter.RegisterCallback([]instrument.Asynchronous{upDownCounterObserver}, func(ctx context.Context) {
-upDownCounterObserver.Observe(ctx, -7.7, labels...)
+upDownCounterObserver.Observe(ctx, -7.7, attrs...)
})
require.NoError(t, err)
@@ -22,10 +22,10 @@ import (
)

var (
-defaultWriter = os.Stdout
-defaultPrettyPrint = false
-defaultTimestamps = true
-defaultLabelEncoder = attribute.DefaultEncoder()
+defaultWriter = os.Stdout
+defaultPrettyPrint = false
+defaultTimestamps = true
+defaultAttrEncoder = attribute.DefaultEncoder()
)

// config contains options for the STDOUT exporter.
@@ -41,17 +41,17 @@ type config struct {
// true.
Timestamps bool

-// LabelEncoder encodes the labels.
-LabelEncoder attribute.Encoder
+// Encoder encodes the attributes.
+Encoder attribute.Encoder
}

// newConfig creates a validated Config configured with options.
func newConfig(options ...Option) (config, error) {
cfg := config{
-Writer: defaultWriter,
-PrettyPrint: defaultPrettyPrint,
-Timestamps: defaultTimestamps,
-LabelEncoder: defaultLabelEncoder,
+Writer: defaultWriter,
+PrettyPrint: defaultPrettyPrint,
+Timestamps: defaultTimestamps,
+Encoder: defaultAttrEncoder,
}
for _, opt := range options {
cfg = opt.apply(cfg)
@@ -103,16 +103,16 @@ func (o timestampsOption) apply(cfg config) config {
return cfg
}

-// WithLabelEncoder sets the label encoder used in export.
-func WithLabelEncoder(enc attribute.Encoder) Option {
-return labelEncoderOption{enc}
+// WithAttributeEncoder sets the attribute encoder used in export.
+func WithAttributeEncoder(enc attribute.Encoder) Option {
+return attrEncoderOption{enc}
}

-type labelEncoderOption struct {
-LabelEncoder attribute.Encoder
+type attrEncoderOption struct {
+encoder attribute.Encoder
}

-func (o labelEncoderOption) apply(cfg config) config {
-cfg.LabelEncoder = o.LabelEncoder
+func (o attrEncoderOption) apply(cfg config) config {
+cfg.Encoder = o.encoder
return cfg
}
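A sketch of how a caller migrates to the renamed option. The exporter constructor and the pretty-print option are assumptions about this package's surface at this version; only WithAttributeEncoder is confirmed by the diff itself:

    // assumes: import "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
    exp, err := stdoutmetric.New( // constructor name assumed
        stdoutmetric.WithPrettyPrint(), // option name assumed
        stdoutmetric.WithAttributeEncoder(attribute.DefaultEncoder()), // was WithLabelEncoder
    )
    if err != nil {
        log.Fatal(err)
    }
    _ = exp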
@@ -54,24 +54,24 @@ func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reade
var batch []line
aggError = reader.ForEach(func(lib instrumentation.Library, mr export.Reader) error {

-var instLabels []attribute.KeyValue
+var instAttrs []attribute.KeyValue
if name := lib.Name; name != "" {
-instLabels = append(instLabels, attribute.String("instrumentation.name", name))
+instAttrs = append(instAttrs, attribute.String("instrumentation.name", name))
if version := lib.Version; version != "" {
-instLabels = append(instLabels, attribute.String("instrumentation.version", version))
+instAttrs = append(instAttrs, attribute.String("instrumentation.version", version))
}
if schema := lib.SchemaURL; schema != "" {
-instLabels = append(instLabels, attribute.String("instrumentation.schema_url", schema))
+instAttrs = append(instAttrs, attribute.String("instrumentation.schema_url", schema))
}
}
-instSet := attribute.NewSet(instLabels...)
-encodedInstLabels := instSet.Encoded(e.config.LabelEncoder)
+instSet := attribute.NewSet(instAttrs...)
+encodedInstAttrs := instSet.Encoded(e.config.Encoder)

return mr.ForEach(e, func(record export.Record) error {
desc := record.Descriptor()
agg := record.Aggregation()
kind := desc.NumberKind()
-encodedResource := res.Encoded(e.config.LabelEncoder)
+encodedResource := res.Encoded(e.config.Encoder)

var expose line

@@ -93,27 +93,27 @@ func (e *metricExporter) Export(_ context.Context, res *resource.Resource, reade
}
}

-var encodedLabels string
-iter := record.Labels().Iter()
+var encodedAttrs string
+iter := record.Attributes().Iter()
if iter.Len() > 0 {
-encodedLabels = record.Labels().Encoded(e.config.LabelEncoder)
+encodedAttrs = record.Attributes().Encoded(e.config.Encoder)
}

var sb strings.Builder

sb.WriteString(desc.Name())

-if len(encodedLabels) > 0 || len(encodedResource) > 0 || len(encodedInstLabels) > 0 {
+if len(encodedAttrs) > 0 || len(encodedResource) > 0 || len(encodedInstAttrs) > 0 {
sb.WriteRune('{')
sb.WriteString(encodedResource)
-if len(encodedInstLabels) > 0 && len(encodedResource) > 0 {
+if len(encodedInstAttrs) > 0 && len(encodedResource) > 0 {
sb.WriteRune(',')
}
-sb.WriteString(encodedInstLabels)
-if len(encodedLabels) > 0 && (len(encodedInstLabels) > 0 || len(encodedResource) > 0) {
+sb.WriteString(encodedInstAttrs)
+if len(encodedAttrs) > 0 && (len(encodedInstAttrs) > 0 || len(encodedResource) > 0) {
sb.WriteRune(',')
}
-sb.WriteString(encodedLabels)
+sb.WriteString(encodedAttrs)
sb.WriteRune('}')
}
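For intuition, the string built above joins the encoded resource, instrumentation attributes, and record attributes inside one set of braces, inserting a comma only where both neighbors are non-empty. A hypothetical rendered line (values invented for illustration, following the ordering the code enforces):

    counter{R=V,instrumentation.name=test,A=B,C=D} 15.3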
@@ -235,7 +235,7 @@ func TestStdoutResource(t *testing.T) {
attribute.String("C", "D"),
),
// We explicitly do not de-duplicate between resources
-// and metric labels in this exporter.
+// and metric attributes in this exporter.
newCase("resource deduplication",
"R1=V1,R2=V2,instrumentation.name=test,R1=V3,R2=V4",
resource.NewSchemaless(attribute.String("R1", "V1"), attribute.String("R2", "V2")),
@@ -42,10 +42,9 @@ type (
// value needs to be aligned for 64-bit atomic operations.
value number.Number

-// timestamp indicates when this record was submitted.
-// this can be used to pick a winner when multiple
-// records contain lastValue data for the same labels due
-// to races.
+// timestamp indicates when this record was submitted. This can be
+// used to pick a winner when multiple records contain lastValue data
+// for the same attributes due to races.
timestamp time.Time
}
)
@@ -88,7 +88,7 @@ func (f *benchFixture) fHistogram(name string) syncfloat64.Histogram {
return ctr
}

-func makeLabels(n int) []attribute.KeyValue {
+func makeAttrs(n int) []attribute.KeyValue {
used := map[string]bool{}
l := make([]attribute.KeyValue, n)
for i := 0; i < n; i++ {
@@ -105,10 +105,10 @@ func makeLabels(n int) []attribute.KeyValue {
return l
}

-func benchmarkLabels(b *testing.B, n int) {
+func benchmarkAttrs(b *testing.B, n int) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(n)
+labs := makeAttrs(n)
cnt := fix.iCounter("int64.sum")

b.ResetTimer()
@@ -118,40 +118,40 @@ func benchmarkLabels(b *testing.B, n int) {
}
}

-func BenchmarkInt64CounterAddWithLabels_1(b *testing.B) {
-benchmarkLabels(b, 1)
+func BenchmarkInt64CounterAddWithAttrs_1(b *testing.B) {
+benchmarkAttrs(b, 1)
}

-func BenchmarkInt64CounterAddWithLabels_2(b *testing.B) {
-benchmarkLabels(b, 2)
+func BenchmarkInt64CounterAddWithAttrs_2(b *testing.B) {
+benchmarkAttrs(b, 2)
}

-func BenchmarkInt64CounterAddWithLabels_4(b *testing.B) {
-benchmarkLabels(b, 4)
+func BenchmarkInt64CounterAddWithAttrs_4(b *testing.B) {
+benchmarkAttrs(b, 4)
}

-func BenchmarkInt64CounterAddWithLabels_8(b *testing.B) {
-benchmarkLabels(b, 8)
+func BenchmarkInt64CounterAddWithAttrs_8(b *testing.B) {
+benchmarkAttrs(b, 8)
}

-func BenchmarkInt64CounterAddWithLabels_16(b *testing.B) {
-benchmarkLabels(b, 16)
+func BenchmarkInt64CounterAddWithAttrs_16(b *testing.B) {
+benchmarkAttrs(b, 16)
}

-// Note: performance does not depend on label set size for the
-// benchmarks below--all are benchmarked for a single attribute.
+// Note: performance does not depend on attribute set size for the benchmarks
+// below--all are benchmarked for a single attribute.

// Iterators

var benchmarkIteratorVar attribute.KeyValue

func benchmarkIterator(b *testing.B, n int) {
-labels := attribute.NewSet(makeLabels(n)...)
+attrs := attribute.NewSet(makeAttrs(n)...)
b.ResetTimer()
for i := 0; i < b.N; i++ {
-iter := labels.Iter()
+iter := attrs.Iter()
for iter.Next() {
-benchmarkIteratorVar = iter.Label()
+benchmarkIteratorVar = iter.Attribute()
}
}
}
@@ -205,7 +205,7 @@ func BenchmarkGlobalInt64CounterAddWithSDK(b *testing.B) {
func BenchmarkInt64CounterAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
cnt := fix.iCounter("int64.sum")

b.ResetTimer()
@@ -218,7 +218,7 @@ func BenchmarkInt64CounterAdd(b *testing.B) {
func BenchmarkFloat64CounterAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
cnt := fix.fCounter("float64.sum")

b.ResetTimer()
@@ -233,7 +233,7 @@ func BenchmarkFloat64CounterAdd(b *testing.B) {
func BenchmarkInt64LastValueAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
mea := fix.iHistogram("int64.lastvalue")

b.ResetTimer()
@@ -246,7 +246,7 @@ func BenchmarkInt64LastValueAdd(b *testing.B) {
func BenchmarkFloat64LastValueAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
mea := fix.fHistogram("float64.lastvalue")

b.ResetTimer()
@@ -261,7 +261,7 @@ func BenchmarkFloat64LastValueAdd(b *testing.B) {
func BenchmarkInt64HistogramAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
mea := fix.iHistogram("int64.histogram")

b.ResetTimer()
@@ -274,7 +274,7 @@ func BenchmarkInt64HistogramAdd(b *testing.B) {
func BenchmarkFloat64HistogramAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
mea := fix.fHistogram("float64.histogram")

b.ResetTimer()
@@ -304,7 +304,7 @@ func BenchmarkObserverRegistration(b *testing.B) {
func BenchmarkGaugeObserverObservationInt64(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
ctr, _ := fix.meter.AsyncInt64().Counter("test.lastvalue")
err := fix.meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
for i := 0; i < b.N; i++ {
@@ -324,7 +324,7 @@ func BenchmarkGaugeObserverObservationInt64(b *testing.B) {
func BenchmarkGaugeObserverObservationFloat64(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(1)
+labs := makeAttrs(1)
ctr, _ := fix.meter.AsyncFloat64().Counter("test.lastvalue")
err := fix.meter.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
for i := 0; i < b.N; i++ {
@@ -343,11 +343,11 @@ func BenchmarkGaugeObserverObservationFloat64(b *testing.B) {

// BatchRecord

-func benchmarkBatchRecord8Labels(b *testing.B, numInst int) {
-const numLabels = 8
+func benchmarkBatchRecord8Attrs(b *testing.B, numInst int) {
+const numAttrs = 8
ctx := context.Background()
fix := newFixture(b)
-labs := makeLabels(numLabels)
+labs := makeAttrs(numAttrs)
var meas []syncint64.Counter

for i := 0; i < numInst; i++ {
@@ -363,20 +363,20 @@ func benchmarkBatchRecord8Labels(b *testing.B, numInst int) {
}
}

-func BenchmarkBatchRecord8Labels_1Instrument(b *testing.B) {
-benchmarkBatchRecord8Labels(b, 1)
+func BenchmarkBatchRecord8Attrs_1Instrument(b *testing.B) {
+benchmarkBatchRecord8Attrs(b, 1)
}

-func BenchmarkBatchRecord_8Labels_2Instruments(b *testing.B) {
-benchmarkBatchRecord8Labels(b, 2)
+func BenchmarkBatchRecord_8Attrs_2Instruments(b *testing.B) {
+benchmarkBatchRecord8Attrs(b, 2)
}

-func BenchmarkBatchRecord_8Labels_4Instruments(b *testing.B) {
-benchmarkBatchRecord8Labels(b, 4)
+func BenchmarkBatchRecord_8Attrs_4Instruments(b *testing.B) {
+benchmarkBatchRecord8Attrs(b, 4)
}

-func BenchmarkBatchRecord_8Labels_8Instruments(b *testing.B) {
-benchmarkBatchRecord8Labels(b, 8)
+func BenchmarkBatchRecord_8Attrs_8Instruments(b *testing.B) {
+benchmarkBatchRecord8Attrs(b, 8)
}

// Record creation
@@ -188,7 +188,7 @@ func TestRecordNaN(t *testing.T) {
require.Error(t, testHandler.Flush())
}

-func TestSDKLabelsDeduplication(t *testing.T) {
+func TestSDKAttrsDeduplication(t *testing.T) {
ctx := context.Background()
meter, sdk, _, processor := newSDK(t)

@@ -250,11 +250,11 @@ func TestSDKLabelsDeduplication(t *testing.T) {
}

func newSetIter(kvs ...attribute.KeyValue) attribute.Iterator {
-labels := attribute.NewSet(kvs...)
-return labels.Iter()
+attrs := attribute.NewSet(kvs...)
+return attrs.Iter()
}

-func TestDefaultLabelEncoder(t *testing.T) {
+func TestDefaultAttributeEncoder(t *testing.T) {
encoder := attribute.DefaultEncoder()

encoded := encoder.Encode(newSetIter(attribute.String("A", "B"), attribute.String("C", "D")))
@@ -266,8 +266,8 @@ func TestDefaultLabelEncoder(t *testing.T) {
encoded = encoder.Encode(newSetIter(attribute.String(`\`, `=`), attribute.String(`,`, `\`)))
require.Equal(t, `\,=\\,\\=\=`, encoded)

-// Note: the label encoder does not sort or de-dup values,
-// that is done in Labels(...).
+// Note: the attr encoder does not sort or de-dup values,
+// that is done in Attributes(...).
encoded = encoder.Encode(newSetIter(
attribute.Int("I", 1),
attribute.Int64("I64", 1),
@@ -490,9 +490,9 @@ func TestObserverBatch(t *testing.T) {
}, processor.Values())
}

-// TestRecordPersistence ensures that a direct-called instrument that
-// is repeatedly used each interval results in a persistent record, so
-// that its encoded labels will be cached across collection intervals.
+// TestRecordPersistence ensures that a direct-called instrument that is
+// repeatedly used each interval results in a persistent record, so that its
+// encoded attribute will be cached across collection intervals.
func TestRecordPersistence(t *testing.T) {
ctx := context.Background()
meter, sdk, selector, _ := newSDK(t)
|
||||
Internal Structure
|
||||
|
||||
Each observer also has its own kind of record stored in the SDK. This
|
||||
record contains a set of recorders for every specific label set used in the
|
||||
callback.
|
||||
record contains a set of recorders for every specific attribute set used in
|
||||
the callback.
|
||||
|
||||
A sync.Map maintains the mapping of current instruments and label sets to
|
||||
internal records. To find a record, the SDK consults the Map to
|
||||
locate an existing record, otherwise it constructs a new record. The SDK
|
||||
maintains a count of the number of references to each record, ensuring
|
||||
that records are not reclaimed from the Map while they are still active
|
||||
from the user's perspective.
|
||||
A sync.Map maintains the mapping of current instruments and attribute sets to
|
||||
internal records. To find a record, the SDK consults the Map to locate an
|
||||
existing record, otherwise it constructs a new record. The SDK maintains a
|
||||
count of the number of references to each record, ensuring that records are
|
||||
not reclaimed from the Map while they are still active from the user's
|
||||
perspective.
|
||||
|
||||
Metric collection is performed via a single-threaded call to Collect that
|
||||
sweeps through all records in the SDK, checkpointing their state. When a
|
||||
@ -106,11 +106,6 @@ Processor implementations are provided, the "defaultkeys" Processor groups
|
||||
aggregate metrics by their recommended Descriptor.Keys(), the
|
||||
"simple" Processor aggregates metrics at full dimensionality.
|
||||
|
||||
LabelEncoder is an optional optimization that allows an exporter to
|
||||
provide the serialization logic for labels. This allows avoiding
|
||||
duplicate serialization of labels, once as a unique key in the SDK (or
|
||||
Processor) and once in the exporter.
|
||||
|
||||
Reader is an interface between the Processor and the Exporter.
|
||||
After completing a collection pass, the Processor.Reader() method
|
||||
returns a Reader, which the Exporter uses to iterate over all
|
||||
@ -118,10 +113,7 @@ the updated metrics.
|
||||
|
||||
Record is a struct containing the state of an individual exported
|
||||
metric. This is the result of one collection interface for one
|
||||
instrument and one label set.
|
||||
|
||||
Labels is a struct containing an ordered set of labels, the
|
||||
corresponding unique encoding, and the encoder that produced it.
|
||||
instrument and one attribute set.
|
||||
|
||||
Exporter is the final stage of an export pipeline. It is called with
|
||||
a Reader capable of enumerating all the updated metrics.
|
||||
|
@@ -64,12 +64,11 @@ type Processor interface {
// disable metrics with active records.
AggregatorSelector

-// Process is called by the SDK once per internal record,
-// passing the export Accumulation (a Descriptor, the corresponding
-// Labels, and the checkpointed Aggregator). This call has no
-// Context argument because it is expected to perform only
-// computation. An SDK is not expected to call exporters from
-// with Process, use a controller for that (see
+// Process is called by the SDK once per internal record, passing the
+// export Accumulation (a Descriptor, the corresponding attributes, and
+// the checkpointed Aggregator). This call has no Context argument because
+// it is expected to perform only computation. An SDK is not expected to
+// call exporters from with Process, use a controller for that (see
// ./controllers/{pull,push}.
Process(accum Accumulation) error
}
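A minimal sketch of a type satisfying this interface, assuming an embedded AggregatorSelector supplies the other half of the contract (the countingProcessor name and its behavior are hypothetical):

    // assumes: import (
    //     "go.opentelemetry.io/otel/attribute"
    //     "go.opentelemetry.io/otel/sdk/metric/export"
    // )
    type countingProcessor struct {
        export.AggregatorSelector // e.g. a selector from sdk/metric/selector/simple

        seen map[attribute.Distinct]int
    }

    func (p *countingProcessor) Process(accum export.Accumulation) error {
        // Attributes().Equivalent() yields a comparable key for the
        // accumulation's attribute set, just as the basic processor
        // does in its stateKey later in this diff.
        p.seen[accum.Attributes().Equivalent()]++
        return nil
    }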
@@ -198,18 +197,18 @@ type Reader interface {
// steps.
type Metadata struct {
descriptor *sdkapi.Descriptor
-labels *attribute.Set
+attrs *attribute.Set
}

// Accumulation contains the exported data for a single metric instrument
-// and label set, as prepared by an Accumulator for the Processor.
+// and attribute set, as prepared by an Accumulator for the Processor.
type Accumulation struct {
Metadata
aggregator aggregator.Aggregator
}

// Record contains the exported data for a single metric instrument
-// and label set, as prepared by the Processor for the Exporter.
+// and attribute set, as prepared by the Processor for the Exporter.
// This includes the effective start and end time for the aggregation.
type Record struct {
Metadata
@@ -223,21 +222,21 @@ func (m Metadata) Descriptor() *sdkapi.Descriptor {
return m.descriptor
}

-// Labels describes the labels associated with the instrument and the
+// Attributes returns the attribute set associated with the instrument and the
// aggregated data.
-func (m Metadata) Labels() *attribute.Set {
-return m.labels
+func (m Metadata) Attributes() *attribute.Set {
+return m.attrs
}

// NewAccumulation allows Accumulator implementations to construct new
-// Accumulations to send to Processors. The Descriptor, Labels,
-// and Aggregator represent aggregate metric events received over a single
+// Accumulations to send to Processors. The Descriptor, attributes, and
+// Aggregator represent aggregate metric events received over a single
// collection period.
-func NewAccumulation(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggregator aggregator.Aggregator) Accumulation {
+func NewAccumulation(descriptor *sdkapi.Descriptor, attrs *attribute.Set, aggregator aggregator.Aggregator) Accumulation {
return Accumulation{
Metadata: Metadata{
descriptor: descriptor,
-labels: labels,
+attrs: attrs,
},
aggregator: aggregator,
}
@@ -249,14 +248,14 @@ func (r Accumulation) Aggregator() aggregator.Aggregator {
return r.aggregator
}

-// NewRecord allows Processor implementations to construct export
-// records. The Descriptor, Labels, and Aggregator represent
-// aggregate metric events received over a single collection period.
-func NewRecord(descriptor *sdkapi.Descriptor, labels *attribute.Set, aggregation aggregation.Aggregation, start, end time.Time) Record {
+// NewRecord allows Processor implementations to construct export records.
+// The Descriptor, attributes, and Aggregator represent aggregate metric
+// events received over a single collection period.
+func NewRecord(descriptor *sdkapi.Descriptor, attrs *attribute.Set, aggregation aggregation.Aggregation, start, end time.Time) Record {
return Record{
Metadata: Metadata{
descriptor: descriptor,
-labels: labels,
+attrs: attrs,
},
aggregation: aggregation,
start: start,
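Mirroring the test code earlier in this diff, constructing a Record through the renamed constructor looks like the following sketch (the descriptor, aggregation, and interval values are placeholders taken from the surrounding tests):

    attrs := attribute.NewSet(attribute.String("one", "1"))
    rec := export.NewRecord(&desc, &attrs, ckpt.Aggregation(), intervalStart, intervalEnd)
    // The embedded Metadata now exposes the set via Attributes().
    fmt.Println(rec.Attributes().Encoded(attribute.DefaultEncoder())) // "one=1"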
@@ -28,24 +28,24 @@ var testSlice = []attribute.KeyValue{
}

func newIter(slice []attribute.KeyValue) attribute.Iterator {
-labels := attribute.NewSet(slice...)
-return labels.Iter()
+attrs := attribute.NewSet(slice...)
+return attrs.Iter()
}

-func TestLabelIterator(t *testing.T) {
+func TestAttributeIterator(t *testing.T) {
iter := newIter(testSlice)
require.Equal(t, 2, iter.Len())

require.True(t, iter.Next())
-require.Equal(t, attribute.String("bar", "baz"), iter.Label())
-idx, kv := iter.IndexedLabel()
+require.Equal(t, attribute.String("bar", "baz"), iter.Attribute())
+idx, kv := iter.IndexedAttribute()
require.Equal(t, 0, idx)
require.Equal(t, attribute.String("bar", "baz"), kv)
require.Equal(t, 2, iter.Len())

require.True(t, iter.Next())
-require.Equal(t, attribute.Int("foo", 42), iter.Label())
-idx, kv = iter.IndexedLabel()
+require.Equal(t, attribute.Int("foo", 42), iter.Attribute())
+idx, kv = iter.IndexedAttribute()
require.Equal(t, 1, idx)
require.Equal(t, attribute.Int("foo", 42), kv)
require.Equal(t, 2, iter.Len())
@@ -54,7 +54,7 @@ func TestLabelIterator(t *testing.T) {
require.Equal(t, 2, iter.Len())
}

-func TestEmptyLabelIterator(t *testing.T) {
+func TestEmptyAttributeIterator(t *testing.T) {
iter := newIter(nil)
require.Equal(t, 0, iter.Len())
require.False(t, iter.Next())
|
||||
// Measurement needs to be aligned for 64-bit atomic operations.
|
||||
Measurements []Measurement
|
||||
Ctx context.Context
|
||||
Labels []attribute.KeyValue
|
||||
Attributes []attribute.KeyValue
|
||||
Library Library
|
||||
}
|
||||
|
||||
|
@@ -52,8 +52,8 @@ type (
}

stateValue struct {
-// labels corresponds to the stateKey.distinct field.
-labels *attribute.Set
+// attrs corresponds to the stateKey.distinct field.
+attrs *attribute.Set

// updated indicates the last sequence number when this value had
// Process() called by an accumulator.
@@ -167,7 +167,7 @@ func (b *Processor) Process(accum export.Accumulation) error {
desc := accum.Descriptor()
key := stateKey{
descriptor: desc,
-distinct: accum.Labels().Equivalent(),
+distinct: accum.Attributes().Equivalent(),
}
agg := accum.Aggregator()

@@ -177,7 +177,7 @@ func (b *Processor) Process(accum export.Accumulation) error {
stateful := b.TemporalityFor(desc, agg.Aggregation().Kind()).MemoryRequired(desc.InstrumentKind())

newValue := &stateValue{
-labels: accum.Labels(),
+attrs: accum.Attributes(),
updated: b.state.finishedCollection,
stateful: stateful,
current: agg,
@@ -230,7 +230,7 @@ func (b *Processor) Process(accum export.Accumulation) error {
// indicating that the stateKey for Accumulation has already
// been seen in the same collection. When this happens, it
// implies that multiple Accumulators are being used, or that
-// a single Accumulator has been configured with a label key
+// a single Accumulator has been configured with a attribute key
// filter.

if !sameCollection {
@@ -370,7 +370,7 @@ func (b *state) ForEach(exporter aggregation.TemporalitySelector, f func(export.

if err := f(export.NewRecord(
key.descriptor,
-value.labels,
+value.attrs,
agg,
start,
b.intervalEnd,
@@ -235,8 +235,8 @@ func testProcessor(
exp := map[string]float64{}
if hasMemory || !repetitionAfterEmptyInterval {
exp = map[string]float64{
-fmt.Sprintf("inst1%s/L1=V/", instSuffix): float64(multiplier * 10), // labels1
-fmt.Sprintf("inst2%s/L2=V/", instSuffix): float64(multiplier * 10), // labels2
+fmt.Sprintf("inst1%s/L1=V/", instSuffix): float64(multiplier * 10), // attrs1
+fmt.Sprintf("inst2%s/L2=V/", instSuffix): float64(multiplier * 10), // attrs2
}
}
@@ -16,10 +16,10 @@ package basic // import "go.opentelemetry.io/otel/sdk/metric/processor/basic"

// config contains the options for configuring a basic metric processor.
type config struct {
-// Memory controls whether the processor remembers metric
-// instruments and label sets that were previously reported.
-// When Memory is true, Reader.ForEach() will visit
-// metrics that were not updated in the most recent interval.
+// Memory controls whether the processor remembers metric instruments and
+// attribute sets that were previously reported. When Memory is true,
+// Reader.ForEach() will visit metrics that were not updated in the most
+// recent interval.
Memory bool
}

@@ -27,10 +27,9 @@ type Option interface {
applyProcessor(config) config
}

-// WithMemory sets the memory behavior of a Processor. If this is
-// true, the processor will report metric instruments and label sets
-// that were previously reported but not updated in the most recent
-// interval.
+// WithMemory sets the memory behavior of a Processor. If this is true, the
+// processor will report metric instruments and attribute sets that were
+// previously reported but not updated in the most recent interval.
func WithMemory(memory bool) Option {
return memoryOption(memory)
}
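A hedged sketch of enabling this option; the basic.New argument list is an assumption about the processor's constructor at this version, while WithMemory itself is confirmed above:

    // assumes: imports of processor/basic, selector/simple, and export/aggregation
    proc := basic.New(
        simple.NewWithHistogramDistribution(),       // assumed AggregatorSelector
        aggregation.CumulativeTemporalitySelector(), // assumed TemporalitySelector
        basic.WithMemory(true),                      // keep reporting untouched attribute sets
    )
    _ = proc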
@@ -34,27 +34,26 @@ import (
)

type (
-// mapKey is the unique key for a metric, consisting of its
-// unique descriptor, distinct labels, and distinct resource
-// attributes.
+// mapKey is the unique key for a metric, consisting of its unique
+// descriptor, distinct attributes, and distinct resource attributes.
mapKey struct {
desc *sdkapi.Descriptor
-labels attribute.Distinct
+attrs attribute.Distinct
resource attribute.Distinct
}

// mapValue is value stored in a processor used to produce a
// Reader.
mapValue struct {
-labels *attribute.Set
+attrs *attribute.Set
resource *resource.Resource
aggregator aggregator.Aggregator
}

// Output implements export.Reader.
Output struct {
-m map[mapKey]mapValue
-labelEncoder attribute.Encoder
+m map[mapKey]mapValue
+attrEncoder attribute.Encoder
sync.RWMutex
}

@@ -120,7 +119,7 @@ func (f testFactory) NewCheckpointer() export.Checkpointer {
// "counter.sum/A=1,B=2/R=V": 100,
// }, processor.Values())
//
-// Where in the example A=1,B=2 is the encoded labels and R=V is the
+// Where in the example A=1,B=2 is the encoded attributes and R=V is the
// encoded resource value.
func NewProcessor(selector export.AggregatorSelector, encoder attribute.Encoder) *Processor {
return &Processor{
@@ -134,10 +133,10 @@ func (p *Processor) Process(accum export.Accumulation) error {
return p.output.AddAccumulation(accum)
}

-// Values returns the mapping from label set to point values for the
-// accumulations that were processed. Point values are chosen as
-// either the Sum or the LastValue, whichever is implemented. (All
-// the built-in Aggregators implement one of these interfaces.)
+// Values returns the mapping from attribute set to point values for the
+// accumulations that were processed. Point values are chosen as either the
+// Sum or the LastValue, whichever is implemented. (All the built-in
+// Aggregators implement one of these interfaces.)
func (p *Processor) Values() map[string]float64 {
return p.output.Map()
}
@@ -210,10 +209,10 @@ func (testAggregatorSelector) AggregatorFor(desc *sdkapi.Descriptor, aggPtrs ...
// (from an Accumulator) or an expected set of Records (from a
// Processor). If testing with an Accumulator, it may be simpler to
// use the test Processor in this package.
-func NewOutput(labelEncoder attribute.Encoder) *Output {
+func NewOutput(attrEncoder attribute.Encoder) *Output {
return &Output{
-m: make(map[mapKey]mapValue),
-labelEncoder: labelEncoder,
+m: make(map[mapKey]mapValue),
+attrEncoder: attrEncoder,
}
}

@@ -222,7 +221,7 @@ func (o *Output) ForEach(_ aggregation.TemporalitySelector, ff func(export.Recor
for key, value := range o.m {
if err := ff(export.NewRecord(
key.desc,
-value.labels,
+value.attrs,
value.aggregator.Aggregation(),
time.Time{},
time.Time{},
@@ -248,7 +247,7 @@ func (o *Output) AddInstrumentationLibraryRecord(_ instrumentation.Library, rec
func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource) error {
key := mapKey{
desc: rec.Descriptor(),
-labels: rec.Labels().Equivalent(),
+attrs: rec.Attributes().Equivalent(),
resource: res.Equivalent(),
}
if _, ok := o.m[key]; !ok {
@@ -256,7 +255,7 @@ func (o *Output) AddRecordWithResource(rec export.Record, res *resource.Resource
testAggregatorSelector{}.AggregatorFor(rec.Descriptor(), &agg)
o.m[key] = mapValue{
aggregator: agg,
-labels: rec.Labels(),
+attrs: rec.Attributes(),
resource: res,
}
}
@@ -271,8 +270,8 @@ func (o *Output) Map() map[string]float64 {
r := make(map[string]float64)
err := o.ForEach(aggregation.StatelessTemporalitySelector(), func(record export.Record) error {
for key, entry := range o.m {
-encoded := entry.labels.Encoded(o.labelEncoder)
-rencoded := entry.resource.Encoded(o.labelEncoder)
+encoded := entry.attrs.Encoded(o.attrEncoder)
+rencoded := entry.resource.Encoded(o.attrEncoder)
value := 0.0
if s, ok := entry.aggregator.(aggregation.Sum); ok {
sum, _ := s.Sum()
@@ -308,7 +307,7 @@ func (o *Output) AddAccumulation(acc export.Accumulation) error {
return o.AddRecord(
export.NewRecord(
acc.Descriptor(),
-acc.Labels(),
+acc.Attributes(),
acc.Aggregator().Aggregation(),
time.Time{},
time.Time{},
@@ -323,7 +322,7 @@ func (o *Output) AddAccumulation(acc export.Accumulation) error {
// "counter.sum/A=1,B=2/R=V": 100,
// }, exporter.Values())
//
-// Where in the example A=1,B=2 is the encoded labels and R=V is the
+// Where in the example A=1,B=2 is the encoded attributes and R=V is the
// encoded resource value.
func New(selector aggregation.TemporalitySelector, encoder attribute.Encoder) *Exporter {
return &Exporter{
@@ -348,10 +347,10 @@ func (e *Exporter) Export(_ context.Context, res *resource.Resource, ckpt export
})
}

-// Values returns the mapping from label set to point values for the
-// accumulations that were processed. Point values are chosen as
-// either the Sum or the LastValue, whichever is implemented. (All
-// the built-in Aggregators implement one of these interfaces.)
+// Values returns the mapping from attribute set to point values for the
+// accumulations that were processed. Point values are chosen as either the
+// Sum or the LastValue, whichever is implemented. (All the built-in
+// Aggregators implement one of these interfaces.)
func (e *Exporter) Values() map[string]float64 {
e.output.Lock()
defer e.output.Unlock()
|
||||
// limitations under the License.
|
||||
|
||||
/*
|
||||
Package reducer implements a metrics Processor component to reduce labels.
|
||||
Package reducer implements a metrics Processor component to reduce attributes.
|
||||
|
||||
This package is currently in a pre-GA phase. Backwards incompatible changes
|
||||
may be introduced in subsequent minor version releases as we work to track the
|
||||
evolving OpenTelemetry specification and user feedback.
|
||||
|
||||
The metrics Processor component this package implements applies a
|
||||
`attribute.Filter` to each processed `export.Accumulation` to remove labels before
|
||||
passing the result to another Processor. This Processor can be used to reduce
|
||||
inherent dimensionality in the data, as a way to control the cost of
|
||||
The metrics Processor component this package implements applies an
|
||||
attribute.Filter to each processed export.Accumulation to remove attributes
|
||||
before passing the result to another Processor. This Processor can be used to
|
||||
reduce inherent dimensionality in the data, as a way to control the cost of
|
||||
collecting high cardinality metric data.
|
||||
|
||||
For example, to compose a push controller with a reducer and a basic
|
||||
@ -33,9 +33,9 @@ type someFilter struct{
|
||||
// ...
|
||||
}
|
||||
|
||||
func (someFilter) LabelFilterFor(_ *sdkapi.Descriptor) attribute.Filter {
|
||||
return func(label kv.KeyValue) bool {
|
||||
// return true to keep this label, false to drop this label
|
||||
func (someFilter) AttributeFilterFor(_ *sdkapi.Descriptor) attribute.Filter {
|
||||
return func(attr kv.KeyValue) bool {
|
||||
// return true to keep this attr, false to drop this attr.
|
||||
// ...
|
||||
}
|
||||
}
|
||||
|
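Completing the doc's example under the new names, composing the reducer with a basic processor might look like the following sketch. The basic.New arguments are assumptions; reducer.New's signature matches the next file in this diff:

    // someFilter implements AttributeFilterFor as shown above.
    ckpter := basic.New( // assumed constructor; returns an export.Checkpointer
        simple.NewWithHistogramDistribution(),
        aggregation.CumulativeTemporalitySelector(),
    )
    proc := reducer.New(someFilter{}, ckpter)
    _ = proc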
@ -22,25 +22,25 @@ import (

type (
	// Processor implements "dimensionality reduction" by
	// filtering keys from export label sets.
	// filtering keys from export attribute sets.
	Processor struct {
		export.Checkpointer
		filterSelector LabelFilterSelector
		filterSelector AttributeFilterSelector
	}

	// LabelFilterSelector is the interface used to configure a
	// specific Filter to an instrument.
	LabelFilterSelector interface {
		LabelFilterFor(descriptor *sdkapi.Descriptor) attribute.Filter
	// AttributeFilterSelector selects an attribute filter based on the
	// instrument described by the descriptor.
	AttributeFilterSelector interface {
		AttributeFilterFor(descriptor *sdkapi.Descriptor) attribute.Filter
	}
)

var _ export.Processor = &Processor{}
var _ export.Checkpointer = &Processor{}

// New returns a dimensionality-reducing Processor that passes data to
// the next stage in an export pipeline.
func New(filterSelector LabelFilterSelector, ckpter export.Checkpointer) *Processor {
// New returns a dimensionality-reducing Processor that passes data to the
// next stage in an export pipeline.
func New(filterSelector AttributeFilterSelector, ckpter export.Checkpointer) *Processor {
	return &Processor{
		Checkpointer:   ckpter,
		filterSelector: filterSelector,
@ -49,10 +49,10 @@ func New(filterSelector LabelFilterSelector, ckpter export.Checkpointer) *Proces

// Process implements export.Processor.
func (p *Processor) Process(accum export.Accumulation) error {
	// Note: the removed labels are returned and ignored here.
	// Note: the removed attributes are returned and ignored here.
	// Conceivably these inputs could be useful to a sampler.
	reduced, _ := accum.Labels().Filter(
		p.filterSelector.LabelFilterFor(
	reduced, _ := accum.Attributes().Filter(
		p.filterSelector.AttributeFilterFor(
			accum.Descriptor(),
		),
	)
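
Process leans on attribute.Set.Filter, which returns both the reduced set and the removed key-values. A runnable sketch of that call in isolation; the keys and values are illustrative:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	set := attribute.NewSet(
		attribute.String("A", "1"),
		attribute.String("B", "2"),
		attribute.String("C", "3"),
	)
	// Keep only "A"; Filter hands back the kept set and the dropped
	// attributes, mirroring the reduced, _ := pattern above.
	kept, removed := set.Filter(func(kv attribute.KeyValue) bool {
		return kv.Key == "A"
	})
	fmt.Println(kept.Encoded(attribute.DefaultEncoder())) // A=1
	fmt.Println(len(removed))                             // 2
}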
@ -48,9 +48,9 @@ var (

type testFilter struct{}

func (testFilter) LabelFilterFor(_ *sdkapi.Descriptor) attribute.Filter {
	return func(label attribute.KeyValue) bool {
		return label.Key == "A" || label.Key == "C"
func (testFilter) AttributeFilterFor(_ *sdkapi.Descriptor) attribute.Filter {
	return func(attr attribute.KeyValue) bool {
		return attr.Key == "A" || attr.Key == "C"
	}
}
@ -75,8 +75,8 @@ type (
		instrument.Synchronous
	}

	// mapkey uniquely describes a metric instrument in terms of
	// its InstrumentID and the encoded form of its labels.
	// mapkey uniquely describes a metric instrument in terms of its
	// InstrumentID and the encoded form of its attributes.
	mapkey struct {
		descriptor *sdkapi.Descriptor
		ordered    attribute.Distinct
@ -98,14 +98,12 @@ type (
		// supports checking for no updates during a round.
		collectedCount int64

		// labels is the stored label set for this record,
		// except in cases where a label set is shared due to
		// batch recording.
		labels attribute.Set
		// attrs is the stored attribute set for this record, except in cases
		// where an attribute set is shared due to batch recording.
		attrs attribute.Set

		// sortSlice has a single purpose - as a temporary
		// place for sorting during labels creation to avoid
		// allocation.
		// sortSlice has a single purpose - as a temporary place for sorting
		// during attributes creation to avoid allocation.
		sortSlice attribute.Sortable

		// inst is a pointer to the corresponding instrument.
@ -146,20 +144,20 @@ func (s *syncInstrument) Implementation() interface{} {
}

// acquireHandle gets or creates a `*record` corresponding to `kvs`,
// the input labels.
// the input attributes.
func (b *baseInstrument) acquireHandle(kvs []attribute.KeyValue) *record {

	// This memory allocation may not be used, but it's
	// needed for the `sortSlice` field, to avoid an
	// allocation while sorting.
	rec := &record{}
	rec.labels = attribute.NewSetWithSortable(kvs, &rec.sortSlice)
	rec.attrs = attribute.NewSetWithSortable(kvs, &rec.sortSlice)

	// Create lookup key for sync.Map (one allocation, as this
	// passes through an interface{})
	mk := mapkey{
		descriptor: &b.descriptor,
		ordered:    rec.labels.Equivalent(),
		ordered:    rec.attrs.Equivalent(),
	}

	if actual, ok := b.meter.current.Load(mk); ok {
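
The lookup works because attribute sets holding the same key-values normalize to the same attribute.Distinct, the comparable value returned by Equivalent. A runnable sketch; keys and values are illustrative:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	s1 := attribute.NewSet(attribute.String("k", "v"), attribute.Int("n", 1))
	s2 := attribute.NewSet(attribute.Int("n", 1), attribute.String("k", "v"))
	// Input order does not matter: both sets sort to the same Distinct,
	// so they hit the same entry in a map keyed by Equivalent().
	seen := map[attribute.Distinct]bool{}
	seen[s1.Equivalent()] = true
	fmt.Println(seen[s2.Equivalent()]) // true
}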
@ -372,7 +370,7 @@ func (m *Accumulator) checkpointRecord(r *record) int {
		return 0
	}

	a := export.NewAccumulation(&r.inst.descriptor, &r.labels, r.checkpoint)
	a := export.NewAccumulation(&r.inst.descriptor, &r.attrs, r.checkpoint)
	err = m.processor.Process(a)
	if err != nil {
		otel.Handle(err)
@ -405,7 +403,7 @@ func (r *record) unbind() {
func (r *record) mapkey() mapkey {
	return mapkey{
		descriptor: &r.inst.descriptor,
		ordered:    r.labels.Equivalent(),
		ordered:    r.attrs.Equivalent(),
	}
}
@ -79,5 +79,5 @@ func (n noopInstrument) Descriptor() Descriptor {
func (noopSyncInstrument) RecordOne(context.Context, number.Number, []attribute.KeyValue) {
}

func (noopAsyncInstrument) ObserveOne(ctx context.Context, number number.Number, labels []attribute.KeyValue) {
func (noopAsyncInstrument) ObserveOne(context.Context, number.Number, []attribute.KeyValue) {
}
@ -58,7 +58,7 @@ type SyncImpl interface {
	instrument.Synchronous

	// RecordOne captures a single synchronous metric event.
	RecordOne(ctx context.Context, number number.Number, labels []attribute.KeyValue)
	RecordOne(ctx context.Context, number number.Number, attrs []attribute.KeyValue)
}

// AsyncImpl is an implementation-level interface to an
@ -68,7 +68,7 @@ type AsyncImpl interface {
	instrument.Asynchronous

	// ObserveOne captures a single asynchronous metric event.
	ObserveOne(ctx context.Context, number number.Number, labels []attribute.KeyValue)
	ObserveOne(ctx context.Context, number number.Number, attrs []attribute.KeyValue)
}

// AsyncRunner is expected to convert into an AsyncSingleRunner or an
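
For illustration, a caller-side fragment showing the renamed attrs parameter in use; ctx and impl are assumed to exist, with impl satisfying SyncImpl, and the instrumented operation is made up:

// A sketch, not from this commit: record one event against impl.
attrs := []attribute.KeyValue{attribute.String("http.method", "GET")}
// number.NewInt64Number wraps an int64 in the SDK's number type.
impl.RecordOne(ctx, number.NewInt64Number(1), attrs)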
@ -25,7 +25,7 @@ import (

const conflict = 0.5

func makeLabels(n int) (_, _ *resource.Resource) {
func makeAttrs(n int) (_, _ *resource.Resource) {
	used := map[string]bool{}
	l1 := make([]attribute.KeyValue, n)
	l2 := make([]attribute.KeyValue, n)
@ -51,7 +51,7 @@ func makeLabels(n int) (_, _ *resource.Resource) {
}

func benchmarkMergeResource(b *testing.B, size int) {
	r1, r2 := makeAttrs(size)

	b.ReportAllocs()
	b.ResetTimer()
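
The exported benchmarks in this file presumably wrap this helper at several sizes; a sketch of one such wrapper, where the name and size are assumptions rather than lines from the diff:

func BenchmarkMergeResource16(b *testing.B) {
	benchmarkMergeResource(b, 16)
}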
@ -194,7 +194,7 @@ func Merge(a, b *Resource) (*Resource, error) {
	mi := attribute.NewMergeIterator(b.Set(), a.Set())
	combine := make([]attribute.KeyValue, 0, a.Len()+b.Len())
	for mi.Next() {
		combine = append(combine, mi.Label())
		combine = append(combine, mi.Attribute())
	}
	merged := NewWithAttributes(schemaURL, combine...)
	return merged, nil
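
Because the merge iterator visits b's set first, b's values win when both resources define a key. A runnable sketch of that behavior; the attribute values are illustrative:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/resource"
)

func main() {
	r1 := resource.NewSchemaless(attribute.String("a", "1"))
	r2 := resource.NewSchemaless(attribute.String("a", "2"), attribute.String("b", "3"))
	// Merge(a, b) keeps b's value on collision, per the iterator order.
	merged, err := resource.Merge(r1, r2)
	if err != nil {
		panic(err)
	}
	fmt.Println(merged.Encoded(attribute.DefaultEncoder())) // a=2,b=3
}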
@ -1009,13 +1009,13 @@ func protoToInts(proto string) (int, int) {
func kvStr(kvs []attribute.KeyValue) string {
	sb := strings.Builder{}
	sb.WriteRune('[')
	for idx, label := range kvs {
	for idx, attr := range kvs {
		if idx > 0 {
			sb.WriteString(", ")
		}
		sb.WriteString((string)(label.Key))
		sb.WriteString((string)(attr.Key))
		sb.WriteString(": ")
		sb.WriteString(label.Value.Emit())
		sb.WriteString(attr.Value.Emit())
	}
	sb.WriteRune(']')
	return sb.String()
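
For reference, a sketch of the helper's output on a small input; kvStr is unexported, so this is illustrative only:

// kvStr([]attribute.KeyValue{attribute.String("A", "1"), attribute.Int("B", 2)})
// == "[A: 1, B: 2]"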